/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Status;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import com.typesafe.config.Config;
import java.io.File;
import java.net.InetAddress;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

public class JobManagerTest
extends TestLogger {
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    private static ActorSystem system;
    private HighAvailabilityServices highAvailabilityServices;

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem((ActorSystem)system);
    }

    @Before
    public void setupTest() {
        this.highAvailabilityServices = new EmbeddedHaServices((Executor)TestingUtils.defaultExecutor());
    }

    @After
    public void tearDownTest() throws Exception {
        this.highAvailabilityServices.closeAndCleanupAllData();
        this.highAvailabilityServices = null;
    }

    @Test
    public void testNullHostnameGoesToLocalhost() {
        try {
            Tuple2 address = new Tuple2(null, (Object)1772);
            Config cfg = AkkaUtils.getAkkaConfig((Configuration)new Configuration(), (Option)new Some((Object)address));
            String hostname = cfg.getString("akka.remote.netty.tcp.hostname");
            Assert.assertTrue((boolean)InetAddress.getByName(hostname).isLoopbackAddress());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRequestPartitionState() throws Exception {
        new JavaTestKit(system){
            {
                new JavaTestKit.Within(1.duration((String)"15 seconds")){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                            IntermediateDataSetID rid = new IntermediateDataSetID();
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(BlockingNoOpInvokable.class);
                            sender.createAndAddResultDataSet(rid, ResultPartitionType.PIPELINED);
                            JobGraph jobGraph = new JobGraph("Blocking test job", new JobVertex[]{sender});
                            JobID jid = jobGraph.getJobID();
                            ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                            jobManagerGateway.tell((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), (ActorGateway)testActorGateway);
                            this.expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManagerGateway.tell((Object)new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jid), (ActorGateway)testActorGateway);
                            this.expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
                            ExecutionAttemptID receiver = new ExecutionAttemptID();
                            jobManagerGateway.tell((Object)new TestingJobManagerMessages.RequestExecutionGraph(jid), (ActorGateway)testActorGateway);
                            ExecutionGraph eg = (ExecutionGraph)((TestingJobManagerMessages.ExecutionGraphFound)this.expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class)).executionGraph();
                            ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
                            IntermediateResultPartition partition = (IntermediateResultPartition)vertex.getProducedPartitions().values().iterator().next();
                            ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), vertex.getCurrentExecutionAttempt().getAttemptId());
                            JobManagerMessages.RequestPartitionProducerState request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);
                            for (ExecutionState state : ExecutionState.values()) {
                                ExecutionGraphTestUtils.setVertexState(vertex, state);
                                Future futurePartitionState = jobManagerGateway.ask((Object)request, this.getRemainingTime()).mapTo(ClassTag$.MODULE$.apply(ExecutionState.class));
                                ExecutionState resp = (ExecutionState)Await.result((Awaitable)futurePartitionState, (Duration)this.getRemainingTime());
                                Assert.assertEquals((Object)state, (Object)resp);
                            }
                            request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, new ResultPartitionID());
                            Future futurePartitionState = jobManagerGateway.ask((Object)request, this.getRemainingTime());
                            try {
                                Await.result((Awaitable)futurePartitionState, (Duration)this.getRemainingTime());
                                Assert.fail((String)"Did not fail with expected RuntimeException");
                            }
                            catch (RuntimeException e) {
                                Assert.assertEquals(IllegalArgumentException.class, e.getCause().getClass());
                            }
                            request = new JobManagerMessages.RequestPartitionProducerState(new JobID(), rid, new ResultPartitionID());
                            futurePartitionState = jobManagerGateway.ask((Object)request, this.getRemainingTime());
                            try {
                                Await.result((Awaitable)futurePartitionState, (Duration)this.getRemainingTime());
                                Assert.fail((String)"Did not fail with expected IllegalArgumentException");
                            }
                            catch (IllegalArgumentException illegalArgumentException) {
                                // empty catch block
                            }
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            Assert.fail((String)e.getMessage());
                        }
                        finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    @Test
    public void testRequestPartitionStateUnregisteredExecution() throws Exception {
        new JavaTestKit(system){
            {
                new JavaTestKit.Within(2.duration((String)"15 seconds")){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(4, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                            IntermediateDataSetID rid = new IntermediateDataSetID();
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(NoOpInvokable.class);
                            sender.createAndAddResultDataSet(rid, ResultPartitionType.PIPELINED);
                            JobVertex sender2 = new JobVertex("Blocking Sender");
                            sender2.setParallelism(1);
                            sender2.setInvokableClass(BlockingNoOpInvokable.class);
                            sender2.createAndAddResultDataSet(new IntermediateDataSetID(), ResultPartitionType.PIPELINED);
                            JobGraph jobGraph = new JobGraph("Fast finishing producer test job", new JobVertex[]{sender, sender2});
                            JobID jid = jobGraph.getJobID();
                            ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                            jobManagerGateway.tell((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), (ActorGateway)testActorGateway);
                            this.expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManagerGateway.tell((Object)new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jid), (ActorGateway)testActorGateway);
                            this.expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
                            Future egFuture = jobManagerGateway.ask((Object)new TestingJobManagerMessages.RequestExecutionGraph(jobGraph.getJobID()), this.remaining());
                            TestingJobManagerMessages.ExecutionGraphFound egFound = (TestingJobManagerMessages.ExecutionGraphFound)Await.result((Awaitable)egFuture, (Duration)this.remaining());
                            ExecutionGraph eg = (ExecutionGraph)egFound.executionGraph();
                            ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
                            while (vertex.getExecutionState() != ExecutionState.FINISHED) {
                                Thread.sleep(1L);
                            }
                            IntermediateResultPartition partition = (IntermediateResultPartition)vertex.getProducedPartitions().values().iterator().next();
                            ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), vertex.getCurrentExecutionAttempt().getAttemptId());
                            JobManagerMessages.RequestPartitionProducerState request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);
                            Future producerStateFuture = jobManagerGateway.ask((Object)request, this.getRemainingTime()).mapTo(ClassTag$.MODULE$.apply(ExecutionState.class));
                            Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)Await.result((Awaitable)producerStateFuture, (Duration)this.getRemainingTime()));
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            Assert.fail((String)e.getMessage());
                        }
                        finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    @Test
    public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception {
        new JavaTestKit(system){
            {
                new JavaTestKit.Within(3.duration((String)"15 seconds")){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(4, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                            IntermediateDataSetID rid = new IntermediateDataSetID();
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(NoOpInvokable.class);
                            sender.createAndAddResultDataSet(rid, ResultPartitionType.PIPELINED);
                            JobVertex sender2 = new JobVertex("Blocking Sender");
                            sender2.setParallelism(1);
                            sender2.setInvokableClass(BlockingNoOpInvokable.class);
                            sender2.createAndAddResultDataSet(new IntermediateDataSetID(), ResultPartitionType.PIPELINED);
                            JobGraph jobGraph = new JobGraph("Fast finishing producer test job", new JobVertex[]{sender, sender2});
                            JobID jid = jobGraph.getJobID();
                            ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                            jobManagerGateway.tell((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), (ActorGateway)testActorGateway);
                            this.expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManagerGateway.tell((Object)new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jid), (ActorGateway)testActorGateway);
                            this.expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
                            Future egFuture = jobManagerGateway.ask((Object)new TestingJobManagerMessages.RequestExecutionGraph(jobGraph.getJobID()), this.remaining());
                            TestingJobManagerMessages.ExecutionGraphFound egFound = (TestingJobManagerMessages.ExecutionGraphFound)Await.result((Awaitable)egFuture, (Duration)this.remaining());
                            ExecutionGraph eg = (ExecutionGraph)egFound.executionGraph();
                            ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
                            while (vertex.getExecutionState() != ExecutionState.FINISHED) {
                                Thread.sleep(1L);
                            }
                            IntermediateResultPartition partition = (IntermediateResultPartition)vertex.getProducedPartitions().values().iterator().next();
                            ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), vertex.getCurrentExecutionAttempt().getAttemptId());
                            vertex.resetForNewExecution(System.currentTimeMillis(), 1L);
                            JobManagerMessages.RequestPartitionProducerState request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);
                            Future producerStateFuture = jobManagerGateway.ask((Object)request, this.getRemainingTime());
                            try {
                                Await.result((Awaitable)producerStateFuture, (Duration)this.getRemainingTime());
                                Assert.fail((String)"Did not fail with expected Exception");
                            }
                            catch (PartitionProducerDisposedException partitionProducerDisposedException) {
                                // empty catch block
                            }
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            Assert.fail((String)e.getMessage());
                        }
                        finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    @Test
    public void testStopSignal() throws Exception {
        new JavaTestKit(system){
            {
                new JavaTestKit.Within(4.duration((String)"15 seconds")){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(2);
                            sender.setInvokableClass(StoppableInvokable.class);
                            JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new JobVertex[]{sender});
                            JobID jid = jobGraph.getJobID();
                            ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                            jobManagerGateway.tell((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), (ActorGateway)testActorGateway);
                            this.expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManagerGateway.tell((Object)new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jid), (ActorGateway)testActorGateway);
                            this.expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
                            jobManagerGateway.tell((Object)new JobManagerMessages.StopJob(jid), (ActorGateway)testActorGateway);
                            this.expectMsgClass(JobManagerMessages.StoppingSuccess.class);
                            this.expectMsgClass(JobManagerMessages.JobResultSuccess.class);
                        }
                        finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    @Test
    public void testStopSignalFail() throws Exception {
        new JavaTestKit(system){
            {
                new JavaTestKit.Within(5.duration((String)"15 seconds")){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(BlockingNoOpInvokable.class);
                            JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", new JobVertex[]{sender});
                            JobID jid = jobGraph.getJobID();
                            ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActorGateway = new AkkaActorGateway(this.getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
                            jobManagerGateway.tell((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), (ActorGateway)testActorGateway);
                            this.expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManagerGateway.tell((Object)new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jid), (ActorGateway)testActorGateway);
                            this.expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
                            jobManagerGateway.tell((Object)new JobManagerMessages.StopJob(jid), (ActorGateway)testActorGateway);
                            this.expectMsgClass(JobManagerMessages.StoppingFailure.class);
                            jobManagerGateway.tell((Object)new TestingJobManagerMessages.RequestExecutionGraph(jid), (ActorGateway)testActorGateway);
                            this.expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class);
                        }
                        finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    @Test
    public void testKvStateMessages() throws Exception {
        Deadline deadline = new FiniteDuration(100L, TimeUnit.SECONDS).fromNow();
        Configuration config = new Configuration();
        config.setString("akka.ask.timeout", "100ms");
        ActorRef jobManagerActor = (ActorRef)JobManager.startJobManagerActors((Configuration)config, (ActorSystem)system, (ScheduledExecutorService)TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), (HighAvailabilityServices)this.highAvailabilityServices, TestingJobManager.class, MemoryArchivist.class)._1();
        UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId((LeaderRetrievalService)this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), (FiniteDuration)TestingUtils.TESTING_TIMEOUT());
        AkkaActorGateway jobManager = new AkkaActorGateway(jobManagerActor, leaderId);
        Configuration tmConfig = new Configuration();
        tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
        tmConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
        ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor((Configuration)tmConfig, (ResourceID)ResourceID.generate(), (ActorSystem)system, (HighAvailabilityServices)this.highAvailabilityServices, (String)"localhost", (Option)Option.empty(), (boolean)true, TestingTaskManager.class);
        Future registrationFuture = jobManager.ask((Object)new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), deadline.timeLeft());
        Await.ready((Awaitable)registrationFuture, (Duration)deadline.timeLeft());
        KvStateMessage.LookupKvStateLocation lookupNonExistingJob = new KvStateMessage.LookupKvStateLocation(new JobID(), "any-name");
        Future lookupFuture = jobManager.ask((Object)lookupNonExistingJob, deadline.timeLeft()).mapTo(ClassTag$.MODULE$.apply(KvStateLocation.class));
        try {
            Await.result((Awaitable)lookupFuture, (Duration)deadline.timeLeft());
            Assert.fail((String)"Did not throw expected Exception");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        JobGraph jobGraph = new JobGraph("croissant");
        JobVertex jobVertex1 = new JobVertex("cappuccino");
        jobVertex1.setParallelism(4);
        jobVertex1.setMaxParallelism(16);
        jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("americano");
        jobVertex2.setParallelism(4);
        jobVertex2.setMaxParallelism(16);
        jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
        jobGraph.addVertex(jobVertex1);
        jobGraph.addVertex(jobVertex2);
        Future submitFuture = jobManager.ask((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), deadline.timeLeft()).mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobSubmitSuccess.class));
        Await.result((Awaitable)submitFuture, (Duration)deadline.timeLeft());
        KvStateMessage.LookupKvStateLocation lookupUnknownRegistrationName = new KvStateMessage.LookupKvStateLocation(jobGraph.getJobID(), "unknown");
        lookupFuture = jobManager.ask((Object)lookupUnknownRegistrationName, deadline.timeLeft()).mapTo(ClassTag$.MODULE$.apply(KvStateLocation.class));
        try {
            Await.result((Awaitable)lookupFuture, (Duration)deadline.timeLeft());
            Assert.fail((String)"Did not throw expected Exception");
        }
        catch (UnknownKvStateLocation unknownKvStateLocation) {
            // empty catch block
        }
        KvStateMessage.NotifyKvStateRegistered registerNonExistingJob = new KvStateMessage.NotifyKvStateRegistered(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any-name", new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 1233));
        jobManager.tell((Object)registerNonExistingJob);
        KvStateMessage.LookupKvStateLocation lookupAfterRegistration = new KvStateMessage.LookupKvStateLocation(registerNonExistingJob.getJobId(), registerNonExistingJob.getRegistrationName());
        lookupFuture = jobManager.ask((Object)lookupAfterRegistration, deadline.timeLeft()).mapTo(ClassTag$.MODULE$.apply(KvStateLocation.class));
        try {
            Await.result((Awaitable)lookupFuture, (Duration)deadline.timeLeft());
            Assert.fail((String)"Did not throw expected Exception");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        KvStateMessage.NotifyKvStateRegistered registerForExistingJob = new KvStateMessage.NotifyKvStateRegistered(jobGraph.getJobID(), jobVertex1.getID(), new KeyGroupRange(0, 0), "register-me", new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
        jobManager.tell((Object)registerForExistingJob);
        lookupAfterRegistration = new KvStateMessage.LookupKvStateLocation(registerForExistingJob.getJobId(), registerForExistingJob.getRegistrationName());
        lookupFuture = jobManager.ask((Object)lookupAfterRegistration, deadline.timeLeft()).mapTo(ClassTag$.MODULE$.apply(KvStateLocation.class));
        KvStateLocation location = (KvStateLocation)Await.result((Awaitable)lookupFuture, (Duration)deadline.timeLeft());
        Assert.assertNotNull((Object)location);
        Assert.assertEquals((Object)jobGraph.getJobID(), (Object)location.getJobId());
        Assert.assertEquals((Object)jobVertex1.getID(), (Object)location.getJobVertexId());
        Assert.assertEquals((long)jobVertex1.getMaxParallelism(), (long)location.getNumKeyGroups());
        Assert.assertEquals((long)1L, (long)location.getNumRegisteredKeyGroups());
        KeyGroupRange keyGroupRange = registerForExistingJob.getKeyGroupRange();
        Assert.assertEquals((long)1L, (long)keyGroupRange.getNumberOfKeyGroups());
        Assert.assertEquals((Object)registerForExistingJob.getKvStateId(), (Object)location.getKvStateID(keyGroupRange.getStartKeyGroup()));
        Assert.assertEquals((Object)registerForExistingJob.getKvStateServerAddress(), (Object)location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
        KvStateMessage.NotifyKvStateUnregistered unregister = new KvStateMessage.NotifyKvStateUnregistered(registerForExistingJob.getJobId(), registerForExistingJob.getJobVertexId(), registerForExistingJob.getKeyGroupRange(), registerForExistingJob.getRegistrationName());
        jobManager.tell((Object)unregister);
        lookupFuture = jobManager.ask((Object)lookupAfterRegistration, deadline.timeLeft()).mapTo(ClassTag$.MODULE$.apply(KvStateLocation.class));
        try {
            Await.result((Awaitable)lookupFuture, (Duration)deadline.timeLeft());
            Assert.fail((String)"Did not throw expected Exception");
        }
        catch (UnknownKvStateLocation unknownKvStateLocation) {
            // empty catch block
        }
        KvStateMessage.NotifyKvStateRegistered register = new KvStateMessage.NotifyKvStateRegistered(jobGraph.getJobID(), jobVertex1.getID(), new KeyGroupRange(0, 0), "duplicate-me", new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
        KvStateMessage.NotifyKvStateRegistered duplicate = new KvStateMessage.NotifyKvStateRegistered(jobGraph.getJobID(), jobVertex2.getID(), new KeyGroupRange(0, 0), "duplicate-me", new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
        Future failedFuture = jobManager.ask((Object)new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.FAILED), deadline.timeLeft()).mapTo(ClassTag$.MODULE$.apply(TestingJobManagerMessages.JobStatusIs.class));
        jobManager.tell((Object)register);
        jobManager.tell((Object)duplicate);
        TestingJobManagerMessages.JobStatusIs jobStatus = (TestingJobManagerMessages.JobStatusIs)Await.result((Awaitable)failedFuture, (Duration)deadline.timeLeft());
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)jobStatus.state());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCancelWithSavepoint() throws Exception {
        File defaultSavepointDir = this.tmpFolder.newFolder();
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        Configuration config = new Configuration();
        config.setString("state.savepoints.dir", defaultSavepointDir.getAbsolutePath());
        ActorSystem actorSystem = null;
        AkkaActorGateway jobManager = null;
        AkkaActorGateway archiver = null;
        AkkaActorGateway taskManager = null;
        try {
            actorSystem = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
            Tuple2 master = JobManager.startJobManagerActors((Configuration)config, (ActorSystem)actorSystem, (ScheduledExecutorService)TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), (HighAvailabilityServices)this.highAvailabilityServices, (Option)Option.apply((Object)"jm"), (Option)Option.apply((Object)"arch"), TestingJobManager.class, TestingMemoryArchivist.class);
            UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId((LeaderRetrievalService)this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), (FiniteDuration)TestingUtils.TESTING_TIMEOUT());
            jobManager = new AkkaActorGateway((ActorRef)master._1(), leaderId);
            archiver = new AkkaActorGateway((ActorRef)master._2(), leaderId);
            ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor((Configuration)config, (ResourceID)ResourceID.generate(), (ActorSystem)actorSystem, (HighAvailabilityServices)this.highAvailabilityServices, (String)"localhost", (Option)Option.apply((Object)"tm"), (boolean)true, TestingTaskManager.class);
            taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
            Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
            Await.ready((Awaitable)taskManager.ask(msg, timeout), (Duration)timeout);
            JobVertex sourceVertex = new JobVertex("Source");
            sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            sourceVertex.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceVertex});
            JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), 3600000L, 3600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true);
            jobGraph.setSnapshotSettings(snapshottingSettings);
            msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
            Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
            Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
            Future cancelled = jobManager.ask(msg, timeout);
            String savepointPath = null;
            for (int i = 0; i < 10; ++i) {
                msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null);
                JobManagerMessages.CancellationResponse cancelResp = (JobManagerMessages.CancellationResponse)Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
                if (cancelResp instanceof JobManagerMessages.CancellationFailure) {
                    JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure)cancelResp;
                    if (failure.cause().getMessage().contains(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message())) {
                        Thread.sleep(200L);
                        continue;
                    }
                    failure.cause().printStackTrace();
                    Assert.fail((String)("Failed to cancel job: " + failure.cause().getMessage()));
                    continue;
                }
                savepointPath = ((JobManagerMessages.CancellationSuccess)cancelResp).savepointPath();
                break;
            }
            Assert.assertNotEquals((String)"Savepoint not triggered", null, savepointPath);
            Await.ready((Awaitable)cancelled, (Duration)timeout);
            File savepointFile = new File(savepointPath);
            Assert.assertEquals((Object)true, (Object)savepointFile.exists());
        }
        finally {
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (actorSystem != null) {
                actorSystem.awaitTermination((Duration)TestingUtils.TESTING_TIMEOUT());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCancelWithSavepointNoDirectoriesConfigured() throws Exception {
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        Configuration config = new Configuration();
        ActorSystem actorSystem = null;
        AkkaActorGateway jobManager = null;
        AkkaActorGateway archiver = null;
        AkkaActorGateway taskManager = null;
        try {
            actorSystem = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
            Tuple2 master = JobManager.startJobManagerActors((Configuration)config, (ActorSystem)actorSystem, (ScheduledExecutorService)TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), (HighAvailabilityServices)this.highAvailabilityServices, (Option)Option.apply((Object)"jm"), (Option)Option.apply((Object)"arch"), TestingJobManager.class, TestingMemoryArchivist.class);
            UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId((LeaderRetrievalService)this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), (FiniteDuration)TestingUtils.TESTING_TIMEOUT());
            jobManager = new AkkaActorGateway((ActorRef)master._1(), leaderId);
            archiver = new AkkaActorGateway((ActorRef)master._2(), leaderId);
            ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor((Configuration)config, (ResourceID)ResourceID.generate(), (ActorSystem)actorSystem, (HighAvailabilityServices)this.highAvailabilityServices, (String)"localhost", (Option)Option.apply((Object)"tm"), (boolean)true, TestingTaskManager.class);
            taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
            Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
            Await.ready((Awaitable)taskManager.ask(msg, timeout), (Duration)timeout);
            JobVertex sourceVertex = new JobVertex("Source");
            sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            sourceVertex.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceVertex});
            JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), 3600000L, 3600000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true);
            jobGraph.setSnapshotSettings(snapshottingSettings);
            msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
            Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
            Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null);
            JobManagerMessages.CancellationResponse cancelResp = (JobManagerMessages.CancellationResponse)Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            if (cancelResp instanceof JobManagerMessages.CancellationFailure) {
                JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure)cancelResp;
                Assert.assertTrue((boolean)(failure.cause() instanceof IllegalStateException));
                Assert.assertTrue((boolean)failure.cause().getMessage().contains("savepoint directory"));
            } else {
                Assert.fail((String)("Unexpected cancellation response from JobManager: " + cancelResp));
            }
        }
        finally {
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
        File defaultSavepointDir = this.tmpFolder.newFolder();
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        Configuration config = new Configuration();
        config.setString("state.savepoints.dir", defaultSavepointDir.getAbsolutePath());
        ActorSystem actorSystem = null;
        AkkaActorGateway jobManager = null;
        AkkaActorGateway archiver = null;
        AkkaActorGateway taskManager = null;
        try {
            actorSystem = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
            Tuple2 master = JobManager.startJobManagerActors((Configuration)config, (ActorSystem)actorSystem, (ScheduledExecutorService)TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), (HighAvailabilityServices)this.highAvailabilityServices, (Option)Option.apply((Object)"jm"), (Option)Option.apply((Object)"arch"), TestingJobManager.class, TestingMemoryArchivist.class);
            UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId((LeaderRetrievalService)this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), (FiniteDuration)TestingUtils.TESTING_TIMEOUT());
            jobManager = new AkkaActorGateway((ActorRef)master._1(), leaderId);
            archiver = new AkkaActorGateway((ActorRef)master._2(), leaderId);
            ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor((Configuration)config, (ResourceID)ResourceID.generate(), (ActorSystem)actorSystem, (HighAvailabilityServices)this.highAvailabilityServices, (String)"localhost", (Option)Option.apply((Object)"tm"), (boolean)true, TestingTaskManager.class);
            taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
            Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
            Await.ready((Awaitable)taskManager.ask(msg, timeout), (Duration)timeout);
            JobVertex sourceVertex = new JobVertex("Source");
            sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            sourceVertex.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceVertex});
            JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Long.MAX_VALUE, 360000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true);
            jobGraph.setSnapshotSettings(snapshottingSettings);
            msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
            Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
            Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            File targetDirectory = this.tmpFolder.newFolder();
            msg = new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID(), Option.apply((Object)targetDirectory.getAbsolutePath()));
            Future future = jobManager.ask(msg, timeout);
            Object result = Await.result((Awaitable)future, (Duration)timeout);
            Assert.assertTrue((String)"Did not trigger savepoint", (boolean)(result instanceof JobManagerMessages.TriggerSavepointSuccess));
            Assert.assertEquals((long)1L, (long)targetDirectory.listFiles().length);
        }
        finally {
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (actorSystem != null) {
                actorSystem.awaitTermination((Duration)TestingUtils.TESTING_TIMEOUT());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSavepointRestoreSettings() throws Exception {
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        ActorSystem actorSystem = null;
        AkkaActorGateway jobManager = null;
        AkkaActorGateway archiver = null;
        AkkaActorGateway taskManager = null;
        try {
            actorSystem = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
            Tuple2 master = JobManager.startJobManagerActors((Configuration)new Configuration(), (ActorSystem)actorSystem, (ScheduledExecutorService)TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), (HighAvailabilityServices)this.highAvailabilityServices, (Option)Option.apply((Object)"jm"), (Option)Option.apply((Object)"arch"), TestingJobManager.class, TestingMemoryArchivist.class);
            UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId((LeaderRetrievalService)this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), (FiniteDuration)TestingUtils.TESTING_TIMEOUT());
            jobManager = new AkkaActorGateway((ActorRef)master._1(), leaderId);
            archiver = new AkkaActorGateway((ActorRef)master._2(), leaderId);
            Configuration tmConfig = new Configuration();
            tmConfig.setInteger("taskmanager.numberOfTaskSlots", 4);
            ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor((Configuration)tmConfig, (ResourceID)ResourceID.generate(), (ActorSystem)actorSystem, (HighAvailabilityServices)this.highAvailabilityServices, (String)"localhost", (Option)Option.apply((Object)"tm"), (boolean)true, TestingTaskManager.class);
            taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
            Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
            Await.ready((Awaitable)taskManager.ask(msg, timeout), (Duration)timeout);
            JobVertex sourceVertex = new JobVertex("Source");
            sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            sourceVertex.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceVertex});
            JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Collections.singletonList(sourceVertex.getID()), Long.MAX_VALUE, 360000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true);
            jobGraph.setSnapshotSettings(snapshottingSettings);
            msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
            Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
            Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            File targetDirectory = this.tmpFolder.newFolder();
            msg = new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID(), Option.apply((Object)targetDirectory.getAbsolutePath()));
            Future future = jobManager.ask(msg, timeout);
            Object result = Await.result((Awaitable)future, (Duration)timeout);
            String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess)result).savepointPath();
            msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID());
            Future removedFuture = jobManager.ask(msg, timeout);
            Future cancelFuture = jobManager.ask((Object)new JobManagerMessages.CancelJob(jobGraph.getJobID()), timeout);
            Object response = Await.result((Awaitable)cancelFuture, (Duration)timeout);
            Assert.assertTrue((String)("Unexpected response: " + response), (boolean)(response instanceof JobManagerMessages.CancellationSuccess));
            Await.ready((Awaitable)removedFuture, (Duration)timeout);
            JobVertex newSourceVertex = new JobVertex("NewSource");
            newSourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            newSourceVertex.setParallelism(1);
            JobGraph newJobGraph = new JobGraph("NewTestingJob", new JobVertex[]{newSourceVertex});
            JobCheckpointingSettings newSnapshottingSettings = new JobCheckpointingSettings(Collections.singletonList(newSourceVertex.getID()), Collections.singletonList(newSourceVertex.getID()), Collections.singletonList(newSourceVertex.getID()), Long.MAX_VALUE, 360000L, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true);
            newJobGraph.setSnapshotSettings(newSnapshottingSettings);
            SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath((String)savepointPath, (boolean)false);
            newJobGraph.setSavepointRestoreSettings(restoreSettings);
            msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
            response = Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            Assert.assertTrue((String)("Unexpected response: " + response), (boolean)(response instanceof JobManagerMessages.JobResultFailure));
            JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure)response;
            Throwable cause = failure.cause().deserializeError(ClassLoader.getSystemClassLoader());
            Assert.assertTrue((boolean)(cause instanceof IllegalStateException));
            Assert.assertTrue((boolean)cause.getMessage().contains("allowNonRestoredState"));
            msg = new TestingJobManagerMessages.NotifyWhenJobRemoved(newJobGraph.getJobID());
            Await.ready((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            restoreSettings = SavepointRestoreSettings.forPath((String)savepointPath, (boolean)true);
            newJobGraph.setSavepointRestoreSettings(restoreSettings);
            msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
            response = Await.result((Awaitable)jobManager.ask(msg, timeout), (Duration)timeout);
            Assert.assertTrue((String)("Unexpected response: " + response), (boolean)(response instanceof JobManagerMessages.JobSubmitSuccess));
        }
        finally {
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (actorSystem != null) {
                actorSystem.awaitTermination((Duration)TestingUtils.TESTING_TIMEOUT());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testResourceManagerConnection() throws TimeoutException, InterruptedException {
        FiniteDuration testTimeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        long reconnectionInterval = 200L;
        Configuration configuration = new Configuration();
        configuration.setLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL, 200L);
        ActorSystem actorSystem = AkkaUtils.createLocalActorSystem((Configuration)configuration);
        try {
            ActorGateway jmGateway = TestingUtils.createJobManager(actorSystem, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), configuration, this.highAvailabilityServices);
            TestProbe probe = TestProbe.apply((ActorSystem)actorSystem);
            AkkaActorGateway rmGateway = new AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID);
            Future leaderFuture = jmGateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(), testTimeout);
            Await.ready((Awaitable)leaderFuture, (Duration)testTimeout);
            jmGateway.tell((Object)new RegisterResourceManager(probe.ref()), (ActorGateway)rmGateway);
            JobManagerMessages.LeaderSessionMessage leaderSessionMessage = (JobManagerMessages.LeaderSessionMessage)probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
            Assert.assertEquals((Object)jmGateway.leaderSessionID(), (Object)leaderSessionMessage.leaderSessionID());
            Assert.assertTrue((boolean)(leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful));
            jmGateway.tell((Object)new RegistrationMessages.RegisterTaskManager(ResourceID.generate(), (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class), new HardwareDescription(1, 1L, 1L, 1L), 1));
            leaderSessionMessage = (JobManagerMessages.LeaderSessionMessage)probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
            Assert.assertTrue((boolean)(leaderSessionMessage.message() instanceof NotifyResourceStarted));
            probe.lastSender().tell((Object)new Status.Failure((Throwable)new Exception("Test exception")), ActorRef.noSender());
            Deadline reconnectionDeadline = new FiniteDuration(1000L, TimeUnit.MILLISECONDS).fromNow();
            boolean registered = false;
            while (reconnectionDeadline.hasTimeLeft()) {
                try {
                    leaderSessionMessage = (JobManagerMessages.LeaderSessionMessage)probe.expectMsgClass(reconnectionDeadline.timeLeft(), JobManagerMessages.LeaderSessionMessage.class);
                }
                catch (AssertionError ignored) {
                    continue;
                }
                if (leaderSessionMessage.message() instanceof TriggerRegistrationAtJobManager) {
                    if (registered) {
                        Assert.fail((String)"A successful registration should not be followed by another TriggerRegistrationAtJobManager message.");
                    }
                    jmGateway.tell((Object)new RegisterResourceManager(probe.ref()), (ActorGateway)rmGateway);
                    continue;
                }
                if (leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful) {
                    registered = true;
                    continue;
                }
                Assert.fail((String)("Received unknown message: " + leaderSessionMessage.message() + '.'));
            }
            Assert.assertTrue((boolean)registered);
        }
        finally {
            actorSystem.shutdown();
            actorSystem.awaitTermination();
        }
    }
}

