/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.UnknownJobManager;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class AkkaKvStateLocationLookupServiceTest
extends TestLogger {
    private static final FiniteDuration TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    private static ActorSystem testActorSystem;

    @BeforeClass
    public static void setUp() throws Exception {
        testActorSystem = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (testActorSystem != null) {
            testActorSystem.shutdown();
        }
    }

    @Test
    public void testNoJobManagerRegistered() throws Exception {
        TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        LinkedBlockingQueue received = new LinkedBlockingQueue();
        AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService((LeaderRetrievalService)leaderRetrievalService, testActorSystem, TIMEOUT, (AkkaKvStateLocationLookupService.LookupRetryStrategyFactory)new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
        lookupService.start();
        try {
            JobID jobId = new JobID();
            String name = "coffee";
            Future locationFuture = lookupService.getKvStateLookupInfo(jobId, name);
            Await.result((Awaitable)locationFuture, (Duration)TIMEOUT);
            Assert.fail((String)"Did not throw expected Exception");
        }
        catch (UnknownJobManager jobId) {
            // empty catch block
        }
        Assert.assertEquals((String)"Received unexpected lookup", (long)0L, (long)received.size());
        UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID;
        KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
        ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, new Object[]{expected});
        String testActorAddress = AkkaUtils.getAkkaURL((ActorSystem)testActorSystem, (ActorRef)testActor);
        leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId);
        JobID jobId = new JobID();
        String name = "tea";
        KvStateLocation location = (KvStateLocation)Await.result((Awaitable)lookupService.getKvStateLookupInfo(jobId, name), (Duration)TIMEOUT);
        Assert.assertEquals((Object)expected, (Object)location);
        Assert.assertEquals((long)1L, (long)received.size());
        AkkaKvStateLocationLookupServiceTest.verifyLookupMsg((KvStateMessage.LookupKvStateLocation)received.poll(), jobId, name);
        leaderRetrievalService.notifyListener(null, null);
        try {
            Future locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "coffee");
            Await.result((Awaitable)locationFuture, (Duration)TIMEOUT);
            Assert.fail((String)"Did not throw expected Exception");
        }
        catch (UnknownJobManager unknownJobManager) {
            // empty catch block
        }
        Assert.assertEquals((long)0L, (long)received.size());
    }

    @Test
    public void testLeaderSessionIdChange() throws Exception {
        TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
        LinkedBlockingQueue received = new LinkedBlockingQueue();
        AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService((LeaderRetrievalService)leaderRetrievalService, testActorSystem, TIMEOUT, (AkkaKvStateLocationLookupService.LookupRetryStrategyFactory)new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
        lookupService.start();
        KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
        UUID leaderSessionId1 = UUID.randomUUID();
        ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, new Object[]{expected1});
        String testActorAddress1 = AkkaUtils.getAkkaURL((ActorSystem)testActorSystem, (ActorRef)testActor1);
        KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
        UUID leaderSessionId2 = UUID.randomUUID();
        ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId1, new Object[]{expected2});
        String testActorAddress2 = AkkaUtils.getAkkaURL((ActorSystem)testActorSystem, (ActorRef)testActor2);
        JobID jobId = new JobID();
        leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1);
        KvStateLocation location = (KvStateLocation)Await.result((Awaitable)lookupService.getKvStateLookupInfo(jobId, "rock"), (Duration)TIMEOUT);
        Assert.assertEquals((Object)expected1, (Object)location);
        Assert.assertEquals((long)1L, (long)received.size());
        AkkaKvStateLocationLookupServiceTest.verifyLookupMsg((KvStateMessage.LookupKvStateLocation)received.poll(), jobId, "rock");
        leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2);
        location = (KvStateLocation)Await.result((Awaitable)lookupService.getKvStateLookupInfo(jobId, "roll"), (Duration)TIMEOUT);
        Assert.assertEquals((Object)expected2, (Object)location);
        Assert.assertEquals((long)1L, (long)received.size());
        AkkaKvStateLocationLookupServiceTest.verifyLookupMsg((KvStateMessage.LookupKvStateLocation)received.poll(), jobId, "roll");
    }

    @Test
    public void testRetryOnUnknownJobManager() throws Exception {
        final LinkedBlockingQueue<Object> retryStrategies = new LinkedBlockingQueue<Object>();
        AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategy = new AkkaKvStateLocationLookupService.LookupRetryStrategyFactory(){

            public AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() {
                return (AkkaKvStateLocationLookupService.LookupRetryStrategy)retryStrategies.poll();
            }
        };
        final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService((LeaderRetrievalService)leaderRetrievalService, testActorSystem, TIMEOUT, retryStrategy);
        lookupService.start();
        final AtomicBoolean hasRetried = new AtomicBoolean();
        retryStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy(){

            public FiniteDuration getRetryDelay() {
                return FiniteDuration.Zero();
            }

            public boolean tryRetry() {
                return hasRetried.compareAndSet(false, true);
            }
        });
        Future locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "yessir");
        Await.ready((Awaitable)locationFuture, (Duration)TIMEOUT);
        Assert.assertTrue((String)"Did not retry ", (boolean)hasRetried.get());
        LinkedBlockingQueue received = new LinkedBlockingQueue();
        KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
        ActorRef testActor = LookupResponseActor.create(received, null, new Object[]{expected});
        final String testActorAddress = AkkaUtils.getAkkaURL((ActorSystem)testActorSystem, (ActorRef)testActor);
        retryStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy(){

            public FiniteDuration getRetryDelay() {
                return FiniteDuration.apply((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
            }

            public boolean tryRetry() {
                leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
                return true;
            }
        });
        KvStateLocation location = (KvStateLocation)Await.result((Awaitable)lookupService.getKvStateLookupInfo(new JobID(), "yessir"), (Duration)TIMEOUT);
        Assert.assertEquals((Object)expected, (Object)location);
    }

    @Test
    public void testUnexpectedResponseType() throws Exception {
        TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
        LinkedBlockingQueue received = new LinkedBlockingQueue();
        AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService((LeaderRetrievalService)leaderRetrievalService, testActorSystem, TIMEOUT, (AkkaKvStateLocationLookupService.LookupRetryStrategyFactory)new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
        lookupService.start();
        String expected = "unexpected-response-type";
        ActorRef testActor = LookupResponseActor.create(received, null, new Object[]{expected});
        String testActorAddress = AkkaUtils.getAkkaURL((ActorSystem)testActorSystem, (ActorRef)testActor);
        leaderRetrievalService.notifyListener(testActorAddress, null);
        try {
            Await.result((Awaitable)lookupService.getKvStateLookupInfo(new JobID(), "spicy"), (Duration)TIMEOUT);
            Assert.fail((String)"Did not throw expected Exception");
        }
        catch (Throwable throwable) {
            // empty catch block
        }
    }

    private static void verifyLookupMsg(KvStateMessage.LookupKvStateLocation lookUpMsg, JobID expectedJobId, String expectedName) {
        Assert.assertNotNull((Object)lookUpMsg);
        Assert.assertEquals((Object)expectedJobId, (Object)lookUpMsg.getJobId());
        Assert.assertEquals((Object)expectedName, (Object)lookUpMsg.getRegistrationName());
    }

    private static final class LookupResponseActor
    extends FlinkUntypedActor {
        private final Queue<KvStateMessage.LookupKvStateLocation> receivedLookups;
        private final Queue<Object> lookupResponses;
        private UUID leaderSessionId;

        public LookupResponseActor(Queue<KvStateMessage.LookupKvStateLocation> receivedLookups, UUID leaderSessionId, Object ... lookupResponses) {
            this.receivedLookups = (Queue)Preconditions.checkNotNull(receivedLookups, (String)"Received lookups");
            this.leaderSessionId = leaderSessionId;
            this.lookupResponses = new ArrayDeque<Object>();
            if (lookupResponses != null) {
                for (Object resp : lookupResponses) {
                    this.lookupResponses.add(resp);
                }
            }
        }

        public void handleMessage(Object message) throws Exception {
            if (message instanceof KvStateMessage.LookupKvStateLocation) {
                this.receivedLookups.add((KvStateMessage.LookupKvStateLocation)message);
                Object msg = this.lookupResponses.poll();
                if (msg != null) {
                    if (msg instanceof Throwable) {
                        this.sender().tell((Object)new Status.Failure((Throwable)msg), this.self());
                    } else {
                        this.sender().tell((Object)new Status.Success(msg), this.self());
                    }
                }
            } else if (message instanceof UUID) {
                this.leaderSessionId = (UUID)message;
            } else {
                this.LOG.debug("Received unhandled message: {}", message);
            }
        }

        protected UUID getLeaderSessionID() {
            return this.leaderSessionId;
        }

        private static ActorRef create(Queue<KvStateMessage.LookupKvStateLocation> receivedLookups, UUID leaderSessionId, Object ... lookupResponses) {
            return testActorSystem.actorOf(Props.create(LookupResponseActor.class, (Object[])new Object[]{receivedLookups, leaderSessionId, lookupResponses}));
        }
    }
}

