package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.cleanup.TestingCleanupRunnerFactory;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterService;
import org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest.class */
public class DispatcherTest extends AbstractDispatcherTest {
    // Job graph used by most tests; assigned in setUp().
    private JobGraph jobGraph;
    // ID of jobGraph, cached in setUp().
    private JobID jobId;
    // Leader election service registered with the HA services for jobId in setUp().
    private TestingLeaderElectionService jobMasterLeaderElectionService;
    // Latch handed to ExpectedJobIdJobManagerRunnerFactory, which counts it down for every runner it creates.
    private CountDownLatch createdJobManagerRunnerLatch;
    // Dispatcher under test; terminated in tearDown() when non-null.
    private TestingDispatcher dispatcher;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$BlockingJobVertex.class */
    public static class BlockingJobVertex extends JobVertex {
        private final OneShotLatch oneShotLatch;

        private BlockingJobVertex(String str) {
            super(str);
            this.oneShotLatch = new OneShotLatch();
        }

        public void initializeOnMaster(JobVertex.InitializeOnMasterContext initializeOnMasterContext) throws Exception {
            super.initializeOnMaster(initializeOnMasterContext);
            this.oneShotLatch.await();
        }

        public void unblock() {
            this.oneShotLatch.trigger();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$BlockingTerminationJobManagerService.class */
    private static final class BlockingTerminationJobManagerService extends JobMasterServiceLeadershipRunner {
        private final JobID jobIdToBlock;
        private final CompletableFuture<Void> future;

        public BlockingTerminationJobManagerService(JobID jobID, CompletableFuture<Void> completableFuture, JobMasterServiceProcessFactory jobMasterServiceProcessFactory, LeaderElectionService leaderElectionService, JobResultStore jobResultStore, LibraryCacheManager.ClassLoaderLease classLoaderLease, FatalErrorHandler fatalErrorHandler) {
            super(jobMasterServiceProcessFactory, leaderElectionService, jobResultStore, classLoaderLease, fatalErrorHandler);
            this.future = completableFuture;
            this.jobIdToBlock = jobID;
        }

        public CompletableFuture<Void> closeAsync() {
            return this.jobIdToBlock.equals(getJobID()) ? this.future.whenComplete((r3, th) -> {
                super.closeAsync();
            }) : super.closeAsync();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$ExpectedJobIdJobManagerRunnerFactory.class */
    public static final class ExpectedJobIdJobManagerRunnerFactory implements JobManagerRunnerFactory {
        private final JobID expectedJobId;
        private final CountDownLatch createdJobManagerRunnerLatch;

        private ExpectedJobIdJobManagerRunnerFactory(JobID jobID, CountDownLatch countDownLatch) {
            this.expectedJobId = jobID;
            this.createdJobManagerRunnerLatch = countDownLatch;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long j) throws Exception {
            Assert.assertEquals(this.expectedJobId, jobGraph.getJobID());
            this.createdJobManagerRunnerLatch.countDown();
            return JobMasterServiceLeadershipRunnerFactory.INSTANCE.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, j);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$FinishingJobManagerRunnerFactory.class */
    private static class FinishingJobManagerRunnerFactory implements JobManagerRunnerFactory {
        private final CompletableFuture<JobManagerRunnerResult> resultFuture;
        private final Runnable onClose;

        private FinishingJobManagerRunnerFactory(CompletableFuture<JobManagerRunnerResult> completableFuture, Runnable runnable) {
            this.resultFuture = completableFuture;
            this.onClose = runnable;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long j) throws Exception {
            TestingJobManagerRunner build = TestingJobManagerRunner.newBuilder().setJobId(jobGraph.getJobID()).setResultFuture(this.resultFuture).build();
            CompletableFuture<Void> terminationFuture = build.getTerminationFuture();
            Runnable runnable = this.onClose;
            runnable.getClass();
            terminationFuture.thenRun(runnable::run);
            return build;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$InitializationTimestampCapturingJobManagerRunnerFactory.class */
    private static final class InitializationTimestampCapturingJobManagerRunnerFactory implements JobManagerRunnerFactory {
        private final BlockingQueue<Long> initializationTimestampQueue;

        private InitializationTimestampCapturingJobManagerRunnerFactory(BlockingQueue<Long> blockingQueue) {
            this.initializationTimestampQueue = blockingQueue;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long j) {
            this.initializationTimestampQueue.offer(Long.valueOf(j));
            return TestingJobManagerRunner.newBuilder().setJobId(jobGraph.getJobID()).build();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$JobManagerRunnerWithBlockingJobMasterFactory.class */
    /**
     * Runner factory whose job master service creation hands out incomplete futures, so that a job
     * stays in its initialization phase until {@link #unblockJobMasterInitialization()} is called.
     */
    private static class JobManagerRunnerWithBlockingJobMasterFactory implements JobManagerRunnerFactory {
        /** Gateway whose job request answers with the status currently held in {@link #currentJobStatus}. */
        private final JobMasterGateway jobMasterGateway;
        /** Status reported by the gateway; starts as INITIALIZING and is switched to RUNNING on unblock. */
        private final AtomicReference<JobStatus> currentJobStatus;
        /** Pending job master service futures, one per service creation request (capacity 2). */
        private final BlockingQueue<CompletableFuture<JobMasterService>> jobMasterServiceFutures;
        /** Triggered as soon as a job master service has been requested. */
        private final OneShotLatch initLatch;

        private JobManagerRunnerWithBlockingJobMasterFactory() {
            this.currentJobStatus = new AtomicReference<>(JobStatus.INITIALIZING);
            // Diamond instead of a raw type keeps the queue's element type checked.
            this.jobMasterServiceFutures = new ArrayBlockingQueue<>(2);
            this.initLatch = new OneShotLatch();
            this.jobMasterGateway =
                    new TestingJobMasterGatewayBuilder()
                            .setRequestJobSupplier(
                                    () ->
                                            CompletableFuture.completedFuture(
                                                    new ExecutionGraphInfo(
                                                            new ArchivedExecutionGraphBuilder()
                                                                    .setState(this.currentJobStatus.get())
                                                                    .build())))
                            .build();
        }

        /**
         * Creates a leadership runner whose job master service future is only completed on demand.
         *
         * @return a runner wired to the HA services' leader election service and job result store
         * @throws Exception if one of the required services cannot be obtained
         */
        @Override
        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) throws Exception {
            final DefaultJobMasterServiceProcessFactory processFactory =
                    new DefaultJobMasterServiceProcessFactory(
                            jobGraph.getJobID(),
                            jobGraph.getName(),
                            jobGraph.getCheckpointingSettings(),
                            initializationTimestamp,
                            new TestingJobMasterServiceFactory(
                                    () -> {
                                        // Signal that initialization started, then park the
                                        // service behind a future completed by the test.
                                        this.initLatch.trigger();
                                        final CompletableFuture<JobMasterService> pendingService = new CompletableFuture<>();
                                        this.jobMasterServiceFutures.offer(pendingService);
                                        return pendingService;
                                    }));

            return new JobMasterServiceLeadershipRunner(
                    processFactory,
                    highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID()),
                    highAvailabilityServices.getJobResultStore(),
                    jobManagerSharedServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
                    fatalErrorHandler);
        }

        /** Waits until a job master service has been requested at least once. */
        public void waitForBlockingInit() throws InterruptedException {
            this.initLatch.await();
        }

        /**
         * Completes the oldest pending job master service future and afterwards switches the
         * reported job status to RUNNING.
         *
         * @throws InterruptedException if interrupted while waiting for a pending future
         */
        public void unblockJobMasterInitialization() throws InterruptedException {
            final CompletableFuture<JobMasterService> pendingService = this.jobMasterServiceFutures.take();
            pendingService.complete(new TestingJobMasterService(this.jobMasterGateway));
            this.currentJobStatus.set(JobStatus.RUNNING);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$JobManagerRunnerWithBlockingTerminationFactory.class */
    private static final class JobManagerRunnerWithBlockingTerminationFactory implements JobManagerRunnerFactory {
        private final JobID jobIdToBlock;
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        public JobManagerRunnerWithBlockingTerminationFactory(JobID jobID) {
            this.jobIdToBlock = jobID;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long j) throws Exception {
            return new BlockingTerminationJobManagerService(this.jobIdToBlock, this.future, new DefaultJobMasterServiceProcessFactory(jobGraph.getJobID(), jobGraph.getName(), jobGraph.getCheckpointingSettings(), j, new TestingJobMasterServiceFactory()), highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID()), highAvailabilityServices.getJobResultStore(), jobManagerSharedServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()), fatalErrorHandler);
        }

        public void unblockTermination() {
            this.future.complete(null);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$QueuedJobManagerRunnerFactory.class */
    private static class QueuedJobManagerRunnerFactory implements JobManagerRunnerFactory {
        private final Queue<JobManagerRunner> resultFutureQueue;

        private QueuedJobManagerRunnerFactory(JobManagerRunner... jobManagerRunnerArr) {
            this.resultFutureQueue = new ArrayDeque(Arrays.asList(jobManagerRunnerArr));
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long j) throws Exception {
            return this.resultFutureQueue.remove();
        }
    }

    /**
     * Prepares the per-test fixture: a single no-op job graph, its ID, a leader election service
     * registered for that ID, and a latch with a count of two.
     */
    @Override // org.apache.flink.runtime.dispatcher.AbstractDispatcherTest
    @Before
    public void setUp() throws Exception {
        super.setUp();

        jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        jobId = jobGraph.getJobID();

        jobMasterLeaderElectionService = new TestingLeaderElectionService();
        haServices.setJobMasterLeaderElectionService(jobId, jobMasterLeaderElectionService);

        createdJobManagerRunnerLatch = new CountDownLatch(2);
    }

    /**
     * Builds a dispatcher on top of the given services and starts it.
     *
     * @param heartbeatServices heartbeat services for the dispatcher
     * @param testingHighAvailabilityServices HA services; also the source of the job graph writer and job result store
     * @param jobManagerRunnerFactory factory used to create job manager runners
     * @return the started dispatcher
     * @throws Exception if building the dispatcher fails
     */
    @Nonnull
    private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices testingHighAvailabilityServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        final TestingDispatcher startedDispatcher =
                createTestingDispatcherBuilder()
                        .setHighAvailabilityServices(testingHighAvailabilityServices)
                        .setHeartbeatServices(heartbeatServices)
                        .setJobManagerRunnerFactory(jobManagerRunnerFactory)
                        .setJobGraphWriter(testingHighAvailabilityServices.getJobGraphStore())
                        .setJobResultStore(testingHighAvailabilityServices.getJobResultStore())
                        .build(rpcService);
        startedDispatcher.start();
        return startedDispatcher;
    }

    /**
     * Terminates the dispatcher created by the test (if any) and then tears down the base fixture.
     *
     * <p>The base tear-down runs in a {@code finally} block so that it still happens when
     * terminating the dispatcher throws.
     *
     * @throws Exception if terminating the dispatcher or the base tear-down fails
     */
    @Override // org.apache.flink.runtime.dispatcher.AbstractDispatcherTest
    @After
    public void tearDown() throws Exception {
        try {
            if (this.dispatcher != null) {
                RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{this.dispatcher});
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Submits the default job graph and checks that the job master's leader election service was
     * started.
     */
    @Test
    public void testJobSubmission() throws Exception {
        final JobManagerRunnerFactory runnerFactory = new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch);
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);

        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.submitJob(jobGraph, TIMEOUT).get();

        // Wait for the start future first, then assert on its completion state.
        jobMasterLeaderElectionService.getStartFuture().get();
        final boolean runnerStarted = jobMasterLeaderElectionService.getStartFuture().isDone();
        Assert.assertTrue("jobManagerRunner was not started", runnerStarted);
    }

    /**
     * Registers a dirty SUCCEEDED result for the job and expects a re-submission to be rejected as
     * a duplicate of a globally terminated job.
     */
    @Test
    public void testDuplicateJobSubmissionWithGloballyTerminatedButDirtyJob() throws Exception {
        final JobResultStore resultStore = haServices.getJobResultStore();
        final JobResult succeededResult = TestingJobResultStore.createJobResult(jobGraph.getJobID(), ApplicationStatus.SUCCEEDED);
        resultStore.createDirtyResult(new JobResultEntry(succeededResult));

        assertDuplicateJobSubmission();
    }

    /**
     * Registers a SUCCEEDED result for the job, marks it clean, and expects a re-submission to be
     * rejected as a duplicate of a globally terminated job.
     */
    @Test
    public void testDuplicateJobSubmissionWithGloballyTerminatedAndCleanedJob() throws Exception {
        final JobResultEntry succeededEntry =
                new JobResultEntry(TestingJobResultStore.createJobResult(jobGraph.getJobID(), ApplicationStatus.SUCCEEDED));

        haServices.getJobResultStore().createDirtyResult(succeededEntry);
        haServices.getJobResultStore().markResultAsClean(jobGraph.getJobID());

        assertDuplicateJobSubmission();
    }

    /**
     * Starts a dispatcher, submits the default job graph and asserts that the submission fails
     * with a {@link DuplicateJobSubmissionException} flagged as globally terminated.
     *
     * @throws Exception if the dispatcher cannot be created or started
     */
    private void assertDuplicateJobSubmission() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(this.jobId, this.createdJobManagerRunnerLatch));
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);

        final CompletableFuture<?> submission = gateway.submitJob(this.jobGraph, TIMEOUT);
        final ExecutionException failure = Assert.assertThrows(ExecutionException.class, submission::get);

        Assert.assertTrue(failure.getCause() instanceof DuplicateJobSubmissionException);
        // The cause is typed as Throwable, so it must be cast before the flag can be queried.
        final DuplicateJobSubmissionException duplicate = (DuplicateJobSubmissionException) failure.getCause();
        Assert.assertTrue(duplicate.isGloballyTerminated());
    }

    /**
     * Starts a dispatcher that already holds the job as a recovered job and expects a second
     * submission of the same job graph to fail with a {@link DuplicateJobSubmissionException}
     * that is not flagged as globally terminated.
     */
    @Test
    public void testDuplicateJobSubmissionWithRunningJobId() throws Exception {
        this.dispatcher =
                createTestingDispatcherBuilder()
                        .setJobManagerRunnerFactory(new ExpectedJobIdJobManagerRunnerFactory(this.jobId, this.createdJobManagerRunnerLatch))
                        .setRecoveredJobs(Collections.singleton(this.jobGraph))
                        .build(rpcService);
        this.dispatcher.start();
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);

        final CompletableFuture<?> submission = gateway.submitJob(this.jobGraph, TIMEOUT);
        final ExecutionException failure = Assert.assertThrows(ExecutionException.class, submission::get);

        Assert.assertTrue(failure.getCause() instanceof DuplicateJobSubmissionException);
        // The cause is typed as Throwable, so it must be cast before the flag can be queried.
        final DuplicateJobSubmissionException duplicate = (DuplicateJobSubmissionException) failure.getCause();
        Assert.assertFalse(duplicate.isGloballyTerminated());
    }

    /**
     * Submits a job graph in which only one of two vertices has resources configured and expects
     * the submission to fail with a {@link JobSubmissionException} somewhere in the cause chain.
     */
    @Test
    public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
        final ResourceSpec resourceSpec = ResourceSpec.newBuilder(2.0d, 10).build();

        // Only the first vertex gets explicit resources; the second is left unconfigured.
        final JobVertex configuredVertex = new JobVertex("firstVertex");
        configuredVertex.setInvokableClass(NoOpInvokable.class);
        configuredVertex.setResources(resourceSpec, resourceSpec);

        final JobVertex unconfiguredVertex = new JobVertex("secondVertex");
        unconfiguredVertex.setInvokableClass(NoOpInvokable.class);

        final JobGraph partiallyConfiguredJobGraph = JobGraphTestUtils.streamingJobGraph(configuredVertex, unconfiguredVertex);

        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch));
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        try {
            gateway.submitJob(partiallyConfiguredJobGraph, TIMEOUT).get();
            Assert.fail("job submission should have failed");
        } catch (ExecutionException expected) {
            final boolean hasSubmissionFailure = ExceptionUtils.findThrowable(expected, JobSubmissionException.class).isPresent();
            Assert.assertTrue(hasSubmissionFailure);
        }
    }

