package org.apache.flink.runtime.dispatcher;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CleanupOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.TestingRetryStrategies;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsEmptyCollection;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.class */
public class DispatcherCleanupITCase extends AbstractDispatcherTest {
    private final BlockingQueue<RpcEndpoint> toTerminate = new LinkedBlockingQueue();

    @Override // org.apache.flink.runtime.dispatcher.AbstractDispatcherTest
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.haServices.setCheckpointRecoveryFactory(new PerJobCheckpointRecoveryFactory((i, embeddedCompletedCheckpointStore, sharedStateRegistryFactory, executor, restoreMode) -> {
            if (embeddedCompletedCheckpointStore == null) {
                return new EmbeddedCompletedCheckpointStore(i, Collections.emptyList(), sharedStateRegistryFactory.create(executor, Collections.emptyList(), RestoreMode.DEFAULT));
            }
            Assert.assertTrue(embeddedCompletedCheckpointStore.getShutdownStatus().isPresent());
            Assert.assertTrue(embeddedCompletedCheckpointStore.getAllCheckpoints().isEmpty());
            return new EmbeddedCompletedCheckpointStore(i, embeddedCompletedCheckpointStore.getAllCheckpoints(), sharedStateRegistryFactory.create(executor, embeddedCompletedCheckpointStore.getAllCheckpoints(), restoreMode));
        }));
    }

    @Override // org.apache.flink.runtime.dispatcher.AbstractDispatcherTest
    @After
    public void tearDown() {
        while (!this.toTerminate.isEmpty()) {
            try {
                RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{this.toTerminate.poll()});
            } catch (Exception e) {
            }
        }
    }

    @Test
    public void testCleanupThroughRetries() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        JobID jobID = createJobGraph.getJobID();
        AtomicInteger atomicInteger = new AtomicInteger();
        OneShotLatch oneShotLatch = new OneShotLatch();
        RuntimeException runtimeException = new RuntimeException("Expected RuntimeException: Unable to remove job graph.");
        AtomicInteger atomicInteger2 = new AtomicInteger(5);
        TestingJobGraphStore build = TestingJobGraphStore.newBuilder().setGlobalCleanupFunction((jobID2, executor) -> {
            atomicInteger.incrementAndGet();
            if (atomicInteger2.getAndDecrement() > 0) {
                return FutureUtils.completedExceptionally(runtimeException);
            }
            oneShotLatch.trigger();
            return FutureUtils.completedVoidFuture();
        }).build();
        build.start(NoOpJobGraphListener.INSTANCE);
        this.haServices.setJobGraphStore(build);
        TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService();
        this.haServices.setJobMasterLeaderElectionService(jobID, testingLeaderElectionService);
        RpcEndpoint build2 = createTestingDispatcherBuilder().setResourceCleanerFactory(new DispatcherResourceCleanerFactory(ForkJoinPool.commonPool(), TestingRetryStrategies.createWithNumberOfRetries(5), new DefaultJobManagerRunnerRegistry(2), this.haServices.getJobGraphStore(), this.blobServer, this.haServices, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup())).build(rpcService);
        build2.start();
        this.toTerminate.add(build2);
        testingLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway dispatcherGateway = (DispatcherGateway) build2.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob(createJobGraph, TIMEOUT).get();
        waitForJobToFinish(testingLeaderElectionService, dispatcherGateway, jobID);
        oneShotLatch.await();
        MatcherAssert.assertThat(Integer.valueOf(atomicInteger.get()), IsEqual.equalTo(6));
        MatcherAssert.assertThat("The JobGraph should be removed from JobGraphStore.", this.haServices.getJobGraphStore().getJobIds(), IsEmptyCollection.empty());
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(this.haServices.getJobResultStore().hasJobResultEntry(jobID));
        });
    }

    @Test
    public void testCleanupNotCancellable() throws Exception {
        JobID jobID = createJobGraph().getJobID();
        JobResultStore embeddedJobResultStore = new EmbeddedJobResultStore();
        embeddedJobResultStore.createDirtyResult(new JobResultEntry(TestingJobResultStore.createSuccessfulJobResult(jobID)));
        this.haServices.setJobResultStore(embeddedJobResultStore);
        CompletableFuture completableFuture = new CompletableFuture();
        AtomicReference atomicReference = new AtomicReference();
        RpcEndpoint build = createTestingDispatcherBuilder().setJobManagerRunnerRegistry(TestingJobManagerRunnerRegistry.newSingleJobBuilder(atomicReference).withLocalCleanupAsyncFunction((jobID2, executor) -> {
            return completableFuture;
        }).build()).build(rpcService);
        build.start();
        this.toTerminate.add(build);
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(atomicReference.get() != null);
        });
        MatcherAssert.assertThat("The JobResultStore should have this job still marked as dirty.", Boolean.valueOf(this.haServices.getJobResultStore().hasDirtyJobResultEntry(jobID)), CoreMatchers.is(true));
        try {
            build.getSelfGateway(DispatcherGateway.class).cancelJob(jobID, TIMEOUT).get();
            Assert.fail("Should fail because cancelling the cleanup is not allowed.");
        } catch (ExecutionException e) {
            MatcherAssert.assertThat(e, FlinkMatchers.containsCause(JobCancellationFailedException.class));
        }
        completableFuture.complete(null);
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(this.haServices.getJobResultStore().hasCleanJobResultEntry(jobID));
        });
    }

    @Test
    public void testCleanupAfterLeadershipChange() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        JobID jobID = createJobGraph.getJobID();
        AtomicInteger atomicInteger = new AtomicInteger();
        OneShotLatch oneShotLatch = new OneShotLatch();
        CompletableFuture completableFuture = new CompletableFuture();
        TestingJobGraphStore build = TestingJobGraphStore.newBuilder().setGlobalCleanupFunction((jobID2, executor) -> {
            int andIncrement = atomicInteger.getAndIncrement();
            oneShotLatch.trigger();
            if (andIncrement < 1) {
                return FutureUtils.completedExceptionally(new RuntimeException("Expected RuntimeException: Unable to remove job graph."));
            }
            completableFuture.complete(jobID2);
            return FutureUtils.completedVoidFuture();
        }).build();
        build.start(NoOpJobGraphListener.INSTANCE);
        this.haServices.setJobGraphStore(build);
        TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService();
        this.haServices.setJobMasterLeaderElectionService(jobID, testingLeaderElectionService);
        this.configuration.set(CleanupOptions.CLEANUP_STRATEGY, CleanupOptions.NONE_PARAM_VALUES.iterator().next());
        RpcEndpoint build2 = createTestingDispatcherBuilder().build(rpcService);
        build2.start();
        this.toTerminate.add(build2);
        testingLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway dispatcherGateway = (DispatcherGateway) build2.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob(createJobGraph, TIMEOUT).get();
        waitForJobToFinish(testingLeaderElectionService, dispatcherGateway, jobID);
        oneShotLatch.await();
        testingLeaderElectionService.notLeader();
        testingLeaderElectionService.stop();
        MatcherAssert.assertThat("The cleanup should have been triggered only once.", Integer.valueOf(atomicInteger.get()), IsEqual.equalTo(1));
        MatcherAssert.assertThat("The cleanup should not have reached the successful cleanup code path.", Boolean.valueOf(completableFuture.isDone()), IsEqual.equalTo(false));
        MatcherAssert.assertThat("The JobGraph is still stored in the JobGraphStore.", this.haServices.getJobGraphStore().getJobIds(), IsEqual.equalTo(Collections.singleton(jobID)));
        MatcherAssert.assertThat("The JobResultStore should have this job marked as dirty.", this.haServices.getJobResultStore().getDirtyResults().stream().map((v0) -> {
            return v0.getJobId();
        }).collect(Collectors.toSet()), IsEqual.equalTo(Collections.singleton(jobID)));
        RpcEndpoint build3 = createTestingDispatcherBuilder().setRecoveredDirtyJobs(this.haServices.getJobResultStore().getDirtyResults()).build(rpcService);
        build3.start();
        this.toTerminate.add(build3);
        testingLeaderElectionService.isLeader(UUID.randomUUID());
        CommonTestUtils.waitUntilCondition(() -> {
            return Boolean.valueOf(this.haServices.getJobResultStore().getDirtyResults().isEmpty());
        });
        MatcherAssert.assertThat("The JobGraph is not stored in the JobGraphStore.", this.haServices.getJobGraphStore().getJobIds(), IsEmptyCollection.empty());
        Assert.assertTrue("The JobResultStore has the job listed as clean.", this.haServices.getJobResultStore().hasJobResultEntry(jobID));
        MatcherAssert.assertThat(completableFuture.get(), IsEqual.equalTo(jobID));
        MatcherAssert.assertThat(Integer.valueOf(atomicInteger.get()), IsEqual.equalTo(2));
    }

    private void waitForJobToFinish(TestingLeaderElectionService testingLeaderElectionService, DispatcherGateway dispatcherGateway, JobID jobID) throws Exception {
        JobMasterTester jobMasterTester = new JobMasterTester(rpcService, jobID, connectToLeadingJobMaster(testingLeaderElectionService).get());
        Throwable th = null;
        try {
            try {
                CompletableFuture<List<TaskDeploymentDescriptor>> deployVertices = jobMasterTester.deployVertices(2);
                awaitStatus(dispatcherGateway, jobID, JobStatus.RUNNING);
                jobMasterTester.transitionTo(deployVertices.get(), ExecutionState.INITIALIZING).get();
                jobMasterTester.transitionTo(deployVertices.get(), ExecutionState.RUNNING).get();
                jobMasterTester.getCheckpointFuture(1L).get();
                jobMasterTester.transitionTo(deployVertices.get(), ExecutionState.FINISHED).get();
                if (jobMasterTester != null) {
                    if (0 != 0) {
                        try {
                            jobMasterTester.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        jobMasterTester.close();
                    }
                }
                awaitStatus(dispatcherGateway, jobID, JobStatus.FINISHED);
            } finally {
            }
        } catch (Throwable th3) {
            if (jobMasterTester != null) {
                if (th != null) {
                    try {
                        jobMasterTester.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    jobMasterTester.close();
                }
            }
            throw th3;
        }
    }

    private JobGraph createJobGraph() {
        JobVertex jobVertex = new JobVertex("first");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(1);
        JobVertex jobVertex2 = new JobVertex("second");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(1);
        return JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(jobVertex).addJobVertex(jobVertex2).setJobCheckpointingSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(20L).setMinPauseBetweenCheckpoints(20L).setCheckpointTimeout(10000L).build(), (SerializedValue) null)).build();
    }

    private static CompletableFuture<JobMasterGateway> connectToLeadingJobMaster(TestingLeaderElectionService testingLeaderElectionService) {
        return testingLeaderElectionService.getConfirmationFuture().thenCompose(leaderConnectionInfo -> {
            return rpcService.connect(leaderConnectionInfo.getAddress(), JobMasterId.fromUuidOrNull(leaderConnectionInfo.getLeaderSessionId()), JobMasterGateway.class);
        });
    }
}
