package org.apache.flink.runtime.query.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure;
import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.class */
public class KvStateServerHandlerTest extends TestLogger {
    private static final ExecutorService TEST_THREAD_POOL = Executors.newSingleThreadExecutor();
    private static final int READ_TIMEOUT_MILLIS = 10000;

    /* loaded from: input_file:org/apache/flink/runtime/query/netty/KvStateServerHandlerTest$TestRegistryListener.class */
    /**
     * Registry listener that remembers the arguments of the most recent
     * {@code notifyKvStateRegistered} call so that tests can read them back
     * (for example the {@link KvStateID} to put into a request).
     *
     * <p>The job ID passed to the callbacks is not recorded, and
     * unregistration notifications are ignored.
     */
    static class TestRegistryListener implements KvStateRegistryListener {
        /** Job vertex ID from the last registration notification. */
        volatile JobVertexID jobVertexID;
        /** Key group range from the last registration notification. */
        volatile KeyGroupRange keyGroupIndex;
        /** Registration name from the last registration notification. */
        volatile String registrationName;
        /** KvState ID from the last registration notification. */
        volatile KvStateID kvStateId;

        /** Stores every argument except {@code jobID} in the corresponding field. */
        public void notifyKvStateRegistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str, KvStateID kvStateID) {
            this.jobVertexID = jobVertexID;
            this.keyGroupIndex = keyGroupRange;
            this.registrationName = str;
            this.kvStateId = kvStateID;
        }

