/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query;

import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import java.net.ConnectException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationLookupService;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.QueryableStateClient;
import org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateClient;
import org.apache.flink.runtime.query.netty.KvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapValueState;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.MathUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class QueryableStateClientTest {
    private static final ActorSystem testActorSystem = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
    private static final FiniteDuration timeout = new FiniteDuration(100L, TimeUnit.SECONDS);

    @AfterClass
    public static void tearDown() throws Exception {
        if (testActorSystem != null) {
            testActorSystem.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testForceLookupOnOutdatedLocation() throws Exception {
        KvStateLocationLookupService lookupService = (KvStateLocationLookupService)Mockito.mock(KvStateLocationLookupService.class);
        KvStateClient networkClient = (KvStateClient)Mockito.mock(KvStateClient.class);
        QueryableStateClient client = new QueryableStateClient(lookupService, networkClient, (ExecutionContext)testActorSystem.dispatcher());
        try {
            JobID jobId = new JobID();
            int numKeyGroups = 4;
            String query1 = "lucky";
            Future unknownKvStateLocation = Futures.failed((Throwable)new UnknownKvStateLocation(query1));
            Mockito.when((Object)lookupService.getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query1))).thenReturn((Object)unknownKvStateLocation);
            Future result = client.getKvState(jobId, query1, 0, new byte[0]);
            try {
                Await.result((Awaitable)result, (Duration)timeout);
                Assert.fail((String)"Did not throw expected UnknownKvStateLocation exception");
            }
            catch (UnknownKvStateLocation unknownKvStateLocation2) {
                // empty catch block
            }
            ((KvStateLocationLookupService)Mockito.verify((Object)lookupService, (VerificationMode)Mockito.times((int)2))).getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query1));
            String query2 = "unlucky";
            Future unknownKeyGroupLocation = Futures.successful((Object)new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query2));
            Mockito.when((Object)lookupService.getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query2))).thenReturn((Object)unknownKeyGroupLocation);
            result = client.getKvState(jobId, query2, 0, new byte[0]);
            try {
                Await.result((Awaitable)result, (Duration)timeout);
                Assert.fail((String)"Did not throw expected UnknownKvStateKeyGroupLocation exception");
            }
            catch (UnknownKvStateKeyGroupLocation unknownKvStateKeyGroupLocation) {
                // empty catch block
            }
            ((KvStateLocationLookupService)Mockito.verify((Object)lookupService, (VerificationMode)Mockito.times((int)2))).getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query2));
            String query3 = "water";
            KvStateID kvStateId = new KvStateID();
            Future unknownKvStateId = Futures.failed((Throwable)new UnknownKvStateID(kvStateId));
            KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
            KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3);
            for (int i = 0; i < numKeyGroups; ++i) {
                location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
            }
            Mockito.when((Object)lookupService.getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query3))).thenReturn((Object)Futures.successful((Object)location));
            Mockito.when((Object)networkClient.getKvState((KvStateServerAddress)Matchers.eq((Object)serverAddress), (KvStateID)Matchers.eq((Object)kvStateId), (byte[])Matchers.any(byte[].class))).thenReturn((Object)unknownKvStateId);
            result = client.getKvState(jobId, query3, 0, new byte[0]);
            try {
                Await.result((Awaitable)result, (Duration)timeout);
                Assert.fail((String)"Did not throw expected UnknownKvStateID exception");
            }
            catch (UnknownKvStateID i) {
                // empty catch block
            }
            ((KvStateLocationLookupService)Mockito.verify((Object)lookupService, (VerificationMode)Mockito.times((int)2))).getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query3));
            String query4 = "space";
            Future connectException = Futures.failed((Throwable)new ConnectException());
            kvStateId = new KvStateID();
            serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
            location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4);
            for (int i = 0; i < numKeyGroups; ++i) {
                location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
            }
            Mockito.when((Object)lookupService.getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query4))).thenReturn((Object)Futures.successful((Object)location));
            Mockito.when((Object)networkClient.getKvState((KvStateServerAddress)Matchers.eq((Object)serverAddress), (KvStateID)Matchers.eq((Object)kvStateId), (byte[])Matchers.any(byte[].class))).thenReturn((Object)connectException);
            result = client.getKvState(jobId, query4, 0, new byte[0]);
            try {
                Await.result((Awaitable)result, (Duration)timeout);
                Assert.fail((String)"Did not throw expected ConnectException exception");
            }
            catch (ConnectException i) {
                // empty catch block
            }
            ((KvStateLocationLookupService)Mockito.verify((Object)lookupService, (VerificationMode)Mockito.times((int)2))).getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query4));
            String query5 = "universe";
            Future exception = Futures.failed((Throwable)new RuntimeException("Test exception"));
            Mockito.when((Object)lookupService.getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query5))).thenReturn((Object)exception);
            client.getKvState(jobId, query5, 0, new byte[0]);
            ((KvStateLocationLookupService)Mockito.verify((Object)lookupService, (VerificationMode)Mockito.times((int)1))).getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)query5));
        }
        finally {
            client.shutDown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testIntegrationWithKvStateServer() throws Exception {
        int numServers = 2;
        int numKeys = 1024;
        int numKeyGroups = 1;
        JobID jobId = new JobID();
        JobVertexID jobVertexId = new JobVertexID();
        KvStateServer[] servers = new KvStateServer[numServers];
        AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
        QueryableStateClient client = null;
        KvStateClient networkClient = null;
        AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats();
        MemoryStateBackend backend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        AbstractKeyedStateBackend keyedStateBackend = backend.createKeyedStateBackend((Environment)dummyEnv, new JobID(), "test_op", (TypeSerializer)IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
        try {
            HeapValueState kvState;
            KvStateRegistry[] registries = new KvStateRegistry[numServers];
            KvStateID[] kvStateIds = new KvStateID[numServers];
            ArrayList<HeapValueState> kvStates = new ArrayList<HeapValueState>();
            for (int i = 0; i < numServers; ++i) {
                registries[i] = new KvStateRegistry();
                serverStats[i] = new AtomicKvStateRequestStats();
                servers[i] = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], (KvStateRequestStats)serverStats[i]);
                servers[i].start();
                ValueStateDescriptor descriptor = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
                RegisteredKeyedBackendStateMetaInfo registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo(descriptor.getType(), descriptor.getName(), (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (TypeSerializer)IntSerializer.INSTANCE);
                kvState = new HeapValueState(descriptor, (StateTable)new NestedMapsStateTable((InternalKeyContext)keyedStateBackend, registeredKeyedBackendStateMetaInfo), (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
                kvStates.add(kvState);
                kvStateIds[i] = registries[i].registerKvState(jobId, new JobVertexID(), new KeyGroupRange(i, i), "choco", (InternalKvState)kvState);
            }
            int[] expectedRequests = new int[numServers];
            for (int key = 0; key < numKeys; ++key) {
                int targetKeyGroupIndex;
                int n = targetKeyGroupIndex = MathUtils.murmurHash((int)key) % numServers;
                expectedRequests[n] = expectedRequests[n] + 1;
                kvState = (HeapValueState)kvStates.get(targetKeyGroupIndex);
                keyedStateBackend.setCurrentKey((Object)key);
                kvState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
                kvState.update((Object)(1337 + key));
            }
            KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco");
            for (int keyGroupIndex = 0; keyGroupIndex < numServers; ++keyGroupIndex) {
                location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
            }
            KvStateLocationLookupService lookupService = (KvStateLocationLookupService)Mockito.mock(KvStateLocationLookupService.class);
            Mockito.when((Object)lookupService.getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId), (String)Matchers.eq((Object)"choco"))).thenReturn((Object)Futures.successful((Object)location));
            networkClient = new KvStateClient(1, (KvStateRequestStats)networkClientStats);
            client = new QueryableStateClient(lookupService, networkClient, (ExecutionContext)testActorSystem.dispatcher());
            ArrayList<Future> futures = new ArrayList<Future>(numKeys);
            for (int key = 0; key < numKeys; ++key) {
                byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
                futures.add(client.getKvState(jobId, "choco", key, serializedKeyAndNamespace));
            }
            Future future = Futures.sequence(futures, (ExecutionContext)testActorSystem.dispatcher());
            Iterable results = (Iterable)Await.result((Awaitable)future, (Duration)timeout);
            int index = 0;
            for (byte[] buffer : results) {
                int deserializedValue = (Integer)KvStateRequestSerializer.deserializeValue((byte[])buffer, (TypeSerializer)IntSerializer.INSTANCE);
                Assert.assertEquals((long)(1337 + index), (long)deserializedValue);
                ++index;
            }
            block10: for (int i = 0; i < numServers; ++i) {
                int numRetries = 10;
                for (int retry = 0; retry < numRetries; ++retry) {
                    try {
                        Assert.assertEquals((String)"Unexpected number of requests", (long)expectedRequests[i], (long)serverStats[i].getNumRequests());
                        Assert.assertEquals((String)"Unexpected success requests", (long)expectedRequests[i], (long)serverStats[i].getNumSuccessful());
                        Assert.assertEquals((String)"Unexpected failed requests", (long)0L, (long)serverStats[i].getNumFailed());
                        continue block10;
                    }
                    catch (Throwable t) {
                        if (retry == numRetries - 1) {
                            throw t;
                        }
                        Thread.sleep(100L);
                        continue;
                    }
                }
            }
        }
        finally {
            if (client != null) {
                client.shutDown();
            }
            if (networkClient != null) {
                networkClient.shutDown();
            }
            for (KvStateServer server : servers) {
                if (server == null) continue;
                server.shutDown();
            }
        }
    }

    @Test
    public void testLookupMultipleJobIds() throws Exception {
        String name = "unique-per-job";
        KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
        location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
        JobID jobId1 = new JobID();
        JobID jobId2 = new JobID();
        KvStateLocationLookupService lookupService = (KvStateLocationLookupService)Mockito.mock(KvStateLocationLookupService.class);
        Mockito.when((Object)lookupService.getKvStateLookupInfo((JobID)Matchers.any(JobID.class), Matchers.anyString())).thenReturn((Object)Futures.successful((Object)location));
        KvStateClient networkClient = (KvStateClient)Mockito.mock(KvStateClient.class);
        Mockito.when((Object)networkClient.getKvState((KvStateServerAddress)Matchers.any(KvStateServerAddress.class), (KvStateID)Matchers.any(KvStateID.class), (byte[])Matchers.any(byte[].class))).thenReturn((Object)Futures.successful((Object)new byte[0]));
        QueryableStateClient client = new QueryableStateClient(lookupService, networkClient, (ExecutionContext)testActorSystem.dispatcher());
        client.getKvState(jobId1, name, 0, new byte[0]);
        client.getKvState(jobId2, name, 0, new byte[0]);
        ((KvStateLocationLookupService)Mockito.verify((Object)lookupService, (VerificationMode)Mockito.times((int)1))).getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId1), (String)Matchers.eq((Object)name));
        ((KvStateLocationLookupService)Mockito.verify((Object)lookupService, (VerificationMode)Mockito.times((int)1))).getKvStateLookupInfo((JobID)Matchers.eq((Object)jobId2), (String)Matchers.eq((Object)name));
    }
}

