package org.apache.flink.runtime.dispatcher.cleanup;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.dispatcher.cleanup.DefaultResourceCleaner;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.RetryStrategy;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for {@link DefaultResourceCleaner}.
 *
 * <p>The tests register {@link CleanupCallback} instances as prioritized and/or regular cleanups
 * and verify when each callback is triggered and how the overall cleanup future completes.
 */
@ExtendWith(TestLoggerExtension.class)
public class DefaultResourceCleanerTest {

    /**
     * Upper bound used by the retry tests when waiting for the overall cleanup future. The
     * assertion returns as soon as the future completes, so a generous value does not slow down a
     * passing test. NOTE(review): presumably needed because retries are not executed inline with
     * the calling thread - confirm against the retry implementation.
     */
    private static final Duration TIMEOUT_FOR_RUNS_WITH_RETRY = Duration.ofHours(1);

    /** Executor handed to the cleaner under test; runs submitted work directly. */
    private static final Executor EXECUTOR = Executors.directExecutor();

    /** The job whose resources are cleaned up in every test. */
    private static final JobID JOB_ID = new JobID();

    /**
     * Cleanup logic under test: receives the job ID and the cleanup executor and returns a future
     * that completes when the cleanup is done.
     */
    @FunctionalInterface
    public interface CleanupCallback
            extends BiFunction<JobID, Executor, CompletableFuture<Void>> {}

    /**
     * {@link CleanupCallback} that may be triggered at most once and gives the test control over
     * when (and how) the returned future completes.
     */
    private static class SingleCallCleanup implements CleanupCallback {

        /** Future returned by {@link #apply(JobID, Executor)}; completed by the test or on call. */
        private final CompletableFuture<Void> resultFuture = new CompletableFuture<>();

        /** Job ID passed to {@link #apply(JobID, Executor)}; {@code null} until triggered. */
        private JobID jobId;

        /** Action applied to {@link #resultFuture} at the moment the cleanup is triggered. */
        private final Consumer<CompletableFuture<Void>> internalFunction;

        /** Creates a cleanup whose future completes successfully as soon as it is triggered. */
        public static SingleCallCleanup withCompletionOnCleanup() {
            return new SingleCallCleanup(resultFuture -> resultFuture.complete(null));
        }

        /** Creates a cleanup whose future stays pending until the test completes it explicitly. */
        public static SingleCallCleanup withoutCompletionOnCleanup() {
            return new SingleCallCleanup(ignoredResultFuture -> {});
        }

        private SingleCallCleanup(Consumer<CompletableFuture<Void>> internalFunction) {
            this.internalFunction = internalFunction;
        }

        /**
         * Records the job ID, runs the configured action on the result future and returns that
         * future.
         *
         * @throws IllegalStateException if this cleanup has already been triggered
         */
        @Override
        public CompletableFuture<Void> apply(JobID jobID, Executor executor) {
            // A non-null jobId means apply() was already called once.
            Preconditions.checkState(this.jobId == null);
            this.jobId = jobID;
            this.internalFunction.accept(this.resultFuture);
            return this.resultFuture;
        }

        /** Returns whether the cleanup future has completed (normally or exceptionally). */
        public boolean isDone() {
            return this.resultFuture.isDone();
        }

        /** Returns the job ID this cleanup was triggered for, or {@code null} if not triggered. */
        public JobID getProcessedJobId() {
            return this.jobId;
        }

        /** Completes the cleanup future successfully. */
        public void completeCleanup() {
            this.resultFuture.complete(null);
        }

        /** Completes the cleanup future with the given failure. */
        public void completeCleanupExceptionally(Throwable th) {
            this.resultFuture.completeExceptionally(th);
        }
    }

