/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.net.InetAddress;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.KvStateServerHandlerTest;
import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

public class KvStateServerTest {
    private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
    private static final int TIMEOUT_MILLIS = 10000;

    @AfterClass
    public static void tearDown() throws Exception {
        if (NIO_GROUP != null) {
            NIO_GROUP.shutdownGracefully();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSimpleRequest() throws Exception {
        KvStateServer server = null;
        Bootstrap bootstrap = null;
        try {
            KvStateRegistry registry = new KvStateRegistry();
            AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
            server = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registry, (KvStateRequestStats)stats);
            server.start();
            KvStateServerAddress serverAddress = server.getAddress();
            int numKeyGroups = 1;
            MemoryStateBackend abstractBackend = new MemoryStateBackend();
            DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
            dummyEnv.setKvStateRegistry(registry);
            AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((Environment)dummyEnv, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), registry.createTaskRegistry(new JobID(), new JobVertexID()));
            KvStateServerHandlerTest.TestRegistryListener registryListener = new KvStateServerHandlerTest.TestRegistryListener();
            registry.registerListener((KvStateRegistryListener)registryListener);
            ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
            desc.setQueryable("vanilla");
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            int expectedValue = 712828289;
            int key = 99812822;
            backend.setCurrentKey((Object)key);
            state.update((Object)expectedValue);
            byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
            final LinkedBlockingQueue responses = new LinkedBlockingQueue();
            bootstrap = this.createBootstrap(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), new ChannelInboundHandlerAdapter(){

                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    responses.add((ByteBuf)msg);
                }
            }});
            Channel channel = bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()).sync().channel();
            long requestId = 2147666475L;
            Assert.assertTrue((boolean)registryListener.registrationName.equals("vanilla"));
            ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (KvStateID)registryListener.kvStateId, (byte[])serializedKeyAndNamespace);
            channel.writeAndFlush((Object)request);
            ByteBuf buf = (ByteBuf)responses.poll(10000L, TimeUnit.MILLISECONDS);
            Assert.assertEquals((Object)KvStateRequestType.REQUEST_RESULT, (Object)KvStateRequestSerializer.deserializeHeader((ByteBuf)buf));
            KvStateRequestResult response = KvStateRequestSerializer.deserializeKvStateRequestResult((ByteBuf)buf);
            Assert.assertEquals((long)requestId, (long)response.getRequestId());
            int actualValue = (Integer)KvStateRequestSerializer.deserializeValue((byte[])response.getSerializedResult(), (TypeSerializer)IntSerializer.INSTANCE);
            Assert.assertEquals((long)expectedValue, (long)actualValue);
        }
        finally {
            EventLoopGroup group;
            if (server != null) {
                server.shutDown();
            }
            if (bootstrap != null && (group = bootstrap.group()) != null) {
                group.shutdownGracefully();
            }
        }
    }

    private Bootstrap createBootstrap(final ChannelHandler ... handlers) {
        return (Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group((EventLoopGroup)NIO_GROUP)).channel(NioSocketChannel.class)).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(handlers);
            }
        });
    }
}

