package org.apache.flink.runtime.dispatcher.cleanup;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.class */
/**
 * Tests for {@link CheckpointResourcesCleanupRunner}.
 *
 * <p>Most scenarios are executed for three lifecycle stages of the runner: before {@code start()},
 * after {@code start()} and after {@code start()} followed by {@code close()}. The stages are
 * modelled by the {@link #BEFORE_START}, {@link #AFTER_START} and {@link #AFTER_CLOSE} consumers.
 */
public class CheckpointResourcesCleanupRunnerTest {

    /** Timeout passed to the runner's {@code cancel} and {@code requestJob} calls. */
    private static final Time TIMEOUT_FOR_REQUESTS = Time.milliseconds(0);

    /** Upper bound used by assertions that wait for work running on a separate executor. */
    private static final Duration TIMEOUT_FOR_RESULTS_WITH_CONCURRENCY = Duration.ofMinutes(60);

    /** Lifecycle step that leaves the runner untouched. */
    private static final ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> BEFORE_START = runner -> {
    };

    /** Lifecycle step that starts the runner. */
    private static final ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> AFTER_START = runner -> runner.start();

    /** Lifecycle step that starts the runner and closes it right away. */
    private static final ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> AFTER_CLOSE = runner -> {
        runner.start();
        runner.close();
    };

    /**
     * {@link CheckpointRecoveryFactory} whose create methods wait on a latch before handing out
     * the pre-configured components. Tests release the latch through {@link #triggerCreation()}.
     */
    private static class HaltingCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
        private final CompletedCheckpointStore completedCheckpointStore;
        private final CheckpointIDCounter checkpointIDCounter;
        private final OneShotLatch creationLatch = new OneShotLatch();

        /**
         * Creates a factory backed by testing components that are built from the given futures.
         *
         * @param completedCheckpointStoreShutdownFuture future handed to the testing store factory
         * @param checkpointIDCounterShutdownFuture future handed to the testing counter factory
         */
        public HaltingCheckpointRecoveryFactory(
                CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture,
                CompletableFuture<JobStatus> checkpointIDCounterShutdownFuture) {
            this(
                    TestingCompletedCheckpointStore.createStoreWithShutdownCheckAndNoCompletedCheckpoints(completedCheckpointStoreShutdownFuture),
                    TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(checkpointIDCounterShutdownFuture));
        }

        /**
         * Creates a factory that hands out the given components.
         *
         * @param completedCheckpointStore store to return; must not be {@code null}
         * @param checkpointIDCounter counter to return; must not be {@code null}
         */
        public HaltingCheckpointRecoveryFactory(CompletedCheckpointStore completedCheckpointStore, CheckpointIDCounter checkpointIDCounter) {
            this.completedCheckpointStore = Preconditions.checkNotNull(completedCheckpointStore);
            this.checkpointIDCounter = Preconditions.checkNotNull(checkpointIDCounter);
        }