    /**
     * Checks that job submission returns while the job master is still initializing, that the job
     * is reported as INITIALIZING and listed in the job overview, and that it reaches RUNNING once
     * initialization is unblocked.
     */
    @Test
    public void testNonBlockingJobSubmission() throws Exception {
        final JobManagerRunnerWithBlockingJobMasterFactory blockingFactory = new JobManagerRunnerWithBlockingJobMasterFactory();
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        // Submission must complete although the job master service future is still pending.
        gateway.submitJob(jobGraph, TIMEOUT).get();
        blockingFactory.waitForBlockingInit();

        Assert.assertThat(gateway.requestJobStatus(jobId, TIMEOUT).get(), Is.is(JobStatus.INITIALIZING));

        final MultipleJobsDetails overview = (MultipleJobsDetails) gateway.requestMultipleJobDetails(TIMEOUT).get();
        Assert.assertEquals(1L, overview.getJobs().size());
        final JobDetails onlyJob = (JobDetails) overview.getJobs().iterator().next();
        Assert.assertEquals(jobId, onlyJob.getJobId());

        blockingFactory.unblockJobMasterInitialization();
        awaitStatus(gateway, jobId, JobStatus.RUNNING);
    }

    /**
     * Checks that triggering a savepoint while the job is still INITIALIZING fails with an
     * {@link UnavailableDispatcherOperationException}.
     */
    @Test
    public void testInvalidCallDuringInitialization() throws Exception {
        final JobManagerRunnerFactory blockingFactory = new JobManagerRunnerWithBlockingJobMasterFactory();
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        gateway.submitJob(jobGraph, TIMEOUT).get();
        Assert.assertThat(gateway.requestJobStatus(jobId, TIMEOUT).get(), Is.is(JobStatus.INITIALIZING));

        try {
            gateway.triggerSavepointAndGetLocation(
                            jobId,
                            "file:///tmp/savepoint",
                            SavepointFormatType.CANONICAL,
                            TriggerSavepointMode.SAVEPOINT,
                            TIMEOUT)
                    .get();
            Assert.fail("Previous statement should have failed");
        } catch (ExecutionException expected) {
            final Throwable cause = expected.getCause();
            Assert.assertTrue(cause instanceof UnavailableDispatcherOperationException);
        }
    }