        /** Intentionally a no-op: nothing is recorded on unregistration. */
        public void notifyKvStateUnregistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str) {
        }
    }

    /**
     * Shuts down the executor shared by the tests of this class once all of
     * them have run.
     */
    @AfterClass
    public static void tearDown() throws Exception {
        final ExecutorService sharedPool = TEST_THREAD_POOL;
        if (sharedPool == null) {
            return;
        }
        sharedPool.shutdown();
    }

    /**
     * Sends a request for an existing key of a registered queryable
     * {@code ValueState} and verifies that the handler answers with a
     * {@code REQUEST_RESULT} carrying the request ID and the stored value,
     * and that the request statistics report one (successful) request.
     */
    @Test
    public void testSimpleQuery() throws Exception {
        final int key = 99812822;
        final int expectedValue = 712828289;
        final long requestId = 2147666475L;

        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);

        // Describe a queryable state registered under the name "vanilla".
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<Integer>("any", IntSerializer.INSTANCE);
        descriptor.setQueryable("vanilla");

        MemoryStateBackend stateBackend = new MemoryStateBackend();
        DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        environment.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> keyedBackend = stateBackend.createKeyedStateBackend(
                environment,
                new JobID(),
                "test_op",
                IntSerializer.INSTANCE,
                1,
                new KeyGroupRange(0, 0),
                registry.createTaskRegistry(environment.getJobID(), environment.getJobVertexId()));

        // The listener must be in place before the state is created so that it
        // observes the registration and captures the KvStateID.
        TestRegistryListener listener = new TestRegistryListener();
        registry.registerListener(listener);

        keyedBackend.setCurrentKey(key);
        ValueState<Integer> state = keyedBackend.getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        state.update(expectedValue);

        byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
                key, IntSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

        // assertEquals is null-safe and reports the actual name on failure.
        Assert.assertEquals("vanilla", listener.registrationName);

        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
                channel.alloc(), requestId, listener.kvStateId, serializedKeyAndNamespace);
        channel.writeInbound(request);

        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
        buf.skipBytes(4); // skip the frame length prefix

        Assert.assertEquals(KvStateRequestType.REQUEST_RESULT, KvStateRequestSerializer.deserializeHeader(buf));

        // Deserialize once and keep the result: both the request ID and the
        // serialized value are read from the same response object.
        KvStateRequestResult response = KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
        Assert.assertEquals(requestId, response.getRequestId());

        int actualValue = KvStateRequestSerializer.deserializeValue(
                response.getSerializedResult(), IntSerializer.INSTANCE);
        Assert.assertEquals(expectedValue, actualValue);

        Assert.assertEquals(stats.toString(), 1L, stats.getNumRequests());

        // The success counter may be updated after the response was written,
        // so poll for up to 30 seconds before asserting.
        long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30L, TimeUnit.SECONDS);
        while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(stats.toString(), 1L, stats.getNumSuccessful());
    }

    /**
     * Queries a freshly created {@link KvStateID} that was never registered
     * and expects a {@code REQUEST_FAILURE} whose cause is an
     * {@code UnknownKvStateID}, counted as one failed request.
     */
    @Test
    public void testQueryUnknownKvStateID() throws Exception {
        final long requestId = 2147666475L;

        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);

        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
                channel.alloc(), requestId, new KvStateID(), new byte[0]);
        channel.writeInbound(request);

        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
        buf.skipBytes(4); // skip the frame length prefix

        KvStateRequestType messageType = KvStateRequestSerializer.deserializeHeader(buf);
        Assert.assertEquals(KvStateRequestType.REQUEST_FAILURE, messageType);

        KvStateRequestFailure failure = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
        Assert.assertEquals(requestId, failure.getRequestId());
        Assert.assertTrue("Did not respond with expected failure cause", failure.getCause() instanceof UnknownKvStateID);

        Assert.assertEquals(1L, stats.getNumRequests());
        Assert.assertEquals(1L, stats.getNumFailed());
    }

    /**
     * Queries a registered queryable state with a key for which no value was
     * ever stored and expects a {@code REQUEST_FAILURE} whose cause is an
     * {@code UnknownKeyOrNamespace}, counted as one failed request.
     */
    @Test
    public void testQueryUnknownKey() throws Exception {
        final long requestId = 2147506629L;

        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);

        MemoryStateBackend stateBackend = new MemoryStateBackend();
        DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        environment.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> keyedBackend = stateBackend.createKeyedStateBackend(
                environment,
                new JobID(),
                "test_op",
                IntSerializer.INSTANCE,
                1,
                new KeyGroupRange(0, 0),
                registry.createTaskRegistry(environment.getJobID(), environment.getJobVertexId()));

        // Listener first, so that creating the state below is observed.
        TestRegistryListener listener = new TestRegistryListener();
        registry.registerListener(listener);

        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<Integer>("any", IntSerializer.INSTANCE);
        descriptor.setQueryable("vanilla");

        // Only create (and thereby register) the state; no value is written.
        keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
                1238283, IntSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

        // assertEquals is null-safe and reports the actual name on failure.
        Assert.assertEquals("vanilla", listener.registrationName);

        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
                channel.alloc(), requestId, listener.kvStateId, serializedKeyAndNamespace);
        channel.writeInbound(request);

        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
        buf.skipBytes(4); // skip the frame length prefix

        Assert.assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));

        KvStateRequestFailure failure = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
        Assert.assertEquals(requestId, failure.getRequestId());
        Assert.assertTrue("Did not respond with expected failure cause", failure.getCause() instanceof UnknownKeyOrNamespace);

        Assert.assertEquals(1L, stats.getNumRequests());
        Assert.assertEquals(1L, stats.getNumFailed());
    }

    /**
     * Registers a mocked {@link KvState} whose {@code getSerializedValue}
     * throws, and expects the handler to answer with a
     * {@code REQUEST_FAILURE} whose cause message contains the exception
     * message, counted as one failed request.
     */
    @Test
    public void testFailureOnGetSerializedValue() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);

        // Every lookup on the mocked state fails with the test exception.
        KvState failingState = (KvState) Mockito.mock(KvState.class);
        Mockito.when(failingState.getSerializedValue((byte[]) Matchers.any(byte[].class)))
                .thenThrow(new RuntimeException("Expected test Exception"));

        KvStateID registeredId = registry.registerKvState(
                new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "vanilla", failingState);

        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
                channel.alloc(), 282872L, registeredId, new byte[0]);
        channel.writeInbound(request);

        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
        buf.skipBytes(4); // skip the frame length prefix

        KvStateRequestType messageType = KvStateRequestSerializer.deserializeHeader(buf);
        Assert.assertEquals(KvStateRequestType.REQUEST_FAILURE, messageType);

        KvStateRequestFailure failure = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
        Assert.assertTrue(failure.getCause().getMessage().contains("Expected test Exception"));

        Assert.assertEquals(1L, stats.getNumRequests());
        Assert.assertEquals(1L, stats.getNumFailed());
    }

    /**
     * Fires an exception through the pipeline and expects the handler to
     * write a {@code SERVER_FAILURE} containing the exception message and
     * the channel to be inactive afterwards.
     */
    @Test
    public void testCloseChannelOnExceptionCaught() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);

        // No frame decoder here: the exception is injected directly.
        EmbeddedChannel channel = new EmbeddedChannel(handler);
        channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));

        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
        buf.skipBytes(4); // skip the frame length prefix

        KvStateRequestType messageType = KvStateRequestSerializer.deserializeHeader(buf);
        Assert.assertEquals(KvStateRequestType.SERVER_FAILURE, messageType);

        Throwable serverFailure = KvStateRequestSerializer.deserializeServerFailure(buf);
        Assert.assertTrue(serverFailure.getMessage().contains("Expected test Exception"));

        // Give the close a bounded amount of time before checking the state.
        channel.closeFuture().await(READ_TIMEOUT_MILLIS);
        Assert.assertFalse(channel.isActive());
    }

    /**
     * Hands the handler an executor that is already shut down and expects a
     * query for a registered state to be answered with a
     * {@code REQUEST_FAILURE} whose cause message mentions
     * {@code RejectedExecutionException}, counted as one failed request.
     */
    @Test
    public void testQueryExecutorShutDown() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();

        // Dedicated executor, shut down before the handler ever uses it.
        ExecutorService closedExecutor = Executors.newSingleThreadExecutor();
        closedExecutor.shutdown();
        Assert.assertTrue(closedExecutor.isShutdown());

        KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats);
        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);

        MemoryStateBackend stateBackend = new MemoryStateBackend();
        DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        environment.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> keyedBackend = stateBackend.createKeyedStateBackend(
                environment,
                new JobID(),
                "test_op",
                IntSerializer.INSTANCE,
                1,
                new KeyGroupRange(0, 0),
                registry.createTaskRegistry(environment.getJobID(), environment.getJobVertexId()));

        // Listener first, so that creating the state below is observed.
        TestRegistryListener listener = new TestRegistryListener();
        registry.registerListener(listener);

        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<Integer>("any", IntSerializer.INSTANCE);
        descriptor.setQueryable("vanilla");

        // Only create (and thereby register) the state; no value is written.
        keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        // assertEquals is null-safe and reports the actual name on failure.
        Assert.assertEquals("vanilla", listener.registrationName);

        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
                channel.alloc(), 282872L, listener.kvStateId, new byte[0]);
        channel.writeInbound(request);

        ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
        buf.skipBytes(4); // skip the frame length prefix

        Assert.assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));

        KvStateRequestFailure failure = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
        Assert.assertTrue(failure.getCause().getMessage().contains("RejectedExecutionException"));

        Assert.assertEquals(1L, stats.getNumRequests());
        Assert.assertEquals(1L, stats.getNumFailed());
    }

    /**
     * Sends two messages the server does not expect and checks that each is
     * answered with a {@code SERVER_FAILURE} without touching the request
     * statistics: first a frame whose payload is a single arbitrary int, then
     * a serialized request <em>result</em>, for which the failure must be an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testUnexpectedMessage() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);

        // Frame of length 4 whose payload is one arbitrary int.
        ByteBuf unexpectedFrame = Unpooled.buffer(8);
        unexpectedFrame.writeInt(4);
        unexpectedFrame.writeInt(123238213);
        channel.writeInbound(unexpectedFrame);

        ByteBuf firstResponse = (ByteBuf) readInboundBlocking(channel);
        firstResponse.skipBytes(4); // skip the frame length prefix

        KvStateRequestType firstType = KvStateRequestSerializer.deserializeHeader(firstResponse);
        Assert.assertEquals(KvStateRequestType.SERVER_FAILURE, firstType);
        // Must be deserializable; the concrete failure is not inspected here.
        KvStateRequestSerializer.deserializeServerFailure(firstResponse);

        Assert.assertEquals(0L, stats.getNumRequests());
        Assert.assertEquals(0L, stats.getNumFailed());

        // A request result is a message type the server should never receive.
        ByteBuf unexpectedResult = KvStateRequestSerializer.serializeKvStateRequestResult(
                channel.alloc(), 192L, new byte[0]);
        channel.writeInbound(unexpectedResult);

        ByteBuf secondResponse = (ByteBuf) readInboundBlocking(channel);
        secondResponse.skipBytes(4); // skip the frame length prefix

        KvStateRequestType secondType = KvStateRequestSerializer.deserializeHeader(secondResponse);
        Assert.assertEquals(KvStateRequestType.SERVER_FAILURE, secondType);

        Throwable serverFailure = KvStateRequestSerializer.deserializeServerFailure(secondResponse);
        Assert.assertTrue("Unexpected failure cause " + serverFailure.getClass().getName(), serverFailure instanceof IllegalArgumentException);

        Assert.assertEquals(0L, stats.getNumRequests());
        Assert.assertEquals(0L, stats.getNumFailed());
    }

    /**
     * Verifies that buffers written into the channel have a reference count
     * of zero afterwards, both for a well-formed request and for a frame
     * whose payload is just a single int.
     */
    @Test
    public void testIncomingBufferIsRecycled() throws Exception {
        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);

        // Case 1: a serialized KvState request.
        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
                channel.alloc(), 282872L, new KvStateID(), new byte[0]);
        Assert.assertEquals(1L, request.refCnt());
        channel.writeInbound(request);
        Assert.assertEquals("Buffer not recycled", 0L, request.refCnt());

        // Case 2: a frame of length 4 whose payload is the int 4.
        ByteBuf rawFrame = channel.alloc().buffer(8);
        rawFrame.writeInt(4);
        rawFrame.writeInt(4);
        Assert.assertEquals(1L, rawFrame.refCnt());
        channel.writeInbound(rawFrame);
        Assert.assertEquals("Buffer not recycled", 0L, rawFrame.refCnt());
    }

    /**
     * Queries an {@code Integer}-keyed state in the {@link VoidNamespace}
     * with key/namespace bytes produced by non-matching serializers (first a
     * wrong key and namespace, then only a wrong namespace) and expects both
     * requests to fail with a cause message mentioning {@code IOException}.
     */
    @Test
    public void testSerializerMismatch() throws Exception {
        final int key = 99812822;
        final long firstRequestId = 182828L;
        final long secondRequestId = 182829L;

        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);

        MemoryStateBackend stateBackend = new MemoryStateBackend();
        DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        environment.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> keyedBackend = stateBackend.createKeyedStateBackend(
                environment,
                new JobID(),
                "test_op",
                IntSerializer.INSTANCE,
                1,
                new KeyGroupRange(0, 0),
                registry.createTaskRegistry(environment.getJobID(), environment.getJobVertexId()));

        // Listener first, so that creating the state below is observed.
        TestRegistryListener listener = new TestRegistryListener();
        registry.registerListener(listener);

        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<Integer>("any", IntSerializer.INSTANCE);
        descriptor.setQueryable("vanilla");

        ValueState<Integer> state = keyedBackend.getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        keyedBackend.setCurrentKey(key);
        state.update(712828289);

        // Both the key and the namespace use the wrong (String) serializer.
        byte[] wrongKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
                "wrong-key-type", StringSerializer.INSTANCE, "wrong-namespace-type", StringSerializer.INSTANCE);
        // Correct key, but the namespace uses the wrong (String) serializer.
        byte[] wrongNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
                key, IntSerializer.INSTANCE, "wrong-namespace-type", StringSerializer.INSTANCE);

        // assertEquals is null-safe and reports the actual name on failure.
        Assert.assertEquals("vanilla", listener.registrationName);

        // First request: wrong key and namespace.
        channel.writeInbound(KvStateRequestSerializer.serializeKvStateRequest(
                channel.alloc(), firstRequestId, listener.kvStateId, wrongKeyAndNamespace));

        ByteBuf firstResponse = (ByteBuf) readInboundBlocking(channel);
        firstResponse.skipBytes(4); // skip the frame length prefix
        Assert.assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(firstResponse));

        KvStateRequestFailure firstFailure = KvStateRequestSerializer.deserializeKvStateRequestFailure(firstResponse);
        Assert.assertEquals(firstRequestId, firstFailure.getRequestId());
        Assert.assertTrue(firstFailure.getCause().getMessage().contains("IOException"));

        // Second request: wrong namespace only.
        channel.writeInbound(KvStateRequestSerializer.serializeKvStateRequest(
                channel.alloc(), secondRequestId, listener.kvStateId, wrongNamespace));

        ByteBuf secondResponse = (ByteBuf) readInboundBlocking(channel);
        secondResponse.skipBytes(4); // skip the frame length prefix
        Assert.assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(secondResponse));

        KvStateRequestFailure secondFailure = KvStateRequestSerializer.deserializeKvStateRequestFailure(secondResponse);
        Assert.assertEquals(secondRequestId, secondFailure.getRequestId());
        Assert.assertTrue(secondFailure.getCause().getMessage().contains("IOException"));

        Assert.assertEquals(2L, stats.getNumRequests());
        Assert.assertEquals(2L, stats.getNumFailed());
    }

    /**
     * Stores a value twice as large as the channel's write-buffer high water
     * mark and verifies that the response written by the handler is a
     * {@code ChunkedByteBuf}.
     */
    @Test
    public void testChunkedResponse() throws Exception {
        final int key = 99812822;

        KvStateRegistry registry = new KvStateRegistry();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
        EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);

        MemoryStateBackend stateBackend = new MemoryStateBackend();
        DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
        environment.setKvStateRegistry(registry);
        AbstractKeyedStateBackend<Integer> keyedBackend = stateBackend.createKeyedStateBackend(
                environment,
                new JobID(),
                "test_op",
                IntSerializer.INSTANCE,
                1,
                new KeyGroupRange(0, 0),
                registry.createTaskRegistry(environment.getJobID(), environment.getJobVertexId()));

        // Listener first, so that creating the state below is observed.
        TestRegistryListener listener = new TestRegistryListener();
        registry.registerListener(listener);

        ValueStateDescriptor<byte[]> descriptor =
                new ValueStateDescriptor<byte[]>("any", BytePrimitiveArraySerializer.INSTANCE);
        descriptor.setQueryable("vanilla");

        ValueState<byte[]> state = keyedBackend.getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        // Payload of twice the high water mark, filled with a wrapping
        // 0, 1, 2, ... byte pattern.
        byte[] largeValue = new byte[2 * channel.config().getWriteBufferHighWaterMark()];
        for (int i = 0; i < largeValue.length; i++) {
            largeValue[i] = (byte) i;
        }

        keyedBackend.setCurrentKey(key);
        state.update(largeValue);

        byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
                key, IntSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

        // assertEquals is null-safe and reports the actual name on failure.
        Assert.assertEquals("vanilla", listener.registrationName);

        ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
                channel.alloc(), 2147666475L, listener.kvStateId, serializedKeyAndNamespace);
        channel.writeInbound(request);

        Object response = readInboundBlocking(channel);
        Assert.assertTrue("Not ChunkedByteBuf", response instanceof ChunkedByteBuf);
    }

    /**
     * Polls the channel's outbound queue until a message is available.
     *
     * <p>Despite its name, this method reads what the handler has written
     * (via {@code readOutbound()}). It polls every 50 ms and gives up once
     * {@code READ_TIMEOUT_MILLIS} worth of sleeps have elapsed.
     *
     * @param embeddedChannel channel to read the next outbound message from
     * @return the first non-null outbound message
     * @throws InterruptedException if interrupted while sleeping between polls
     * @throws TimeoutException if no message showed up within the timeout
     */
    private Object readInboundBlocking(EmbeddedChannel embeddedChannel) throws InterruptedException, TimeoutException {
        int sleptMillis = 0;
        while (sleptMillis < READ_TIMEOUT_MILLIS) {
            Object message = embeddedChannel.readOutbound();
            if (message != null) {
                return message;
            }
            Thread.sleep(50L);
            sleptMillis += 50;
        }
        throw new TimeoutException();
    }

    /**
     * Creates the frame decoder placed in front of the handler under test:
     * frames carry a 4-byte length field at offset 0, which is stripped
     * before the frame is passed on.
     *
     * @return a new length-field based frame decoder
     */
    private ChannelHandler getFrameDecoder() {
        final int maxFrameLength = Integer.MAX_VALUE;
        final int lengthFieldOffset = 0;
        final int lengthFieldLength = 4;
        final int lengthAdjustment = 0;
        final int initialBytesToStrip = 4;
        return new LengthFieldBasedFrameDecoder(
                maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
}