        /** Awaits the creation latch and then returns the configured store; all arguments are ignored. */
        public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
                JobID jobId,
                int maxNumberOfCheckpointsToRetain,
                SharedStateRegistryFactory sharedStateRegistryFactory,
                Executor ioExecutor,
                RestoreMode restoreMode)
                throws Exception {
            this.creationLatch.await();
            return this.completedCheckpointStore;
        }

        /** Awaits the creation latch and then returns the configured counter; the job ID is ignored. */
        public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {
            this.creationLatch.await();
            return this.checkpointIDCounter;
        }

        /** Triggers the latch the create methods are waiting on. */
        public void triggerCreation() {
            this.creationLatch.trigger();
        }
    }

    /**
     * Fluent builder for {@link CheckpointResourcesCleanupRunner} instances under test. Every
     * constructor argument has a default so that a test only overrides what it cares about.
     */
    public static class TestInstanceBuilder {
        private JobResult jobResult = CheckpointResourcesCleanupRunnerTest.createDummySuccessJobResult();
        private CheckpointRecoveryFactory checkpointRecoveryFactory = CheckpointResourcesCleanupRunnerTest.createCheckpointRecoveryFactory();
        private SharedStateRegistryFactory sharedStateRegistryFactory = SharedStateRegistry.DEFAULT_FACTORY;
        private Executor executor = Executors.directExecutor();
        private Configuration configuration = new Configuration();
        private long initializationTimestamp = System.currentTimeMillis();

        private TestInstanceBuilder() {
        }

        /** Replaces the default successful job result. */
        public TestInstanceBuilder withJobResult(JobResult jobResult) {
            this.jobResult = jobResult;
            return this;
        }

        /** Replaces the default checkpoint recovery factory. */
        public TestInstanceBuilder withCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
            return this;
        }

        /** Replaces {@link SharedStateRegistry#DEFAULT_FACTORY}. */
        public TestInstanceBuilder withSharedStateRegistryFactory(SharedStateRegistryFactory sharedStateRegistryFactory) {
            this.sharedStateRegistryFactory = sharedStateRegistryFactory;
            return this;
        }

        /** Replaces the default direct executor. */
        public TestInstanceBuilder withExecutor(Executor executor) {
            this.executor = executor;
            return this;
        }

        /** Replaces the default empty configuration. */
        public TestInstanceBuilder withConfiguration(Configuration configuration) {
            this.configuration = configuration;
            return this;
        }

        /** Replaces the default timestamp, which is taken when the builder is created. */
        public TestInstanceBuilder withInitializationTimestamp(long initializationTimestamp) {
            this.initializationTimestamp = initializationTimestamp;
            return this;
        }

        /** Creates the runner from the currently configured values. */
        public CheckpointResourcesCleanupRunner build() {
            return new CheckpointResourcesCleanupRunner(
                    this.jobResult,
                    this.checkpointRecoveryFactory,
                    this.sharedStateRegistryFactory,
                    this.configuration,
                    this.executor,
                    this.initializationTimestamp);
        }
    }

    @Test
    public void testIsInitializedBeforeStart() throws Exception {
        testIsInitialized(BEFORE_START);
    }

    @Test
    public void testIsInitializedAfterStart() throws Exception {
        testIsInitialized(AFTER_START);
    }

    @Test
    public void testIsInitializedAfterClose() throws Exception {
        testIsInitialized(AFTER_CLOSE);
    }

    /** Asserts that the runner reports itself as initialized after the given lifecycle step. */
    private static void testIsInitialized(ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().build();
        preCheckLifecycleHandling.accept(testInstance);

        Assertions.assertThat(testInstance.isInitialized()).isTrue();
    }

    /** The close future of a runner that was never started must not be completed. */
    @Test
    public void testCloseAsyncBeforeStart() {
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().build();

        Assertions.assertThat(testInstance.closeAsync()).isNotCompleted();
    }

    /**
     * Both components are shut down with {@link JobStatus#FINISHED} and {@code closeAsync}
     * succeeds, but only after the halting factory has been released.
     */
    @Test
    public void testSuccessfulCloseAsyncAfterStart() throws Exception {
        final CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture = new CompletableFuture<>();
        final CompletableFuture<JobStatus> checkpointIdCounterShutdownFuture = new CompletableFuture<>();
        final HaltingCheckpointRecoveryFactory checkpointRecoveryFactory =
                new HaltingCheckpointRecoveryFactory(completedCheckpointStoreShutdownFuture, checkpointIdCounterShutdownFuture);
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder()
                .withCheckpointRecoveryFactory(checkpointRecoveryFactory)
                .withExecutor(ForkJoinPool.commonPool())
                .build();
        testInstance.start();

        Assertions.assertThat(completedCheckpointStoreShutdownFuture)
                .as("The CompletedCheckpointStore shouldn't have been shut down, yet.")
                .isNotCompleted();
        Assertions.assertThat(checkpointIdCounterShutdownFuture)
                .as("The CheckpointIDCounter shouldn't have been shut down, yet.")
                .isNotCompleted();
        Assertions.assertThat(testInstance.closeAsync())
                .as("closeAsync shouldn't have been completed, yet, since the shutdown of the components is not completed.")
                .isNotCompleted();

        checkpointRecoveryFactory.triggerCreation();

        Assertions.assertThat(completedCheckpointStoreShutdownFuture)
                .as("The CompletedCheckpointStore should have been shut down properly.")
                .succeedsWithin(TIMEOUT_FOR_RESULTS_WITH_CONCURRENCY)
                .isEqualTo(JobStatus.FINISHED);
        Assertions.assertThat(checkpointIdCounterShutdownFuture)
                .as("The CheckpointIDCounter should have been shut down properly.")
                .succeedsWithin(TIMEOUT_FOR_RESULTS_WITH_CONCURRENCY)
                .isEqualTo(JobStatus.FINISHED);
        Assertions.assertThat(testInstance.closeAsync()).succeedsWithin(TIMEOUT_FOR_RESULTS_WITH_CONCURRENCY);
    }

    /**
     * A failing shutdown of the completed checkpoint store makes {@code closeAsync} fail, while
     * the checkpoint ID counter is still shut down with {@link JobStatus#FINISHED}.
     */
    @Test
    public void testCloseAsyncAfterStartAndErrorInCompletedCheckpointStoreShutdown() throws Exception {
        final CompletableFuture<JobStatus> checkpointIdCounterShutdownFuture = new CompletableFuture<>();
        final HaltingCheckpointRecoveryFactory checkpointRecoveryFactory = new HaltingCheckpointRecoveryFactory(
                TestingCompletedCheckpointStore.builder().withShutdownConsumer((jobStatus, checkpointsCleaner) -> {
                    throw new RuntimeException("Expected RuntimeException simulating an error during shutdown.");
                }).build(),
                TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(checkpointIdCounterShutdownFuture));
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder()
                .withCheckpointRecoveryFactory(checkpointRecoveryFactory)
                .withExecutor(ForkJoinPool.commonPool())
                .build();
        testInstance.start();

        Assertions.assertThat(checkpointIdCounterShutdownFuture)
                .as("The CheckpointIDCounter shouldn't have been shut down, yet.")
                .isNotCompleted();
        Assertions.assertThat(testInstance.closeAsync())
                .as("closeAsync shouldn't have been completed, yet, since the shutdown of the components is not completed.")
                .isNotCompleted();

        checkpointRecoveryFactory.triggerCreation();

        Assertions.assertThat(checkpointIdCounterShutdownFuture)
                .as("The CheckpointIDCounter should have been shut down properly.")
                .succeedsWithin(TIMEOUT_FOR_RESULTS_WITH_CONCURRENCY)
                .isEqualTo(JobStatus.FINISHED);
        Assertions.assertThat(testInstance.closeAsync())
                .failsWithin(TIMEOUT_FOR_RESULTS_WITH_CONCURRENCY)
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(RuntimeException.class);
    }

    /**
     * A failing shutdown of the checkpoint ID counter makes {@code closeAsync} fail, while the
     * completed checkpoint store is still shut down with {@link JobStatus#FINISHED}.
     */
    @Test
    public void testCloseAsyncAfterStartAndErrorInCheckpointIDCounterShutdown() throws Exception {
        final CompletableFuture<JobStatus> completedCheckpointStoreShutdownFuture = new CompletableFuture<>();
        final HaltingCheckpointRecoveryFactory checkpointRecoveryFactory = new HaltingCheckpointRecoveryFactory(
                TestingCompletedCheckpointStore.createStoreWithShutdownCheckAndNoCompletedCheckpoints(completedCheckpointStoreShutdownFuture),
                TestingCheckpointIDCounter.builder().withShutdownConsumer(jobStatus -> {
                    throw new RuntimeException("Expected RuntimeException simulating an error during shutdown.");
                }).build());
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder()
                .withCheckpointRecoveryFactory(checkpointRecoveryFactory)
                .withExecutor(ForkJoinPool.commonPool())
                .build();
        testInstance.start();

        Assertions.assertThat(completedCheckpointStoreShutdownFuture)
                .as("The CompletedCheckpointStore shouldn't have been shut down, yet.")
                .isNotCompleted();
        Assertions.assertThat(testInstance.closeAsync())
                .as("closeAsync shouldn't have been completed, yet, since the shutdown of the components is not completed.")
                .isNotCompleted();

        checkpointRecoveryFactory.triggerCreation();

        Assertions.assertThat(completedCheckpointStoreShutdownFuture)
                .as("The CompletedCheckpointStore should have been shut down properly.")
                .succeedsWithin(TIMEOUT_FOR_RESULTS_WITH_CONCURRENCY)
                .isEqualTo(JobStatus.FINISHED);
        Assertions.assertThat(testInstance.closeAsync())
                .failsWithin(TIMEOUT_FOR_RESULTS_WITH_CONCURRENCY)
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(RuntimeException.class);
    }

    @Test
    public void testCancellationBeforeStart() throws Exception {
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder()
                .withExecutor(ForkJoinPool.commonPool())
                .build();

        assertCancelFailsWithFlinkException(testInstance);
        Assertions.assertThat(testInstance.closeAsync())
                .as("The closeAsync result shouldn't be completed, yet.")
                .isNotCompleted()
                .as("The closeAsync result shouldn't be cancelled.")
                .isNotCancelled();
    }

    @Test
    public void testCancellationAfterStart() throws Exception {
        // The halting factory is never released, so the cleanup cannot finish during this test.
        final HaltingCheckpointRecoveryFactory checkpointRecoveryFactory =
                new HaltingCheckpointRecoveryFactory(new CompletableFuture<JobStatus>(), new CompletableFuture<JobStatus>());
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder()
                .withCheckpointRecoveryFactory(checkpointRecoveryFactory)
                .withExecutor(ForkJoinPool.commonPool())
                .build();
        AFTER_START.accept(testInstance);

        assertCancelFailsWithFlinkException(testInstance);
        Assertions.assertThat(testInstance.closeAsync())
                .as("The closeAsync result shouldn't be completed, yet.")
                .isNotCompleted()
                .as("The closeAsync result shouldn't be cancelled.")
                .isNotCancelled();
    }

    @Test
    public void testCancellationAfterClose() throws Exception {
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder()
                .withExecutor(ForkJoinPool.commonPool())
                .build();
        AFTER_CLOSE.accept(testInstance);

        assertCancelFailsWithFlinkException(testInstance);
        Assertions.assertThat(testInstance.closeAsync())
                .as("The closeAsync result should be completed by now.")
                .isCompleted()
                .as("The closeAsync result shouldn't be cancelled.")
                .isNotCancelled();
    }

    /** Asserts that {@code cancel} fails with a {@link FlinkException} as the cause. */
    private static void assertCancelFailsWithFlinkException(CheckpointResourcesCleanupRunner testInstance) {
        Assertions.assertThat(testInstance.cancel(TIMEOUT_FOR_REQUESTS))
                .failsWithin(TIMEOUT_FOR_RESULTS_WITH_CONCURRENCY)
                .withThrowableOfType(ExecutionException.class)
                .withCauseInstanceOf(FlinkException.class);
    }

    @Test
    public void testResultFutureWithSuccessBeforeStart() throws Exception {
        final CompletableFuture<JobManagerRunnerResult> resultFuture =
                getResultFutureFromTestInstance(createDummySuccessJobResult(), BEFORE_START);

        Assertions.assertThat(resultFuture).isNotCompleted();
    }

    @Test
    public void testResultFutureWithSuccessAfterStart() throws Exception {
        testResultFutureWithSuccessfulResultAfterStart(AFTER_START);
    }

    @Test
    public void testResultFutureWithSuccessAfterClose() throws Exception {
        testResultFutureWithSuccessfulResultAfterStart(AFTER_CLOSE);
    }

    /** Asserts that the result future is completed with a successful result after the given step. */
    private void testResultFutureWithSuccessfulResultAfterStart(ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        final CompletableFuture<JobManagerRunnerResult> resultFuture =
                getResultFutureFromTestInstance(createDummySuccessJobResult(), preCheckLifecycleHandling);

        Assertions.assertThat(resultFuture)
                .isCompletedWithValueMatching(result -> result.isSuccess(), "The JobManagerRunner should have succeeded.");
    }

    @Test
    public void testResultFutureWithErrorBeforeStart() throws Exception {
        final SerializedThrowable expectedError = new SerializedThrowable(new Exception("Expected exception"));
        final CompletableFuture<JobManagerRunnerResult> resultFuture =
                getResultFutureFromTestInstance(createJobResultWithFailure(expectedError), BEFORE_START);

        Assertions.assertThat(resultFuture).isNotCompleted();
    }

    @Test
    public void testResultFutureWithErrorAfterStart() throws Exception {
        testResultFutureWithErrorAfterStart(AFTER_START);
    }

    @Test
    public void testResultFutureWithErrorAfterClose() throws Exception {
        testResultFutureWithErrorAfterStart(AFTER_CLOSE);
    }

    /**
     * Asserts that, after the given step, the result future is completed with a result whose
     * archived execution graph carries the failure of the job result.
     */
    private static void testResultFutureWithErrorAfterStart(ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        final SerializedThrowable expectedError = new SerializedThrowable(new Exception("Expected exception"));
        final CompletableFuture<JobManagerRunnerResult> resultFuture =
                getResultFutureFromTestInstance(createJobResultWithFailure(expectedError), preCheckLifecycleHandling);

        Assertions.assertThat(resultFuture)
                .isCompletedWithValueMatching(
                        result -> Objects.requireNonNull(result.getExecutionGraphInfo().getArchivedExecutionGraph().getFailureInfo())
                                .getException()
                                .equals(expectedError),
                        "JobManagerRunner should have failed with expected error");
    }

    /** Builds a runner for the job result, applies the lifecycle step and returns its result future. */
    private static CompletableFuture<JobManagerRunnerResult> getResultFutureFromTestInstance(JobResult jobResult, ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().withJobResult(jobResult).build();
        preCheckLifecycleHandling.accept(testInstance);
        return testInstance.getResultFuture();
    }

    @Test
    public void testGetJobID() {
        final JobID jobId = new JobID();
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder()
                .withJobResult(createJobResult(jobId, ApplicationStatus.CANCELED))
                .build();

        Assertions.assertThat(testInstance.getJobID()).isEqualTo(jobId);
    }

    @Test
    public void testGetJobMasterGatewayBeforeStart() throws Exception {
        testGetJobMasterGateway(BEFORE_START);
    }

    @Test
    public void testGetJobMasterGatewayAfterStart() throws Exception {
        testGetJobMasterGateway(AFTER_START);
    }

    @Test
    public void testGetJobMasterGatewayAfterClose() throws Exception {
        testGetJobMasterGateway(AFTER_CLOSE);
    }

    /**
     * Asserts that waiting for the job master gateway fails with an
     * {@link UnavailableDispatcherOperationException} after the given lifecycle step.
     */
    private static void testGetJobMasterGateway(ThrowingConsumer<CheckpointResourcesCleanupRunner, ? extends Exception> preCheckLifecycleHandling) throws Exception {
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder().build();
        preCheckLifecycleHandling.accept(testInstance);

        // The callable has to trigger the failure itself; an empty callable can never satisfy
        // assertThatThrownBy.
        Assertions.assertThatThrownBy(() -> testInstance.getJobMasterGateway().get())
                .isInstanceOf(ExecutionException.class)
                .hasCauseExactlyInstanceOf(UnavailableDispatcherOperationException.class);
    }

    @Test
    public void testRequestJob_ExceptionHistory() {
        testRequestJob(
                createDummySuccessJobResult(),
                System.currentTimeMillis(),
                executionGraphInfo -> !executionGraphInfo.getExceptionHistory().iterator().hasNext());
    }

    @Test
    public void testRequestJob_JobName() {
        testRequestJobExecutionGraph(
                createDummySuccessJobResult(),
                System.currentTimeMillis(),
                executionGraph -> executionGraph.getJobName().equals("unknown"));
    }

    @Test
    public void testRequestJob_JobId() {
        final JobResult jobResult = createDummySuccessJobResult();
        testRequestJobExecutionGraph(
                jobResult,
                System.currentTimeMillis(),
                executionGraph -> executionGraph.getJobID().equals(jobResult.getJobId()));
    }

    @Test
    public void testRequestJob_JobState() {
        final JobResult jobResult = createDummySuccessJobResult();
        testRequestJobExecutionGraph(
                jobResult,
                System.currentTimeMillis(),
                executionGraph -> executionGraph.getState().equals(jobResult.getApplicationStatus().deriveJobStatus()));
    }

    @Test
    public void testRequestJob_InitiatizationTimestamp() {
        final long initializationTimestamp = System.currentTimeMillis();
        testRequestJobExecutionGraph(
                createDummySuccessJobResult(),
                initializationTimestamp,
                executionGraph -> executionGraph.getStatusTimestamp(JobStatus.INITIALIZING) == initializationTimestamp);
    }

    @Test
    public void testRequestJobWithFailure() {
        final SerializedThrowable expectedError = new SerializedThrowable(new Exception("Expected exception"));
        testRequestJobExecutionGraph(
                createJobResultWithFailure(expectedError),
                System.currentTimeMillis(),
                executionGraph -> Objects.requireNonNull(executionGraph.getFailureInfo()).getException().equals(expectedError));
    }

    /** Like {@link #testRequestJob}, but checks the archived execution graph of the returned info. */
    private static void testRequestJobExecutionGraph(JobResult jobResult, long initializationTimestamp, Function<AccessExecutionGraph, Boolean> assertion) {
        testRequestJob(
                jobResult,
                initializationTimestamp,
                executionGraphInfo -> assertion.apply(executionGraphInfo.getArchivedExecutionGraph()));
    }

    /**
     * Requests the job from a freshly built runner and asserts that the returned future is
     * completed with a value accepted by {@code function}.
     *
     * @param jobResult job result the runner is created for
     * @param j initialization timestamp handed to the runner
     * @param function check applied to the returned {@link ExecutionGraphInfo}
     */
    private static void testRequestJob(JobResult jobResult, long j, Function<ExecutionGraphInfo, Boolean> function) {
        final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder()
                .withJobResult(jobResult)
                .withInitializationTimestamp(j)
                .build();

        // The bound method reference fails fast on a null function and adapts it to the
        // Predicate that AssertJ expects.
        Assertions.assertThat(testInstance.requestJob(TIMEOUT_FOR_REQUESTS))
                .isCompletedWithValueMatching(function::apply);
    }

    /** Creates a {@link ApplicationStatus#SUCCEEDED} job result with a fresh job ID. */
    private static JobResult createDummySuccessJobResult() {
        return createJobResult(new JobID(), ApplicationStatus.SUCCEEDED);
    }

    /** Creates a {@link ApplicationStatus#FAILED} job result carrying the given error. */
    private static JobResult createJobResultWithFailure(SerializedThrowable serializedThrowable) {
        return new JobResult.Builder()
                .jobId(new JobID())
                .applicationStatus(ApplicationStatus.FAILED)
                .serializedThrowable(serializedThrowable)
                .netRuntime(1L)
                .build();
    }

    /** Creates a job result with the given ID and status and a net runtime of 1. */
    private static JobResult createJobResult(JobID jobID, ApplicationStatus applicationStatus) {
        return new JobResult.Builder()
                .jobId(jobID)
                .applicationStatus(applicationStatus)
                .netRuntime(1L)
                .build();
    }

    /** Creates the default factory, backed by testing components built from fresh futures. */
    private static CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
        return new TestingCheckpointRecoveryFactory(
                TestingCompletedCheckpointStore.createStoreWithShutdownCheckAndNoCompletedCheckpoints(new CompletableFuture<>()),
                TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(new CompletableFuture<>()));
    }
}