    /**
     * Verifies that two regular cleanups are both triggered before either completes and that the
     * overall future only completes once both of them have completed.
     */
    @Test
    public void testSuccessfulConcurrentCleanup() {
        final SingleCallCleanup cleanup0 = SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup cleanup1 = SingleCallCleanup.withoutCompletionOnCleanup();

        final CompletableFuture<Void> cleanupResult =
                createTestInstanceBuilder()
                        .withRegularCleanup("Reg #0", cleanup0)
                        .withRegularCleanup("Reg #1", cleanup1)
                        .build()
                        .cleanupAsync(JOB_ID);

        Assertions.assertThat(cleanupResult).isNotCompleted();
        Assertions.assertThat(cleanup0)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);
        Assertions.assertThat(cleanup1)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);

        cleanup0.completeCleanup();
        Assertions.assertThat(cleanupResult).isNotCompleted();

        cleanup1.completeCleanup();
        Assertions.assertThat(cleanupResult).isCompleted();
    }

    /**
     * Verifies that a failure of the first regular cleanup does not complete the overall future
     * until the second cleanup has finished, and that the failure is then surfaced.
     */
    @Test
    public void testConcurrentCleanupWithExceptionFirst() {
        final SingleCallCleanup cleanup0 = SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup cleanup1 = SingleCallCleanup.withoutCompletionOnCleanup();

        final CompletableFuture<Void> cleanupResult =
                createTestInstanceBuilder()
                        .withRegularCleanup("Reg #0", cleanup0)
                        .withRegularCleanup("Reg #1", cleanup1)
                        .build()
                        .cleanupAsync(JOB_ID);

        Assertions.assertThat(cleanupResult).isNotCompleted();
        Assertions.assertThat(cleanup0)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);
        Assertions.assertThat(cleanup1)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);

        final RuntimeException expectedException = new RuntimeException("Expected exception");
        cleanup0.completeCleanupExceptionally(expectedException);
        Assertions.assertThat(cleanupResult).isNotCompleted();

        cleanup1.completeCleanup();
        assertFailedWithCauseChainEndingIn(cleanupResult, expectedException);
    }

    /**
     * Verifies that a failure of the second regular cleanup, occurring after the first cleanup
     * succeeded, fails the overall future with that failure.
     */
    @Test
    public void testConcurrentCleanupWithExceptionSecond() {
        final SingleCallCleanup cleanup0 = SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup cleanup1 = SingleCallCleanup.withoutCompletionOnCleanup();

        final CompletableFuture<Void> cleanupResult =
                createTestInstanceBuilder()
                        .withRegularCleanup("Reg #0", cleanup0)
                        .withRegularCleanup("Reg #1", cleanup1)
                        .build()
                        .cleanupAsync(JOB_ID);

        Assertions.assertThat(cleanupResult).isNotCompleted();
        Assertions.assertThat(cleanup0)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);
        Assertions.assertThat(cleanup1)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);

        cleanup0.completeCleanup();
        Assertions.assertThat(cleanupResult).isNotCompleted();

        final RuntimeException expectedException = new RuntimeException("Expected exception");
        cleanup1.completeCleanupExceptionally(expectedException);
        assertFailedWithCauseChainEndingIn(cleanupResult, expectedException);
    }

    /**
     * Verifies that while the first prioritized cleanup is pending, neither the second prioritized
     * cleanup nor any regular cleanup has completed, and that all of them complete once it does.
     */
    @Test
    public void testHighestPriorityCleanupBlocksAllOtherCleanups() {
        final SingleCallCleanup highPriorityCleanup =
                SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup lowerPriorityCleanup = SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup regularCleanup0 = SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup regularCleanup1 = SingleCallCleanup.withCompletionOnCleanup();

        final CompletableFuture<Void> cleanupResult =
                createTestInstanceBuilder()
                        .withPrioritizedCleanup("Prio #0", highPriorityCleanup)
                        .withPrioritizedCleanup("Prio #1", lowerPriorityCleanup)
                        .withRegularCleanup("Reg #0", regularCleanup0)
                        .withRegularCleanup("Reg #1", regularCleanup1)
                        .build()
                        .cleanupAsync(JOB_ID);

        // The self-completing cleanups would be done if they had been triggered already.
        Assertions.assertThat(highPriorityCleanup.isDone()).isFalse();
        Assertions.assertThat(lowerPriorityCleanup.isDone()).isFalse();
        Assertions.assertThat(regularCleanup0.isDone()).isFalse();
        Assertions.assertThat(regularCleanup1.isDone()).isFalse();
        Assertions.assertThat(cleanupResult.isDone()).isFalse();

        highPriorityCleanup.completeCleanup();

        Assertions.assertThat(cleanupResult).succeedsWithin(Duration.ZERO);
        Assertions.assertThat(highPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(lowerPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(regularCleanup0.isDone()).isTrue();
        Assertions.assertThat(regularCleanup1.isDone()).isTrue();
    }

    /**
     * Verifies that the first prioritized cleanup runs as soon as the cleanup is started, and that
     * a pending second prioritized cleanup holds back the regular cleanups until it completes.
     */
    @Test
    public void testMediumPriorityCleanupBlocksAllLowerPrioritizedCleanups() {
        final SingleCallCleanup higherPriorityCleanup = SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup lowerPriorityCleanup =
                SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup regularCleanup0 = SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup regularCleanup1 = SingleCallCleanup.withCompletionOnCleanup();

        final DefaultResourceCleaner<CleanupCallback> testInstance =
                createTestInstanceBuilder()
                        .withPrioritizedCleanup("Prio #0", higherPriorityCleanup)
                        .withPrioritizedCleanup("Prio #1", lowerPriorityCleanup)
                        .withRegularCleanup("Reg #0", regularCleanup0)
                        .withRegularCleanup("Reg #1", regularCleanup1)
                        .build();

        // Building the cleaner alone must not trigger any cleanup.
        Assertions.assertThat(higherPriorityCleanup.isDone()).isFalse();

        final CompletableFuture<Void> cleanupResult = testInstance.cleanupAsync(JOB_ID);

        Assertions.assertThat(higherPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(lowerPriorityCleanup.isDone()).isFalse();
        Assertions.assertThat(regularCleanup0.isDone()).isFalse();
        Assertions.assertThat(regularCleanup1.isDone()).isFalse();
        Assertions.assertThat(cleanupResult.isDone()).isFalse();

        lowerPriorityCleanup.completeCleanup();

        Assertions.assertThat(cleanupResult).succeedsWithin(Duration.ZERO);
        Assertions.assertThat(higherPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(lowerPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(regularCleanup0.isDone()).isTrue();
        Assertions.assertThat(regularCleanup1.isDone()).isTrue();
    }

    /**
     * Verifies that a regular cleanup failing twice is invoked three times in total under a
     * two-retry strategy, and that the overall cleanup succeeds.
     */
    @Test
    public void testCleanupWithRetries() {
        final Collection<JobID> actualJobIds = new ArrayList<>();
        final CleanupCallback cleanupWithRetries = cleanupWithInitialFailingRuns(actualJobIds, 2);
        final SingleCallCleanup oneRunCleanup = SingleCallCleanup.withCompletionOnCleanup();

        final CompletableFuture<Void> cleanupResult =
                createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(2))
                        .withRegularCleanup("Reg #0", cleanupWithRetries)
                        .withRegularCleanup("Reg #1", oneRunCleanup)
                        .build()
                        .cleanupAsync(JOB_ID);

        Assertions.assertThat(cleanupResult).succeedsWithin(TIMEOUT_FOR_RUNS_WITH_RETRY);
        Assertions.assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
        Assertions.assertThat(oneRunCleanup.isDone()).isTrue();
        // Two failing runs plus the final successful one.
        Assertions.assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID, JOB_ID);
    }

    /**
     * Verifies that a prioritized cleanup failing once is invoked twice under a one-retry
     * strategy, and that the remaining cleanups run and the overall cleanup succeeds.
     */
    @Test
    public void testCleanupWithSingleRetryInHighPriorityTask() {
        final Collection<JobID> actualJobIds = new ArrayList<>();
        final CleanupCallback cleanupWithRetry = cleanupWithInitialFailingRuns(actualJobIds, 1);
        final SingleCallCleanup oneRunHigherPriorityCleanup =
                SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup oneRunCleanup = SingleCallCleanup.withCompletionOnCleanup();

        final CompletableFuture<Void> cleanupResult =
                createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(1))
                        .withPrioritizedCleanup("Prio #0", cleanupWithRetry)
                        .withPrioritizedCleanup("Prio #1", oneRunHigherPriorityCleanup)
                        .withRegularCleanup("Reg #0", oneRunCleanup)
                        .build()
                        .cleanupAsync(JOB_ID);

        Assertions.assertThat(cleanupResult).succeedsWithin(TIMEOUT_FOR_RUNS_WITH_RETRY);
        Assertions.assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
        Assertions.assertThat(oneRunCleanup.isDone()).isTrue();
        // One failing run plus the final successful one.
        Assertions.assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID);
    }

    /**
     * Asserts that the cleanup future has already failed and that its chain of causes is exactly
     * {@code ExecutionException -> RetryException -> CompletionException -> expectedException}.
     */
    private static void assertFailedWithCauseChainEndingIn(
            CompletableFuture<Void> cleanupResult, RuntimeException expectedException) {
        Assertions.assertThat(cleanupResult)
                .failsWithin(Duration.ZERO)
                .withThrowableOfType(ExecutionException.class)
                .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
                .hasExactlyElementsOfTypes(
                        ExecutionException.class,
                        FutureUtils.RetryException.class,
                        CompletionException.class,
                        expectedException.getClass())
                .last()
                .isEqualTo(expectedException);
    }

    /** Creates a builder for the cleaner under test that uses the no-retry strategy. */
    private static DefaultResourceCleaner.Builder<CleanupCallback> createTestInstanceBuilder() {
        return createTestInstanceBuilder(TestingRetryStrategies.NO_RETRY_STRATEGY);
    }

    /**
     * Creates a builder for the cleaner under test that invokes each registered {@link
     * CleanupCallback} through {@link CleanupCallback#apply} and uses the given retry strategy.
     */
    private static DefaultResourceCleaner.Builder<CleanupCallback> createTestInstanceBuilder(
            RetryStrategy retryStrategy) {
        return DefaultResourceCleaner.forCleanableResources(
                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                EXECUTOR,
                CleanupCallback::apply,
                retryStrategy);
    }

    /**
     * Creates a callback that records the job ID of every invocation in {@code actualJobIds},
     * returns an exceptionally completed future for the first {@code numberOfFailureRuns}
     * invocations and a successfully completed future for every invocation after that.
     */
    private static CleanupCallback cleanupWithInitialFailingRuns(
            Collection<JobID> actualJobIds, int numberOfFailureRuns) {
        final AtomicInteger failureRunsLeft = new AtomicInteger(numberOfFailureRuns);
        return (jobId, executor) -> {
            actualJobIds.add(jobId);
            if (failureRunsLeft.getAndDecrement() > 0) {
                return FutureUtils.completedExceptionally(
                        new RuntimeException("Expected RuntimeException"));
            }
            return FutureUtils.completedVoidFuture();
        };
    }
}
