/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateClient;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.message.KvStateRequest;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.NetUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class KvStateClientTest {
    private static final Logger LOG = LoggerFactory.getLogger(KvStateClientTest.class);
    private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
    private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100L, TimeUnit.SECONDS);

    @AfterClass
    public static void tearDown() throws Exception {
        if (NIO_GROUP != null) {
            NIO_GROUP.shutdownGracefully();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSimpleRequests() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateClient client = null;
        Channel serverChannel = null;
        try {
            int i;
            client = new KvStateClient(1, (KvStateRequestStats)stats);
            byte[] expected = new byte[1024];
            ThreadLocalRandom.current().nextBytes(expected);
            final LinkedBlockingQueue received = new LinkedBlockingQueue();
            final AtomicReference channel = new AtomicReference();
            serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    channel.set(ctx.channel());
                }

                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    received.add((ByteBuf)msg);
                }
            }});
            KvStateServerAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            ArrayList<scala.concurrent.Future> futures = new ArrayList<scala.concurrent.Future>();
            int numQueries = 1024;
            for (int i2 = 0; i2 < numQueries; ++i2) {
                futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
            }
            RuntimeException testException = new RuntimeException("Expected test Exception");
            for (i = 0; i < numQueries; ++i) {
                ByteBuf response;
                ByteBuf buf = (ByteBuf)received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                Assert.assertNotNull((String)"Receive timed out", (Object)buf);
                Channel ch = (Channel)channel.get();
                Assert.assertNotNull((String)"Channel not active", (Object)ch);
                Assert.assertEquals((Object)KvStateRequestType.REQUEST, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
                KvStateRequest request = KvStateRequestSerializer.deserializeKvStateRequest((ByteBuf)buf);
                buf.release();
                if (i % 2 == 0) {
                    response = KvStateRequestSerializer.serializeKvStateRequestResult((ByteBufAllocator)serverChannel.alloc(), (long)request.getRequestId(), (byte[])expected);
                    ch.writeAndFlush((Object)response);
                    continue;
                }
                response = KvStateRequestSerializer.serializeKvStateRequestFailure((ByteBufAllocator)serverChannel.alloc(), (long)request.getRequestId(), (Throwable)testException);
                ch.writeAndFlush((Object)response);
            }
            for (i = 0; i < numQueries; ++i) {
                if (i % 2 == 0) {
                    byte[] serializedResult = (byte[])Await.result((Awaitable)((Awaitable)futures.get(i)), (Duration)deadline.timeLeft());
                    Assert.assertArrayEquals((byte[])expected, (byte[])serializedResult);
                    continue;
                }
                try {
                    Await.result((Awaitable)((Awaitable)futures.get(i)), (Duration)deadline.timeLeft());
                    Assert.fail((String)"Did not throw expected Exception");
                    continue;
                }
                catch (RuntimeException runtimeException) {
                    // empty catch block
                }
            }
            Assert.assertEquals((long)numQueries, (long)stats.getNumRequests());
            int expectedRequests = numQueries / 2;
            while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != (long)expectedRequests || stats.getNumFailed() != (long)expectedRequests)) {
                Thread.sleep(100L);
            }
            Assert.assertEquals((long)expectedRequests, (long)stats.getNumSuccessful());
            Assert.assertEquals((long)expectedRequests, (long)stats.getNumFailed());
        }
        finally {
            if (client != null) {
                client.shutDown();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRequestUnavailableHost() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateClient client = null;
        try {
            client = new KvStateClient(1, (KvStateRequestStats)stats);
            int availablePort = NetUtils.getAvailablePort();
            KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), availablePort);
            scala.concurrent.Future future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
            try {
                Await.result((Awaitable)future, (Duration)deadline.timeLeft());
                Assert.fail((String)"Did not throw expected ConnectException");
            }
            catch (ConnectException connectException) {
                // empty catch block
            }
        }
        finally {
            if (client != null) {
                client.shutDown();
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentQueries() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        ExecutorService executor = null;
        KvStateClient client = null;
        Channel serverChannel = null;
        final byte[] serializedResult = new byte[1024];
        ThreadLocalRandom.current().nextBytes(serializedResult);
        try {
            int numQueryTasks = 4;
            int numQueriesPerTask = 1024;
            executor = Executors.newFixedThreadPool(numQueryTasks);
            client = new KvStateClient(1, (KvStateRequestStats)stats);
            serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf)msg;
                    Assert.assertEquals((Object)KvStateRequestType.REQUEST, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
                    KvStateRequest request = KvStateRequestSerializer.deserializeKvStateRequest((ByteBuf)buf);
                    buf.release();
                    ByteBuf response = KvStateRequestSerializer.serializeKvStateRequestResult((ByteBufAllocator)ctx.alloc(), (long)request.getRequestId(), (byte[])serializedResult);
                    ctx.channel().writeAndFlush((Object)response);
                }
            }});
            final KvStateServerAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            final KvStateClient finalClient = client;
            Callable<List<scala.concurrent.Future<byte[]>>> queryTask = new Callable<List<scala.concurrent.Future<byte[]>>>(){

                @Override
                public List<scala.concurrent.Future<byte[]>> call() throws Exception {
                    ArrayList<scala.concurrent.Future<byte[]>> results = new ArrayList<scala.concurrent.Future<byte[]>>(1024);
                    for (int i = 0; i < 1024; ++i) {
                        results.add((scala.concurrent.Future<byte[]>)finalClient.getKvState(serverAddress, new KvStateID(), new byte[0]));
                    }
                    return results;
                }
            };
            ArrayList<Future<List<scala.concurrent.Future<byte[]>>>> futures = new ArrayList<Future<List<scala.concurrent.Future<byte[]>>>>();
            for (int i = 0; i < numQueryTasks; ++i) {
                futures.add(executor.submit(queryTask));
            }
            for (Future future : futures) {
                List results = (List)future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                for (scala.concurrent.Future result : results) {
                    byte[] actual = (byte[])Await.result((Awaitable)result, (Duration)deadline.timeLeft());
                    Assert.assertArrayEquals((byte[])serializedResult, (byte[])actual);
                }
            }
            int totalQueries = numQueryTasks * 1024;
            while (deadline.hasTimeLeft() && stats.getNumSuccessful() != (long)totalQueries) {
                Thread.sleep(100L);
            }
            Assert.assertEquals((long)totalQueries, (long)stats.getNumRequests());
            Assert.assertEquals((long)totalQueries, (long)stats.getNumSuccessful());
        }
        finally {
            if (executor != null) {
                executor.shutdown();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            if (client != null) {
                client.shutDown();
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFailureClosesChannel() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateClient client = null;
        Channel serverChannel = null;
        try {
            client = new KvStateClient(1, (KvStateRequestStats)stats);
            final LinkedBlockingQueue received = new LinkedBlockingQueue();
            final AtomicReference channel = new AtomicReference();
            serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    channel.set(ctx.channel());
                }

                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    received.add((ByteBuf)msg);
                }
            }});
            KvStateServerAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            ArrayList<scala.concurrent.Future> futures = new ArrayList<scala.concurrent.Future>();
            futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
            futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
            ByteBuf buf = (ByteBuf)received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            Assert.assertNotNull((String)"Receive timed out", (Object)buf);
            buf.release();
            buf = (ByteBuf)received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            Assert.assertNotNull((String)"Receive timed out", (Object)buf);
            buf.release();
            Assert.assertEquals((long)1L, (long)stats.getNumConnections());
            Channel ch = (Channel)channel.get();
            Assert.assertNotNull((String)"Channel not active", (Object)ch);
            ch.writeAndFlush((Object)KvStateRequestSerializer.serializeServerFailure((ByteBufAllocator)serverChannel.alloc(), (Throwable)new RuntimeException("Expected test server failure")));
            try {
                Await.result((Awaitable)((Awaitable)futures.remove(0)), (Duration)deadline.timeLeft());
                Assert.fail((String)"Did not throw expected server failure");
            }
            catch (RuntimeException runtimeException) {
                // empty catch block
            }
            try {
                Await.result((Awaitable)((Awaitable)futures.remove(0)), (Duration)deadline.timeLeft());
                Assert.fail((String)"Did not throw expected server failure");
            }
            catch (RuntimeException runtimeException) {
                // empty catch block
            }
            Assert.assertEquals((long)0L, (long)stats.getNumConnections());
            while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L)) {
                Thread.sleep(100L);
            }
            Assert.assertEquals((long)2L, (long)stats.getNumRequests());
            Assert.assertEquals((long)0L, (long)stats.getNumSuccessful());
            Assert.assertEquals((long)2L, (long)stats.getNumFailed());
        }
        finally {
            if (client != null) {
                client.shutDown();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testServerClosesChannel() throws Exception {
        Deadline deadline = TEST_TIMEOUT.fromNow();
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        KvStateClient client = null;
        Channel serverChannel = null;
        try {
            client = new KvStateClient(1, (KvStateRequestStats)stats);
            final AtomicBoolean received = new AtomicBoolean();
            final AtomicReference channel = new AtomicReference();
            serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    channel.set(ctx.channel());
                }

                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    received.set(true);
                }
            }});
            KvStateServerAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            scala.concurrent.Future future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
            while (!received.get() && deadline.hasTimeLeft()) {
                Thread.sleep(50L);
            }
            Assert.assertTrue((String)"Receive timed out", (boolean)received.get());
            Assert.assertEquals((long)1L, (long)stats.getNumConnections());
            ((Channel)channel.get()).close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            try {
                Await.result((Awaitable)future, (Duration)deadline.timeLeft());
                Assert.fail((String)"Did not throw expected server failure");
            }
            catch (ClosedChannelException closedChannelException) {
                // empty catch block
            }
            Assert.assertEquals((long)0L, (long)stats.getNumConnections());
            while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) {
                Thread.sleep(100L);
            }
            Assert.assertEquals((long)1L, (long)stats.getNumRequests());
            Assert.assertEquals((long)0L, (long)stats.getNumSuccessful());
            Assert.assertEquals((long)1L, (long)stats.getNumFailed());
        }
        finally {
            if (client != null) {
                client.shutDown();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            Assert.assertEquals((String)"Channel leak", (long)0L, (long)stats.getNumConnections());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testClientServerIntegration() throws Exception {
        int numServers = 2;
        int numServerEventLoopThreads = 2;
        int numServerQueryThreads = 2;
        int numClientEventLoopThreads = 4;
        int numClientsTasks = 8;
        int batchSize = 16;
        boolean numKeyGroups = true;
        MemoryStateBackend abstractBackend = new MemoryStateBackend();
        KvStateRegistry dummyRegistry = new KvStateRegistry();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(dummyRegistry);
        AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((Environment)dummyEnv, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
        final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
        AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
        KvStateClient client = null;
        ExecutorService clientTaskExecutor = null;
        final KvStateServer[] server = new KvStateServer[2];
        try {
            long numRequests;
            client = new KvStateClient(4, (KvStateRequestStats)clientStats);
            clientTaskExecutor = Executors.newFixedThreadPool(8);
            ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
            desc.setQueryable("any");
            KvStateRegistry[] registry = new KvStateRegistry[2];
            AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[2];
            final KvStateID[] ids = new KvStateID[2];
            for (int i = 0; i < 2; ++i) {
                registry[i] = new KvStateRegistry();
                serverStats[i] = new AtomicKvStateRequestStats();
                server[i] = new KvStateServer(InetAddress.getLocalHost(), 0, 2, 2, registry[i], (KvStateRequestStats)serverStats[i]);
                server[i].start();
                backend.setCurrentKey((Object)(1010 + i));
                ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
                state.update((Object)(201 + i));
                InternalKvState kvState = (InternalKvState)state;
                ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
            }
            final KvStateClient finalClient = client;
            Callable<Void> queryTask = new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    block0: while (true) {
                        int targetServer;
                        int j;
                        if (Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                        ArrayList<Integer> random = new ArrayList<Integer>();
                        for (int j2 = 0; j2 < 16; ++j2) {
                            random.add(j2);
                        }
                        Collections.shuffle(random);
                        ArrayList<scala.concurrent.Future> futures = new ArrayList<scala.concurrent.Future>(16);
                        for (j = 0; j < 16; ++j) {
                            targetServer = (Integer)random.get(j) % 2;
                            byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace((Object)(1010 + targetServer), (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
                            futures.add(finalClient.getKvState(server[targetServer].getAddress(), ids[targetServer], serializedKeyAndNamespace));
                        }
                        j = 0;
                        while (true) {
                            if (j >= 16) continue block0;
                            targetServer = (Integer)random.get(j) % 2;
                            scala.concurrent.Future future = (scala.concurrent.Future)futures.get(j);
                            byte[] buf = (byte[])Await.result((Awaitable)future, (Duration)timeout);
                            int value = (Integer)KvStateRequestSerializer.deserializeValue((byte[])buf, (TypeSerializer)IntSerializer.INSTANCE);
                            Assert.assertEquals((long)(201 + targetServer), (long)value);
                            ++j;
                        }
                        break;
                    }
                }
            };
            ArrayList<Future<Void>> taskFutures = new ArrayList<Future<Void>>();
            for (int i = 0; i < 8; ++i) {
                taskFutures.add(clientTaskExecutor.submit(queryTask));
            }
            while ((numRequests = clientStats.getNumRequests()) < 100000L) {
                Thread.sleep(100L);
                LOG.info("Number of requests {}/100_000", (Object)numRequests);
            }
            client.shutDown();
            for (Future future : taskFutures) {
                try {
                    future.get();
                    Assert.fail((String)"Did not throw expected Exception after shut down");
                }
                catch (ExecutionException t) {
                    if (t.getCause() instanceof ClosedChannelException || t.getCause() instanceof IllegalStateException) continue;
                    t.printStackTrace();
                    Assert.fail((String)("Failed with unexpected Exception type: " + t.getClass().getName()));
                }
            }
            Assert.assertEquals((String)"Connection leak (client)", (long)0L, (long)clientStats.getNumConnections());
            for (int i = 0; i < 2; ++i) {
                boolean bl = false;
                int numRetries = 0;
                while (!bl) {
                    try {
                        Assert.assertEquals((String)"Connection leak (server)", (long)0L, (long)serverStats[i].getNumConnections());
                        bl = true;
                    }
                    catch (Throwable t) {
                        if (numRetries < 10) {
                            LOG.info("Retrying connection leak check (server)");
                            Thread.sleep((numRetries + 1) * 50);
                            ++numRetries;
                            continue;
                        }
                        throw t;
                    }
                }
            }
        }
        finally {
            if (client != null) {
                client.shutDown();
            }
            for (int i = 0; i < 2; ++i) {
                if (server[i] == null) continue;
                server[i].shutDown();
            }
            if (clientTaskExecutor != null) {
                clientTaskExecutor.shutdown();
            }
        }
    }

    private Channel createServerChannel(final ChannelHandler ... handlers) throws UnknownHostException, InterruptedException {
        ServerBootstrap bootstrap = ((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().localAddress(InetAddress.getLocalHost(), 0)).group((EventLoopGroup)NIO_GROUP).channel(NioServerSocketChannel.class)).childHandler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(handlers);
            }
        });
        return bootstrap.bind().sync().channel();
    }

    private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) {
        InetSocketAddress localAddress = (InetSocketAddress)serverChannel.localAddress();
        return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
    }
}

