/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
import org.apache.flink.runtime.query.netty.ChunkedByteBuf;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServerHandler;
import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure;
import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

public class KvStateServerHandlerTest
extends TestLogger {
    private static final ExecutorService TEST_THREAD_POOL = Executors.newSingleThreadExecutor();
    private static final int READ_TIMEOUT_MILLIS = 10000;

    @AfterClass
    public static void tearDown() throws Exception {
        if (TEST_THREAD_POOL != null) {
            TEST_THREAD_POOL.shutdown();
        }
    }

    @Test
    public void testSimpleQuery() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("vanilla");
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((Environment)dummyEnv, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener((KvStateRegistryListener)registryListener);
        int expectedValue = 712828289;
        int key = 99812822;
        backend.setCurrentKey((Object)key);
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        state.update((Object)expectedValue);
        byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
        long requestId = 2147666475L;
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (KvStateID)registryListener.kvStateId, (byte[])serializedKeyAndNamespace);
        channel.writeInbound(new Object[]{request});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.REQUEST_RESULT, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        KvStateRequestResult response = KvStateRequestSerializer.deserializeKvStateRequestResult((ByteBuf)buf);
        Assert.assertEquals((long)requestId, (long)response.getRequestId());
        int actualValue = (Integer)KvStateRequestSerializer.deserializeValue((byte[])response.getSerializedResult(), (TypeSerializer)IntSerializer.INSTANCE);
        Assert.assertEquals((long)expectedValue, (long)actualValue);
        Assert.assertEquals((String)stats.toString(), (long)1L, (long)stats.getNumRequests());
        long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30L, TimeUnit.SECONDS);
        while (stats.getNumSuccessful() != 1L && System.nanoTime() <= deadline) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((String)stats.toString(), (long)1L, (long)stats.getNumSuccessful());
    }

    @Test
    public void testQueryUnknownKvStateID() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        long requestId = 2147666475L;
        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (KvStateID)new KvStateID(), (byte[])new byte[0]);
        channel.writeInbound(new Object[]{request});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.REQUEST_FAILURE, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure((ByteBuf)buf);
        Assert.assertEquals((long)requestId, (long)response.getRequestId());
        Assert.assertTrue((String)"Did not respond with expected failure cause", (boolean)(response.getCause() instanceof UnknownKvStateID));
        Assert.assertEquals((long)1L, (long)stats.getNumRequests());
        Assert.assertEquals((long)1L, (long)stats.getNumFailed());
    }

    @Test
    public void testQueryUnknownKey() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((Environment)dummyEnv, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener((KvStateRegistryListener)registryListener);
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("vanilla");
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace((Object)1238283, (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
        long requestId = 2147506629L;
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (KvStateID)registryListener.kvStateId, (byte[])serializedKeyAndNamespace);
        channel.writeInbound(new Object[]{request});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.REQUEST_FAILURE, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure((ByteBuf)buf);
        Assert.assertEquals((long)requestId, (long)response.getRequestId());
        Assert.assertTrue((String)"Did not respond with expected failure cause", (boolean)(response.getCause() instanceof UnknownKeyOrNamespace));
        Assert.assertEquals((long)1L, (long)stats.getNumRequests());
        Assert.assertEquals((long)1L, (long)stats.getNumFailed());
    }

    @Test
    public void testFailureOnGetSerializedValue() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        InternalKvState kvState = (InternalKvState)Mockito.mock(InternalKvState.class);
        Mockito.when((Object)kvState.getSerializedValue((byte[])Matchers.any(byte[].class))).thenThrow(new Throwable[]{new RuntimeException("Expected test Exception")});
        KvStateID kvStateId = registry.registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "vanilla", kvState);
        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)282872L, (KvStateID)kvStateId, (byte[])new byte[0]);
        channel.writeInbound(new Object[]{request});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.REQUEST_FAILURE, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure((ByteBuf)buf);
        Assert.assertTrue((boolean)response.getCause().getMessage().contains("Expected test Exception"));
        Assert.assertEquals((long)1L, (long)stats.getNumRequests());
        Assert.assertEquals((long)1L, (long)stats.getNumFailed());
    }

    @Test
    public void testCloseChannelOnExceptionCaught() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{handler});
        channel.pipeline().fireExceptionCaught((Throwable)new RuntimeException("Expected test Exception"));
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.SERVER_FAILURE, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        Throwable response = KvStateRequestSerializer.deserializeServerFailure((ByteBuf)buf);
        Assert.assertTrue((boolean)response.getMessage().contains("Expected test Exception"));
        channel.closeFuture().await(10000L);
        Assert.assertFalse((boolean)channel.isActive());
    }

    @Test
    public void testQueryExecutorShutDown() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        ExecutorService closedExecutor = Executors.newSingleThreadExecutor();
        closedExecutor.shutdown();
        Assert.assertTrue((boolean)closedExecutor.isShutdown());
        KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((Environment)dummyEnv, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener((KvStateRegistryListener)registryListener);
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("vanilla");
        backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)282872L, (KvStateID)registryListener.kvStateId, (byte[])new byte[0]);
        channel.writeInbound(new Object[]{request});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.REQUEST_FAILURE, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure((ByteBuf)buf);
        Assert.assertTrue((boolean)response.getCause().getMessage().contains("RejectedExecutionException"));
        Assert.assertEquals((long)1L, (long)stats.getNumRequests());
        Assert.assertEquals((long)1L, (long)stats.getNumFailed());
    }

    @Test
    public void testUnexpectedMessage() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        ByteBuf unexpectedMessage = Unpooled.buffer((int)8);
        unexpectedMessage.writeInt(4);
        unexpectedMessage.writeInt(123238213);
        channel.writeInbound(new Object[]{unexpectedMessage});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.SERVER_FAILURE, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        Throwable response = KvStateRequestSerializer.deserializeServerFailure((ByteBuf)buf);
        Assert.assertEquals((long)0L, (long)stats.getNumRequests());
        Assert.assertEquals((long)0L, (long)stats.getNumFailed());
        unexpectedMessage = KvStateRequestSerializer.serializeKvStateRequestResult((ByteBufAllocator)channel.alloc(), (long)192L, (byte[])new byte[0]);
        channel.writeInbound(new Object[]{unexpectedMessage});
        buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.SERVER_FAILURE, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        response = KvStateRequestSerializer.deserializeServerFailure((ByteBuf)buf);
        Assert.assertTrue((String)("Unexpected failure cause " + response.getClass().getName()), (boolean)(response instanceof IllegalArgumentException));
        Assert.assertEquals((long)0L, (long)stats.getNumRequests());
        Assert.assertEquals((long)0L, (long)stats.getNumFailed());
    }

    @Test
    public void testIncomingBufferIsRecycled() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)282872L, (KvStateID)new KvStateID(), (byte[])new byte[0]);
        Assert.assertEquals((long)1L, (long)request.refCnt());
        channel.writeInbound(new Object[]{request});
        Assert.assertEquals((String)"Buffer not recycled", (long)0L, (long)request.refCnt());
        ByteBuf unexpected = channel.alloc().buffer(8);
        unexpected.writeInt(4);
        unexpected.writeInt(4);
        Assert.assertEquals((long)1L, (long)unexpected.refCnt());
        channel.writeInbound(new Object[]{unexpected});
        Assert.assertEquals((String)"Buffer not recycled", (long)0L, (long)unexpected.refCnt());
    }

    @Test
    public void testSerializerMismatch() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((Environment)dummyEnv, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener((KvStateRegistryListener)registryListener);
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
        desc.setQueryable("vanilla");
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        int key = 99812822;
        backend.setCurrentKey((Object)key);
        state.update((Object)712828289);
        byte[] wrongKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace((Object)"wrong-key-type", (TypeSerializer)StringSerializer.INSTANCE, (Object)"wrong-namespace-type", (TypeSerializer)StringSerializer.INSTANCE);
        byte[] wrongNamespace = KvStateRequestSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)IntSerializer.INSTANCE, (Object)"wrong-namespace-type", (TypeSerializer)StringSerializer.INSTANCE);
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)182828L, (KvStateID)registryListener.kvStateId, (byte[])wrongKeyAndNamespace);
        channel.writeInbound(new Object[]{request});
        ByteBuf buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.REQUEST_FAILURE, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure((ByteBuf)buf);
        Assert.assertEquals((long)182828L, (long)response.getRequestId());
        Assert.assertTrue((boolean)response.getCause().getMessage().contains("IOException"));
        request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)182829L, (KvStateID)registryListener.kvStateId, (byte[])wrongNamespace);
        channel.writeInbound(new Object[]{request});
        buf = (ByteBuf)this.readInboundBlocking(channel);
        buf.skipBytes(4);
        Assert.assertEquals((Object)KvStateRequestType.REQUEST_FAILURE, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
        response = KvStateRequestSerializer.deserializeKvStateRequestFailure((ByteBuf)buf);
        Assert.assertEquals((long)182829L, (long)response.getRequestId());
        Assert.assertTrue((boolean)response.getCause().getMessage().contains("IOException"));
        Assert.assertEquals((long)2L, (long)stats.getNumRequests());
        Assert.assertEquals((long)2L, (long)stats.getNumFailed());
    }

    @Test
    public void testChunkedResponse() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, (KvStateRequestStats)stats);
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{this.getFrameDecoder(), handler});
        int numKeyGroups = 1;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(registry);
        AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((Environment)dummyEnv, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
        TestRegistryListener registryListener = new TestRegistryListener();
        registry.registerListener((KvStateRegistryListener)registryListener);
        ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        desc.setQueryable("vanilla");
        ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
        byte[] bytes = new byte[2 * channel.config().getWriteBufferHighWaterMark()];
        int current = 0;
        for (int i = 0; i < bytes.length; ++i) {
            int n = current;
            current = (byte)(current + 1);
            bytes[i] = n;
        }
        int key = 99812822;
        backend.setCurrentKey((Object)key);
        state.update((Object)bytes);
        byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
        long requestId = 2147666475L;
        Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (KvStateID)registryListener.kvStateId, (byte[])serializedKeyAndNamespace);
        channel.writeInbound(new Object[]{request});
        Object msg = this.readInboundBlocking(channel);
        Assert.assertTrue((String)"Not ChunkedByteBuf", (boolean)(msg instanceof ChunkedByteBuf));
    }

    private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException {
        int sleepMillis = 50;
        Object msg = null;
        for (int sleptMillis = 0; sleptMillis < 10000 && (msg = channel.readOutbound()) == null; sleptMillis += 50) {
            Thread.sleep(50L);
        }
        if (msg == null) {
            throw new TimeoutException();
        }
        return msg;
    }

    private ChannelHandler getFrameDecoder() {
        return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4);
    }

    static class TestRegistryListener
    implements KvStateRegistryListener {
        volatile JobVertexID jobVertexID;
        volatile KeyGroupRange keyGroupIndex;
        volatile String registrationName;
        volatile KvStateID kvStateId;

        TestRegistryListener() {
        }

        public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
            this.jobVertexID = jobVertexId;
            this.keyGroupIndex = keyGroupRange;
            this.registrationName = registrationName;
            this.kvStateId = kvStateId;
        }

        public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
        }
    }
}