    /**
     * Cancels a job whose master-side initialization is blocked and checks that the job reports
     * CANCELLING while blocked, that the cancel future only completes after unblocking, and that
     * the final job result is CANCELED.
     */
    @Test
    public void testCancellationDuringInitialization() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(this.jobId, this.createdJobManagerRunnerLatch));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);

        // The tuple is fully typed, so its fields need no casts.
        final Tuple2<JobGraph, BlockingJobVertex> blockingJob = getBlockingJobGraphAndVertex();
        final JobGraph blockingJobGraph = blockingJob.f0;
        final BlockingJobVertex blockingVertex = blockingJob.f1;
        final JobID blockingJobId = blockingJobGraph.getJobID();

        gateway.submitJob(blockingJobGraph, TIMEOUT).get();
        Assert.assertThat(gateway.requestJobStatus(blockingJobId, TIMEOUT).get(), Is.is(JobStatus.INITIALIZING));

        // Cancel while initialization is still blocked; the cancellation must stay pending.
        final CompletableFuture<?> cancellation = gateway.cancelJob(blockingJobId, TIMEOUT);
        Assert.assertThat(gateway.requestJobStatus(blockingJobId, TIMEOUT).get(), Is.is(JobStatus.CANCELLING));
        Assert.assertThat(Boolean.valueOf(cancellation.isDone()), Is.is(false));

        blockingVertex.unblock();
        cancellation.get();

        Assert.assertThat(((JobResult) gateway.requestJobResult(blockingJobId, TIMEOUT).get()).getApplicationStatus(), Is.is(ApplicationStatus.CANCELED));
    }

    /**
     * Lets a job finish in state CANCELED and checks that cancelling it again afterwards
     * completes without an exception.
     */
    @Test
    public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exception {
        // Typed future (instead of a raw one) so the runner result type is checked.
        final CompletableFuture<JobManagerRunnerResult> runnerResultFuture = new CompletableFuture<>();
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new FinishingJobManagerRunnerFactory(runnerResultFuture, () -> {
        }));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        final JobID submittedJobId = this.jobGraph.getJobID();

        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        runnerResultFuture.complete(JobManagerRunnerResult.forSuccess(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setJobID(submittedJobId).setState(JobStatus.CANCELED).build())));
        this.dispatcher.getJobTerminationFuture(submittedJobId, TIMEOUT).get();

        Assert.assertThat(gateway.requestJobStatus(submittedJobId, TIMEOUT).get(), Is.is(JobStatus.CANCELED));
        // Must not throw: the job already terminated as CANCELED.
        gateway.cancelJob(submittedJobId, TIMEOUT).get();
    }

    /**
     * Lets a job finish in state FINISHED and checks that a subsequent cancel request fails with a
     * {@link FlinkJobTerminatedWithoutCancellationException}.
     */
    @Test
    public void testCancellationOfNonCanceledTerminalJobFailsWithAppropriateException() throws Exception {
        // Typed future (instead of a raw one) so the runner result type is checked.
        final CompletableFuture<JobManagerRunnerResult> runnerResultFuture = new CompletableFuture<>();
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new FinishingJobManagerRunnerFactory(runnerResultFuture, () -> {
        }));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        final JobID submittedJobId = this.jobGraph.getJobID();

        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        runnerResultFuture.complete(JobManagerRunnerResult.forSuccess(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setJobID(submittedJobId).setState(JobStatus.FINISHED).build())));
        this.dispatcher.getJobTerminationFuture(submittedJobId, TIMEOUT).get();

        Assert.assertThat(gateway.requestJobStatus(submittedJobId, TIMEOUT).get(), Is.is(JobStatus.FINISHED));
        Assert.assertThat(gateway.cancelJob(submittedJobId, TIMEOUT), FlinkMatchers.futureWillCompleteExceptionally(FlinkJobTerminatedWithoutCancellationException.class, Duration.ofHours(8L)));
    }

    /**
     * Lets a job terminate in state SUSPENDED and checks that the history server archivist was
     * not invoked for it.
     */
    @Test
    public void testNoHistoryServerArchiveCreatedForSuspendedJob() throws Exception {
        // Completed by the archivist stub below; stays incomplete if no archiving is attempted.
        final CompletableFuture<Void> archiveAttemptFuture = new CompletableFuture<>();
        final CompletableFuture<JobManagerRunnerResult> runnerResultFuture = new CompletableFuture<>();
        this.dispatcher = createTestingDispatcherBuilder().setJobManagerRunnerFactory(new FinishingJobManagerRunnerFactory(runnerResultFuture, () -> {
        })).setHistoryServerArchivist(executionGraphInfo -> {
            archiveAttemptFuture.complete(null);
            return CompletableFuture.completedFuture(null);
        }).build(rpcService);
        this.dispatcher.start();
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        final JobID submittedJobId = this.jobGraph.getJobID();

        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        runnerResultFuture.complete(JobManagerRunnerResult.forSuccess(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setJobID(submittedJobId).setState(JobStatus.SUSPENDED).build())));

        // Wait for the job result before inspecting the archivist.
        gateway.requestJobResult(submittedJobId, TIMEOUT).get();
        Assert.assertThat(gateway.requestJobStatus(submittedJobId, TIMEOUT).get(), Is.is(JobStatus.SUSPENDED));
        Assert.assertThat(Boolean.valueOf(archiveAttemptFuture.isDone()), Is.is(false));
    }

    /**
     * Completes a runner with an initialization failure and checks that the job is afterwards
     * reported as FAILED with the original failure message.
     */
    @Test
    public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception {
        final TestingJobMasterServiceLeadershipRunnerFactory runnerFactory = new TestingJobMasterServiceLeadershipRunnerFactory();
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        final JobGraph emptyJobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().setJobId(jobId).build();
        gateway.submitJob(emptyJobGraph, TIMEOUT).get();

        final TestingJobManagerRunner createdRunner = runnerFactory.takeCreatedJobManagerRunner();
        final FlinkException testFailure = new FlinkException("Test failure");
        final ArchivedExecutionGraph failedGraph =
                ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                        jobId,
                        jobGraph.getName(),
                        JobStatus.FAILED,
                        testFailure,
                        jobGraph.getCheckpointingSettings(),
                        1L);
        createdRunner.completeResultFuture(JobManagerRunnerResult.forInitializationFailure(new ExecutionGraphInfo(failedGraph), testFailure));

        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

        final ArchivedExecutionGraph reportedGraph = (ArchivedExecutionGraph) gateway.requestJob(jobGraph.getJobID(), TIMEOUT).get();
        Assert.assertThat(reportedGraph.getState(), Is.is(JobStatus.FAILED));
        Assert.assertNotNull(reportedGraph.getFailureInfo());

        final String reportedMessage =
                reportedGraph.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()).getMessage();
        Assert.assertThat(reportedMessage, Matchers.equalTo(testFailure.getMessage()));
    }

    /**
     * Completes a job execution directly on the dispatcher and checks that both the job status and
     * the execution graph info can be requested for that job afterwards.
     */
    @Test
    public void testCacheJobExecutionResult() throws Exception {
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch));
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        final JobID failedJobId = new JobID();
        final JobStatus expectedState = JobStatus.FAILED;
        final ExecutionGraphInfo failedExecutionGraphInfo =
                new ExecutionGraphInfo(
                        new ArchivedExecutionGraphBuilder()
                                .setJobID(failedJobId)
                                .setState(expectedState)
                                .setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L))
                                .build());

        dispatcher.completeJobExecution(failedExecutionGraphInfo);

        Assert.assertThat(gateway.requestJobStatus(failedJobId, TIMEOUT).get(), Matchers.equalTo(expectedState));
        // The execution graph info is requested from within the dispatcher's main thread.
        Assert.assertThat(
                dispatcher.callAsyncInMainThread(() -> dispatcher.requestExecutionGraphInfo(failedJobId, TIMEOUT)).get(),
                Is.is(failedExecutionGraphInfo));
    }

    /**
     * Requests a job under a fresh, unknown job ID and expects the request to fail with a
     * {@link FlinkJobNotFoundException}.
     */
    @Test
    public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(this.jobId, this.createdJobManagerRunnerLatch));
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        try {
            gateway.requestJob(new JobID(), TIMEOUT).get();
            // Without this the test would pass silently if the request unexpectedly succeeded.
            Assert.fail("Requesting an unknown job should have failed");
        } catch (ExecutionException e) {
            final Throwable stripped = ExceptionUtils.stripExecutionException(e);
            Assert.assertThat(stripped, Matchers.instanceOf(FlinkJobNotFoundException.class));
        }
    }

    /**
     * Creates a savepoint on disk, asks the dispatcher to dispose it, and checks that the
     * savepoint path exists before the disposal and no longer exists afterwards.
     */
    @Test
    public void testSavepointDisposal() throws Exception {
        final URI savepointUri = createTestingSavepoint();
        final Path savepointPath = Paths.get(savepointUri);

        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices,
                        this.haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(
                                this.jobId, this.createdJobManagerRunnerLatch));
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);

        Assert.assertThat(Files.exists(savepointPath), Is.is(true));

        gateway.disposeSavepoint(savepointUri.toString(), TIMEOUT).get();

        Assert.assertThat(Files.exists(savepointPath), Is.is(false));
    }

    /**
     * Writes an empty checkpoint metadata object (checkpoint id 1, no operator or master states)
     * as a savepoint into a new temporary folder.
     *
     * @return the external pointer of the finalized savepoint, as a {@link URI}
     * @throws IOException declared by the storage and stream operations used here
     * @throws URISyntaxException if the external pointer is not a valid URI
     */
    @Nonnull
    private URI createTestingSavepoint() throws IOException, URISyntaxException {
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        final CheckpointMetadataOutputStream metadataStream =
                Checkpoints.loadCheckpointStorage(this.configuration, classLoader, this.log)
                        .createCheckpointStorage(this.jobGraph.getJobID())
                        .initializeLocationForSavepoint(
                                1L, this.temporaryFolder.newFolder().getAbsolutePath())
                        .createMetadataOutputStream();

        final CheckpointMetadata emptyMetadata =
                new CheckpointMetadata(1L, Collections.emptyList(), Collections.emptyList());
        Checkpoints.storeCheckpointMetadata(emptyMetadata, metadataStream);

        final String externalPointer =
                metadataStream.closeAndFinalizeCheckpoint().getExternalPointer();
        return new URI(externalPointer);
    }

    /**
     * Runs the shared fatal-error scenario with a job manager runner whose result future is
     * completed with an initialization failure carrying the test exception.
     */
    @Test
    public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception {
        testJobManagerRunnerFailureResultingInFatalError(
                (jobManagerRunner, failure) -> {
                    final ArchivedExecutionGraph failedGraph =
                            ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                                    this.jobId,
                                    this.jobGraph.getName(),
                                    JobStatus.FAILED,
                                    failure,
                                    this.jobGraph.getCheckpointingSettings(),
                                    1L);
                    jobManagerRunner.completeResultFuture(
                            JobManagerRunnerResult.forInitializationFailure(
                                    new ExecutionGraphInfo(failedGraph), failure));
                });
    }

    /**
     * Runs the shared fatal-error scenario with a job manager runner whose result future is
     * completed exceptionally with the test exception.
     */
    @Test
    public void testFatalErrorIfSomeOtherErrorCausedTheJobMasterToFail() throws Exception {
        testJobManagerRunnerFailureResultingInFatalError(
                TestingJobManagerRunner::completeResultFutureExceptionally);
    }

    /**
     * Shared scenario: starts a dispatcher with one recovered (empty) job graph, lets the given
     * callback fail the created job manager runner with a test exception, and asserts that the
     * fatal error handler receives an error containing that exception's message.
     *
     * @param biConsumer callback that makes the supplied runner fail with the supplied exception
     */
    private void testJobManagerRunnerFailureResultingInFatalError(BiConsumer<TestingJobManagerRunner, Exception> biConsumer) throws Exception {
        final FlinkException testException = new FlinkException("Expected test exception");
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        final TestingJobMasterServiceLeadershipRunnerFactory runnerFactory =
                new TestingJobMasterServiceLeadershipRunnerFactory();
        this.dispatcher =
                createTestingDispatcherBuilder()
                        .setJobManagerRunnerFactory(runnerFactory)
                        .setRecoveredJobs(Collections.singleton(JobGraphTestUtils.emptyJobGraph()))
                        .build(rpcService);
        this.dispatcher.start();

        final TestingFatalErrorHandler errorHandler =
                this.testingFatalErrorHandlerResource.getFatalErrorHandler();

        biConsumer.accept(runnerFactory.takeCreatedJobManagerRunner(), testException);

        final Throwable reportedError =
                errorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertThat(
                ExceptionUtils.findThrowableWithMessage(reportedError, testException.getMessage())
                        .isPresent(),
                Is.is(true));

        // Reset the handler so the expected error is not left recorded on it.
        errorHandler.clearError();
    }

    /**
     * Builds a dispatcher where the same job ID appears both as a recovered job graph and as a
     * recovered dirty job result; the test expects an {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testThatDirtilyFinishedJobsNotBeingRetriggered() throws Exception {
        final JobGraph recoveredJobGraph = JobGraphTestUtils.emptyJobGraph();
        final JobResult dirtyJobResult =
                TestingJobResultStore.createSuccessfulJobResult(recoveredJobGraph.getJobID());

        this.dispatcher =
                createTestingDispatcherBuilder()
                        .setRecoveredJobs(Collections.singleton(recoveredJobGraph))
                        .setRecoveredDirtyJobs(Collections.singleton(dirtyJobResult))
                        .build(rpcService);
    }

    /**
     * Starts a dispatcher that only knows a dirty job result (no job graph) and checks that a
     * cleanup runner is created for that job ID while the regular job manager runner factory has
     * no runner queued.
     */
    @Test
    public void testJobCleanupWithoutRecoveredJobGraph() throws Exception {
        final JobID dirtyJobId = new JobID();
        final TestingJobMasterServiceLeadershipRunnerFactory jobManagerRunnerFactory =
                new TestingJobMasterServiceLeadershipRunnerFactory();
        final TestingCleanupRunnerFactory cleanupRunnerFactory = new TestingCleanupRunnerFactory();
        final OneShotLatch bootstrapLatch = new OneShotLatch();

        this.dispatcher =
                createTestingDispatcherBuilder()
                        .setJobManagerRunnerFactory(jobManagerRunnerFactory)
                        .setCleanupRunnerFactory(cleanupRunnerFactory)
                        .setRecoveredDirtyJobs(
                                Collections.singleton(
                                        new JobResult.Builder()
                                                .jobId(dirtyJobId)
                                                .applicationStatus(ApplicationStatus.SUCCEEDED)
                                                .netRuntime(1L)
                                                .build()))
                        .setDispatcherBootstrapFactory(
                                (gateway, executor, errorHandler) -> {
                                    // Signal the test thread that the bootstrap was requested.
                                    bootstrapLatch.trigger();
                                    return new NoOpDispatcherBootstrap();
                                })
                        .build(rpcService);
        this.dispatcher.start();

        bootstrapLatch.await();

        final JobID cleanedUpJobId = cleanupRunnerFactory.takeCreatedJobManagerRunner().getJobID();
        Assert.assertThat(
                "The CleanupJobManagerRunner has the wrong job ID attached.",
                cleanedUpJobId,
                Is.is(dirtyJobId));
        Assert.assertThat(
                "No JobMaster should have been started.",
                jobManagerRunnerFactory.getQueueSize(),
                Is.is(0));
    }

    /**
     * Submits a job to a dispatcher backed by a testing job graph store, closes the dispatcher,
     * and checks that the store still contains the submitted job graph.
     */
    @Test
    public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
        final TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder().build();
        jobGraphStore.start(null);
        this.haServices.setJobGraphStore(jobGraphStore);

        this.dispatcher =
                createTestingDispatcherBuilder().setJobGraphWriter(jobGraphStore).build(rpcService);
        this.dispatcher.start();

        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        Assert.assertThat(this.dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));

        this.dispatcher.close();

        Assert.assertThat(jobGraphStore.contains(this.jobGraph.getJobID()), Matchers.is(true));
    }

    /**
     * Submits a job, requests its result, closes the dispatcher, and checks that the previously
     * pending job result completes with application status {@code UNKNOWN}.
     */
    @Test
    public void testJobSuspensionWhenDispatcherIsTerminated() throws Exception {
        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices,
                        this.haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(
                                this.jobId, this.createdJobManagerRunnerLatch));
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);

        gateway.submitJob(this.jobGraph, TIMEOUT).get();

        final CompletableFuture<JobResult> jobResultFuture =
                gateway.requestJobResult(this.jobGraph.getJobID(), TIMEOUT);
        // The result must still be pending before the dispatcher is closed.
        Assert.assertThat(jobResultFuture.isDone(), Is.is(false));

        this.dispatcher.close();

        // JUnit's contract is assertEquals(expected, actual); keeping that order makes a failure
        // message report the right value as "expected".
        Assert.assertEquals(
                ApplicationStatus.UNKNOWN, jobResultFuture.get().getApplicationStatus());
    }

    /**
     * Submits two jobs, one of which uses a runner factory whose termination can be blocked,
     * starts closing the dispatcher asynchronously, and waits until the execution graph of the
     * job identified by {@code this.jobId} is reported as SUSPENDED while the shutdown is still in
     * progress.
     */
    @Test
    public void testJobStatusIsShownDuringTermination() throws Exception {
        final JobID blockingJobId = new JobID();
        this.haServices.setJobMasterLeaderElectionService(
                blockingJobId, new TestingLeaderElectionService());
        final JobManagerRunnerWithBlockingTerminationFactory blockingRunnerFactory =
                new JobManagerRunnerWithBlockingTerminationFactory(blockingJobId);
        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices, this.haServices, blockingRunnerFactory);
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);

        final JobGraph blockingJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        blockingJobGraph.setJobID(blockingJobId);

        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        gateway.submitJob(blockingJobGraph, TIMEOUT).get();

        final CompletableFuture<Void> terminationFuture = this.dispatcher.closeAsync();
        try {
            CommonTestUtils.waitUntilCondition(
                    (SupplierWithException<Boolean, Exception>)
                            () ->
                                    gateway.requestExecutionGraphInfo(this.jobId, TIMEOUT)
                                                    .get()
                                                    .getArchivedExecutionGraph()
                                                    .getState()
                                            == JobStatus.SUSPENDED,
                    5L);
            blockingRunnerFactory.unblockTermination();
            terminationFuture.get();
        } catch (final Throwable failure) {
            // Unblock and wait for termination on the failure path as well, then rethrow.
            blockingRunnerFactory.unblockTermination();
            terminationFuture.get();
            throw failure;
        }
    }

    /**
     * Calls {@code shutDownCluster} through the gateway and then waits on the dispatcher's
     * shut-down future, which must complete for the test to finish.
     */
    @Test
    public void testShutDownClusterShouldCompleteShutDownFuture() throws Exception {
        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices,
                        this.haServices,
                        JobMasterServiceLeadershipRunnerFactory.INSTANCE);
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);

        gateway.shutDownCluster().get();

        this.dispatcher.getShutDownFuture().get();
    }

    /**
     * Calls {@code onRemovedJobGraph} for a recovered job and checks that the job graph store's
     * local cleanup function was invoked with that job ID while its global cleanup function was
     * not invoked within a short wait.
     */
    @Test
    public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception {
        final CompletableFuture<JobID> globalCleanupFuture = new CompletableFuture<>();
        final CompletableFuture<JobID> localCleanupFuture = new CompletableFuture<>();

        final TestingJobGraphStore jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setGlobalCleanupFunction(
                                (cleanedJobId, ignoredExecutor) -> {
                                    globalCleanupFuture.complete(cleanedJobId);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .setLocalCleanupFunction(
                                (cleanedJobId, ignoredExecutor) -> {
                                    localCleanupFuture.complete(cleanedJobId);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .build();
        jobGraphStore.start(null);

        this.dispatcher =
                createTestingDispatcherBuilder()
                        .setRecoveredJobs(Collections.singleton(this.jobGraph))
                        .setJobGraphWriter(jobGraphStore)
                        .build(rpcService);
        this.dispatcher.start();

        this.dispatcher.onRemovedJobGraph(this.jobGraph.getJobID()).join();

        Assert.assertThat(localCleanupFuture.get(), Is.is(this.jobGraph.getJobID()));

        try {
            globalCleanupFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail("onRemovedJobGraph should not remove the job from the JobGraphStore.");
        } catch (TimeoutException expected) {
            // Expected: the global cleanup future is not completed within the wait.
        }
    }

    /**
     * Submits a job through a runner factory that publishes a timestamp into a queue and checks
     * that the value taken from the queue is greater than zero.
     */
    @Test
    public void testInitializationTimestampForwardedToJobManagerRunner() throws Exception {
        final BlockingQueue<Long> capturedTimestamps = new ArrayBlockingQueue<>(1);
        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices,
                        this.haServices,
                        new InitializationTimestampCapturingJobManagerRunnerFactory(
                                capturedTimestamps));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.submitJob(this.jobGraph, TIMEOUT).get();

        // take() blocks until the factory has published a value.
        final Long initializationTimestamp = capturedTimestamps.take();
        Assert.assertThat(initializationTimestamp, Matchers.greaterThan(0L));
    }

    /**
     * Runs a job whose runner completes immediately with state SUSPENDED and checks that the
     * multiple-job-details request returns exactly one job in state SUSPENDED.
     */
    @Test
    public void testRequestMultipleJobDetails_returnsSuspendedJobs() throws Exception {
        final JobManagerRunner suspendedRunner =
                completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED);
        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices,
                        this.haServices,
                        new QueuedJobManagerRunnerFactory(new JobManagerRunner[] {suspendedRunner}));
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        this.dispatcher.getJobTerminationFuture(this.jobId, TIMEOUT).get();

        final MultipleJobsDetails jobsDetails = gateway.requestMultipleJobDetails(TIMEOUT).get();
        assertOnlyContainsSingleJobWithState(JobStatus.SUSPENDED, jobsDetails);
    }

    /**
     * Submits the same job twice: the first runner completes as SUSPENDED, the second reports
     * RUNNING. The multiple-job-details request must then return exactly one job in state RUNNING.
     */
    @Test
    public void testRequestMultipleJobDetails_returnsRunningOverSuspendedJob() throws Exception {
        final JobManagerRunner[] queuedRunners = {
            completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED),
            runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING)
        };
        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices,
                        this.haServices,
                        new QueuedJobManagerRunnerFactory(queuedRunners));
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        // First run: wait for the job result of the suspended run.
        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        gateway.requestJobResult(this.jobId, TIMEOUT).get();

        // Second run: the job is submitted again.
        gateway.submitJob(this.jobGraph, TIMEOUT).get();

        final MultipleJobsDetails jobsDetails = gateway.requestMultipleJobDetails(TIMEOUT).get();
        assertOnlyContainsSingleJobWithState(JobStatus.RUNNING, jobsDetails);
    }

    /**
     * Submits the same job twice: the first runner completes as SUSPENDED, the second as FINISHED.
     * The multiple-job-details request must then return exactly one job in state FINISHED.
     */
    @Test
    public void testRequestMultipleJobDetails_returnsFinishedOverSuspendedJob() throws Exception {
        final JobManagerRunner[] queuedRunners = {
            completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED),
            completedJobManagerRunnerWithJobStatus(JobStatus.FINISHED)
        };
        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices,
                        this.haServices,
                        new QueuedJobManagerRunnerFactory(queuedRunners));
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        // First run: wait for the job result of the suspended run.
        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        gateway.requestJobResult(this.jobId, TIMEOUT).get();

        // Second run: submit again and wait for the job to terminate.
        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        this.dispatcher.getJobTerminationFuture(this.jobId, TIMEOUT).get();

        final MultipleJobsDetails jobsDetails = gateway.requestMultipleJobDetails(TIMEOUT).get();
        assertOnlyContainsSingleJobWithState(JobStatus.FINISHED, jobsDetails);
    }

    /**
     * Runs a job to a SUSPENDED result and checks that the object returned by the
     * multiple-job-details request can be serialized without an exception.
     */
    @Test
    public void testRequestMultipleJobDetails_isSerializable() throws Exception {
        final JobManagerRunner suspendedRunner =
                completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED);
        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices,
                        this.haServices,
                        new QueuedJobManagerRunnerFactory(new JobManagerRunner[] {suspendedRunner}));
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        this.dispatcher.getJobTerminationFuture(this.jobId, TIMEOUT).get();

        final MultipleJobsDetails jobsDetails = gateway.requestMultipleJobDetails(TIMEOUT).get();
        InstantiationUtil.serializeObject(jobsDetails);
    }

    /**
     * Creates a testing runner for {@code this.jobId} whose job-details function reports the given
     * status.
     *
     * @param jobStatus status to report; must not be a terminal state
     * @return the configured testing runner
     */
    private JobManagerRunner runningJobManagerRunnerWithJobStatus(JobStatus jobStatus) {
        Preconditions.checkArgument(!jobStatus.isTerminalState());

        return TestingJobManagerRunner.newBuilder()
                .setJobId(this.jobId)
                .setJobDetailsFunction(
                        // A new archived graph is built on every invocation of the function.
                        () ->
                                JobDetails.createDetailsForJob(
                                        new ArchivedExecutionGraphBuilder()
                                                .setJobID(this.jobId)
                                                .setState(jobStatus)
                                                .build()))
                .build();
    }

    /**
     * Creates a testing runner for {@code this.jobId} whose result future is already completed
     * with a successful result carrying an execution graph in the given state.
     *
     * @param jobStatus state of the archived execution graph; must be a terminal state
     * @return the configured testing runner
     */
    private JobManagerRunner completedJobManagerRunnerWithJobStatus(JobStatus jobStatus) {
        Preconditions.checkArgument(jobStatus.isTerminalState());

        final ExecutionGraphInfo executionGraphInfo =
                new ExecutionGraphInfo(
                        new ArchivedExecutionGraphBuilder()
                                .setJobID(this.jobId)
                                .setState(jobStatus)
                                .build());

        return TestingJobManagerRunner.newBuilder()
                .setJobId(this.jobId)
                .setResultFuture(
                        CompletableFuture.completedFuture(
                                JobManagerRunnerResult.forSuccess(executionGraphInfo)))
                .build();
    }

    /**
     * Asserts that the given details contain exactly one job and that this job has the expected
     * status.
     *
     * @param jobStatus the status the single job must have
     * @param multipleJobsDetails the details object to inspect
     */
    private static void assertOnlyContainsSingleJobWithState(JobStatus jobStatus, MultipleJobsDetails multipleJobsDetails) {
        final Collection<JobDetails> allJobs = multipleJobsDetails.getJobs();
        Assert.assertEquals(1L, allJobs.size());

        final JobDetails onlyJob = allJobs.iterator().next();
        Assert.assertEquals(jobStatus, onlyJob.getStatus());
    }

    /**
     * Stores a permanent blob for two jobs, builds a dispatcher that recovers only one of them,
     * and checks that the recovered job's blob is still readable while fetching the other job's
     * blob fails with a {@link NoSuchFileException}.
     */
    @Test
    public void testOnlyRecoveredJobsAreRetainedInTheBlobServer() throws Exception {
        final JobID recoveredJobId = new JobID();
        final JobID unknownJobId = new JobID();
        final byte[] blobContent = {1, 2, 3, 4};

        final BlobServer blobServer = getBlobServer();
        final PermanentBlobKey recoveredJobBlobKey =
                blobServer.putPermanent(recoveredJobId, blobContent);
        final PermanentBlobKey unknownJobBlobKey =
                blobServer.putPermanent(unknownJobId, blobContent);

        this.dispatcher =
                createTestingDispatcherBuilder()
                        .setRecoveredJobs(
                                Collections.singleton(new JobGraph(recoveredJobId, "foobar")))
                        .build(rpcService);

        Assertions.assertThat(blobServer.getFile(recoveredJobId, recoveredJobBlobKey))
                .hasBinaryContent(blobContent);
        Assertions.assertThatThrownBy(() -> blobServer.getFile(unknownJobId, unknownJobBlobKey))
                .isInstanceOf(NoSuchFileException.class);
    }

    /**
     * Submits a failed job through {@code submitFailedJob} and checks that the archived execution
     * graph returned for it carries the submitted job ID and name, is in state FAILED, and holds a
     * failure cause that deserializes to a {@link RuntimeException} with the submitted message.
     */
    @Test
    public void testRetrieveJobResultAfterSubmissionOfFailedJob() throws Exception {
        this.dispatcher =
                createAndStartDispatcher(
                        this.heartbeatServices,
                        this.haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(
                                this.jobId, this.createdJobManagerRunnerLatch));
        final DispatcherGateway gateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);

        final JobID failedJobId = new JobID();
        gateway.submitFailedJob(failedJobId, "test", new RuntimeException("Test exception."))
                .get();

        final ArchivedExecutionGraph failedGraph = gateway.requestJob(failedJobId, TIMEOUT).get();

        Assertions.assertThat(failedGraph.getJobID()).isEqualTo(failedJobId);
        Assertions.assertThat(failedGraph.getJobName()).isEqualTo("test");
        Assertions.assertThat(failedGraph.getState()).isEqualTo(JobStatus.FAILED);
        Assertions.assertThat(failedGraph.getFailureInfo())
                .isNotNull()
                .extracting(ErrorInfo::getException)
                .extracting(
                        serializedCause ->
                                serializedCause.deserializeError(
                                        Thread.currentThread().getContextClassLoader()))
                .satisfies(
                        cause ->
                                Assertions.assertThat(cause)
                                        .isInstanceOf(RuntimeException.class)
                                        .hasMessage("Test exception."));
    }

    /**
     * Builds a streaming job graph for {@code this.jobId} that consists of a single
     * {@code BlockingJobVertex} named "testVertex" with parallelism 1.
     *
     * @return a tuple of the job graph and the vertex it contains
     */
    private Tuple2<JobGraph, BlockingJobVertex> getBlockingJobGraphAndVertex() {
        final BlockingJobVertex vertex = new BlockingJobVertex("testVertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(1);

        final JobGraph blockingJobGraph =
                JobGraphBuilder.newStreamingJobGraphBuilder()
                        .setJobId(this.jobId)
                        .addJobVertex(vertex)
                        .build();
        return Tuple2.of(blockingJobGraph, vertex);
    }
}
