package org.apache.flink.runtime.dispatcher.runner;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.class */
class SessionDispatcherLeaderProcessTest {
    /** Job graph shared by the tests; obtained from {@code JobGraphTestUtils.emptyJobGraph()}. */
    private static final JobGraph JOB_GRAPH = JobGraphTestUtils.emptyJobGraph();
    /** Single-threaded executor created in {@code setupClass()} and shut down in {@code teardownClass()}. */
    private static ExecutorService ioExecutor;
    /** Random leader session id generated once per test instance. */
    private final UUID leaderSessionId = UUID.randomUUID();
    /** Fatal error handler re-created in {@code setup()} and re-checked (rethrowError) in {@code teardown()}. */
    private TestingFatalErrorHandler fatalErrorHandler;
    /** Job graph store fixture; {@code setup()} installs a default, individual tests may replace it. */
    private JobGraphStore jobGraphStore;
    /** Job result store fixture; {@code setup()} installs a default, individual tests may replace it. */
    private JobResultStore jobResultStore;
    /** Dispatcher service factory fixture; {@code setup()} installs a default, individual tests may replace it. */
    private AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherServiceFactory;

    /** Package-private no-arg constructor that performs no work of its own. */
    SessionDispatcherLeaderProcessTest() {
        super();
    }

    /** Creates the single-threaded executor that is shared by all tests of this class. */
    @BeforeAll
    static void setupClass() {
        final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        ioExecutor = singleThreadExecutor;
    }

    /**
     * Installs fresh default fixtures before every test: a new fatal error handler, default
     * job graph and job result stores, and a factory producing default dispatcher gateway
     * services.
     */
    @BeforeEach
    void setup() {
        fatalErrorHandler = new TestingFatalErrorHandler();
        jobGraphStore = TestingJobGraphStore.newBuilder().build();
        jobResultStore = TestingJobResultStore.builder().build();
        dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(
                        () -> TestingDispatcherGatewayService.newBuilder().build());
    }

    /**
     * Rethrows any error recorded by the fatal error handler and then drops the handler.
     *
     * @throws Exception if the fatal error handler recorded an error during the test
     */
    @AfterEach
    void teardown() throws Exception {
        final TestingFatalErrorHandler handler = fatalErrorHandler;
        if (handler == null) {
            return;
        }
        // The field is only cleared when no error is rethrown.
        handler.rethrowError();
        fatalErrorHandler = null;
    }

    /** Shuts the shared executor down, allowing up to five seconds for the shutdown. */
    @AfterAll
    static void teardownClass() {
        if (ioExecutor == null) {
            return;
        }
        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, ioExecutor);
    }

    /** Verifies that a process which was closed before being started ends up {@code STOPPED}. */
    @Test
    void start_afterClose_doesNotHaveAnEffect() throws Exception {
        final SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess();

        dispatcherLeaderProcess.close();
        dispatcherLeaderProcess.start();

        Assertions.assertThat(dispatcherLeaderProcess.getState())
                .isEqualTo(AbstractDispatcherLeaderProcess.State.STOPPED);
    }

    /**
     * Verifies that starting the leader process moves it into the {@code RUNNING} state.
     *
     * @throws Exception if creating, starting or closing the process fails
     */
    @Test
    void testStartTriggeringDispatcherServiceCreation() throws Exception {
        this.dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(
                        () -> TestingDispatcherGatewayService.newBuilder().build());

        // try-with-resources closes the process on every path and records a failing close()
        // as a suppressed exception instead of replacing the original failure.
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();
            Assertions.assertThat(dispatcherLeaderProcess.getState())
                    .isEqualTo(AbstractDispatcherLeaderProcess.State.RUNNING);
        }
    }

    /**
     * With one stored job graph and no dirty job results, the job graph is expected to be
     * recovered and the recovered dirty results are expected to be empty.
     */
    @Test
    void testRecoveryWithJobGraphButNoDirtyJobResult() throws Exception {
        testJobRecovery(
                Collections.singleton(JOB_GRAPH),
                Collections.emptySet(),
                recoveredJobGraphs ->
                        Assertions.assertThat(recoveredJobGraphs)
                                .singleElement()
                                .isEqualTo(JOB_GRAPH),
                recoveredDirtyJobResults ->
                        Assertions.assertThat(recoveredDirtyJobResults).isEmpty());
    }

    /**
     * With a stored job graph whose job id also has a dirty job result, no job graph is
     * expected to be recovered while the dirty result is.
     */
    @Test
    void testRecoveryWithJobGraphAndMatchingDirtyJobResult() throws Exception {
        final JobResult matchingDirtyJobResult =
                TestingJobResultStore.createSuccessfulJobResult(JOB_GRAPH.getJobID());

        testJobRecovery(
                Collections.singleton(JOB_GRAPH),
                Collections.singleton(matchingDirtyJobResult),
                recoveredJobGraphs -> Assertions.assertThat(recoveredJobGraphs).isEmpty(),
                recoveredDirtyJobResults ->
                        Assertions.assertThat(recoveredDirtyJobResults)
                                .singleElement()
                                .isEqualTo(matchingDirtyJobResult));
    }

    /**
     * With two stored job graphs, one of which has a dirty job result, only the other job
     * graph is expected to be recovered together with the dirty result.
     */
    @Test
    void testRecoveryWithMultipleJobGraphsAndOneMatchingDirtyJobResult() throws Exception {
        final JobResult matchingDirtyJobResult =
                TestingJobResultStore.createSuccessfulJobResult(JOB_GRAPH.getJobID());
        final JobGraph otherJobGraph = JobGraphTestUtils.emptyJobGraph();

        testJobRecovery(
                Arrays.asList(otherJobGraph, JOB_GRAPH),
                Collections.singleton(matchingDirtyJobResult),
                recoveredJobGraphs ->
                        Assertions.assertThat(recoveredJobGraphs)
                                .singleElement()
                                .isEqualTo(otherJobGraph),
                recoveredDirtyJobResults ->
                        Assertions.assertThat(recoveredDirtyJobResults)
                                .singleElement()
                                .isEqualTo(matchingDirtyJobResult));
    }

    /**
     * With no stored job graphs but one dirty job result, no job graph is expected to be
     * recovered while the dirty result is.
     */
    @Test
    void testRecoveryWithoutJobGraphButDirtyJobResult() throws Exception {
        final JobResult unrelatedDirtyJobResult =
                TestingJobResultStore.createSuccessfulJobResult(new JobID());

        testJobRecovery(
                Collections.emptyList(),
                Collections.singleton(unrelatedDirtyJobResult),
                recoveredJobGraphs -> Assertions.assertThat(recoveredJobGraphs).isEmpty(),
                recoveredDirtyJobResults ->
                        Assertions.assertThat(recoveredDirtyJobResults)
                                .singleElement()
                                .isEqualTo(unrelatedDirtyJobResult));
    }

    /**
     * Starts a leader process backed by the given job graphs and dirty job results and hands
     * whatever the dispatcher service factory receives to the two assertion callbacks.
     *
     * @param collection job graphs initially present in the job graph store
     * @param set dirty job results returned by the job result store
     * @param consumer assertion applied to the job graphs passed to the dispatcher service factory
     * @param consumer2 assertion applied to the dirty job results passed to the dispatcher
     *     service factory
     * @throws Exception if starting the process, waiting for the captured values, or closing
     *     the process fails
     */
    private void testJobRecovery(Collection<JobGraph> collection, Set<JobResult> set, Consumer<Collection<JobGraph>> consumer, Consumer<Collection<JobResult>> consumer2) throws Exception {
        this.jobGraphStore =
                TestingJobGraphStore.newBuilder().setInitialJobGraphs(collection).build();
        this.jobResultStore =
                TestingJobResultStore.builder().withGetDirtyResultsSupplier(() -> set).build();

        // Capture the arguments the factory is invoked with so they can be asserted on below.
        final CompletableFuture<Collection<JobGraph>> recoveredJobGraphsFuture =
                new CompletableFuture<>();
        final CompletableFuture<Collection<JobResult>> recoveredDirtyJobResultsFuture =
                new CompletableFuture<>();
        this.dispatcherServiceFactory =
                (dispatcherId,
                        recoveredJobGraphs,
                        recoveredDirtyJobResults,
                        jobGraphWriter,
                        ignoredJobResultStore) -> {
                    recoveredJobGraphsFuture.complete(recoveredJobGraphs);
                    recoveredDirtyJobResultsFuture.complete(recoveredDirtyJobResults);
                    return TestingDispatcherGatewayService.newBuilder().build();
                };

        // try-with-resources closes the process on every path and keeps a failing close()
        // as a suppressed exception of the primary failure.
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();
            consumer.accept(recoveredJobGraphsFuture.get());
            consumer2.accept(recoveredDirtyJobResultsFuture.get());
        }
    }

    /**
     * Adds a job graph while the initial recovery is blocked inside the dirty-result lookup
     * and verifies that the dispatcher service factory still receives no job graphs and
     * exactly the one dirty job result.
     *
     * @throws Exception if starting, waiting on the latches, or closing the process fails
     */
    @Test
    void testRecoveryWhileJobGraphRecoveryIsScheduledConcurrently() throws Exception {
        final JobResult dirtyJobResult =
                TestingJobResultStore.createSuccessfulJobResult(new JobID());
        final OneShotLatch recoveryInitiatedLatch = new OneShotLatch();
        final OneShotLatch jobGraphAddedLatch = new OneShotLatch();

        // The store answers every recovery request with null.
        this.jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setRecoverJobGraphFunction((jobId, jobGraphs) -> null)
                        .build();
        this.jobResultStore =
                TestingJobResultStore.builder()
                        .withGetDirtyResultsSupplier(
                                () -> {
                                    // Signal that recovery has begun, then block until the
                                    // test has called onAddedJobGraph.
                                    recoveryInitiatedLatch.trigger();
                                    try {
                                        jobGraphAddedLatch.await();
                                    } catch (InterruptedException e) {
                                        Thread.currentThread().interrupt();
                                    }
                                    return Collections.singleton(dirtyJobResult);
                                })
                        .build();

        final CompletableFuture<Collection<JobGraph>> recoveredJobGraphsFuture =
                new CompletableFuture<>();
        final CompletableFuture<Collection<JobResult>> recoveredDirtyJobResultsFuture =
                new CompletableFuture<>();
        this.dispatcherServiceFactory =
                (dispatcherId,
                        recoveredJobGraphs,
                        recoveredDirtyJobResults,
                        jobGraphWriter,
                        ignoredJobResultStore) -> {
                    recoveredJobGraphsFuture.complete(recoveredJobGraphs);
                    recoveredDirtyJobResultsFuture.complete(recoveredDirtyJobResults);
                    return TestingDispatcherGatewayService.newBuilder().build();
                };

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            recoveryInitiatedLatch.await();
            dispatcherLeaderProcess.onAddedJobGraph(dirtyJobResult.getJobId());
            jobGraphAddedLatch.trigger();

            FlinkAssertions.assertThatFuture(recoveredJobGraphsFuture)
                    .eventuallySucceeds()
                    .satisfies(
                            recoveredJobGraphs ->
                                    Assertions.assertThat(recoveredJobGraphs).isEmpty());
            FlinkAssertions.assertThatFuture(recoveredDirtyJobResultsFuture)
                    .eventuallySucceeds()
                    .satisfies(
                            recoveredDirtyJobResults ->
                                    Assertions.assertThat(recoveredDirtyJobResults)
                                            .containsExactly(dirtyJobResult));
        }
    }

    /**
     * Verifies that {@code closeAsync()} completes, and the job graph store is stopped, only
     * after the dispatcher service's termination future has been completed.
     *
     * @throws Exception if starting, waiting on the futures, or closing the process fails
     */
    @Test
    void closeAsync_stopsJobGraphStoreAndDispatcher() throws Exception {
        final CompletableFuture<Void> jobGraphStopFuture = new CompletableFuture<>();
        this.jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setStopRunnable(() -> jobGraphStopFuture.complete(null))
                        .build();

        final CompletableFuture<Void> dispatcherServiceTerminationFuture =
                new CompletableFuture<>();
        this.dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(
                        () ->
                                TestingDispatcherGatewayService.newBuilder()
                                        .setTerminationFuture(dispatcherServiceTerminationFuture)
                                        .withManualTerminationFutureCompletion()
                                        .build());

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait until the dispatcher service has been created.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            final CompletableFuture<Void> closeFuture = dispatcherLeaderProcess.closeAsync();

            // Nothing may finish while the dispatcher service is still terminating.
            Assertions.assertThat(jobGraphStopFuture).isNotDone();
            Assertions.assertThat(closeFuture).isNotDone();

            dispatcherServiceTerminationFuture.complete(null);

            jobGraphStopFuture.get();
            closeFuture.get();
        }
    }

    /**
     * Verifies that an exceptional completion of the dispatcher service's termination future
     * while the process is started is reported to the fatal error handler.
     */
    @Test
    void unexpectedDispatcherServiceTerminationWhileRunning_callsFatalErrorHandler() {
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        this.dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(
                        () ->
                                TestingDispatcherGatewayService.newBuilder()
                                        .setTerminationFuture(terminationFuture)
                                        .build());

        // NOTE(review): the process is never closed in this test - confirm this is intended.
        final SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess();
        dispatcherLeaderProcess.start();

        final FlinkException expectedFailure = new FlinkException("Expected test failure.");
        terminationFuture.completeExceptionally(expectedFailure);

        Assertions.assertThat(this.fatalErrorHandler.getErrorFuture().join())
                .rootCause()
                .isEqualTo(expectedFailure);

        // The error was expected; clear it so that teardown() does not rethrow it.
        this.fatalErrorHandler.clearError();
    }

    /**
     * Verifies that an exceptional completion of the dispatcher service's termination future
     * after {@code closeAsync()} was called does not reach the fatal error handler within
     * 10 milliseconds.
     */
    @Test
    void unexpectedDispatcherServiceTerminationWhileNotRunning_doesNotCallFatalErrorHandler() {
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        this.dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(
                        () ->
                                TestingDispatcherGatewayService.newBuilder()
                                        .setTerminationFuture(terminationFuture)
                                        .withManualTerminationFutureCompletion()
                                        .build());

        final SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess();
        dispatcherLeaderProcess.start();
        dispatcherLeaderProcess.closeAsync();

        terminationFuture.completeExceptionally(new FlinkException("Expected test failure."));

        // A timeout on the error future means no fatal error was reported.
        Assertions.assertThatThrownBy(
                        () ->
                                this.fatalErrorHandler
                                        .getErrorFuture()
                                        .get(10L, TimeUnit.MILLISECONDS))
                .isInstanceOf(TimeoutException.class);
    }

    /**
     * Verifies that the leader address future stays incomplete while dispatcher service
     * creation is blocked and completes with the gateway's address once it is released.
     *
     * @throws Exception if starting or closing the process fails
     */
    @Test
    void confirmLeaderSessionFuture_completesAfterDispatcherServiceHasBeenStarted() throws Exception {
        final OneShotLatch createDispatcherServiceLatch = new OneShotLatch();
        final String dispatcherAddress = "myAddress";
        final TestingDispatcherGateway dispatcherGateway =
                TestingDispatcherGateway.newBuilder().setAddress(dispatcherAddress).build();

        this.dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(
                        () -> {
                            try {
                                // Hold back service creation until the test releases it.
                                createDispatcherServiceLatch.await();
                                return TestingDispatcherGatewayService.newBuilder()
                                        .setDispatcherGateway(dispatcherGateway)
                                        .build();
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        });

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            final CompletableFuture<String> leaderAddressFuture =
                    dispatcherLeaderProcess.getLeaderAddressFuture();

            dispatcherLeaderProcess.start();
            Assertions.assertThat(leaderAddressFuture).isNotDone();

            createDispatcherServiceLatch.trigger();
            FlinkAssertions.assertThatFuture(leaderAddressFuture)
                    .eventuallySucceeds()
                    .isEqualTo(dispatcherAddress);
        }
    }

    /**
     * Closes the process while job recovery is blocked inside the job-id lookup and verifies
     * that the dispatcher service factory is not invoked within 10 milliseconds after
     * recovery is released.
     *
     * @throws Exception if starting, waiting on the latches, or closing the process fails
     */
    @Test
    void closeAsync_duringJobRecovery_preventsDispatcherServiceCreation() throws Exception {
        final OneShotLatch jobRecoveryStartedLatch = new OneShotLatch();
        final OneShotLatch completeJobRecoveryLatch = new OneShotLatch();
        final OneShotLatch createDispatcherServiceLatch = new OneShotLatch();

        this.jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setJobIdsFunction(
                                storedJobs -> {
                                    // Announce that recovery has begun, then block until the
                                    // test has called closeAsync().
                                    jobRecoveryStartedLatch.trigger();
                                    completeJobRecoveryLatch.await();
                                    return storedJobs;
                                })
                        .build();
        this.dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(
                        () -> {
                            createDispatcherServiceLatch.trigger();
                            return TestingDispatcherGatewayService.newBuilder().build();
                        });

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            jobRecoveryStartedLatch.await();
            dispatcherLeaderProcess.closeAsync();
            completeJobRecoveryLatch.trigger();

            // The latch is only triggered by the factory, so a timeout means it never ran.
            Assertions.assertThatThrownBy(
                            () -> createDispatcherServiceLatch.await(10L, TimeUnit.MILLISECONDS),
                            "No dispatcher service should be created after the process has been stopped.")
                    .isInstanceOf(TimeoutException.class);
        }
    }

    /**
     * Verifies that {@code onRemovedJobGraph} forwards the removed job's id to the dispatcher
     * gateway service.
     *
     * @throws Exception if starting, cleaning up the job graph, or closing the process fails
     */
    @Test
    void onRemovedJobGraph_terminatesRunningJob() throws Exception {
        this.jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setInitialJobGraphs(Collections.singleton(JOB_GRAPH))
                        .build();

        final CompletableFuture<JobID> terminateJobFuture = new CompletableFuture<>();
        final TestingDispatcherGatewayService dispatcherGatewayService =
                TestingDispatcherGatewayService.newBuilder()
                        .setOnRemovedJobGraphFunction(
                                removedJobId -> {
                                    terminateJobFuture.complete(removedJobId);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .build();
        this.dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(() -> dispatcherGatewayService);

        final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();
        // The process is a managed resource so it is closed even when an assertion or a
        // call inside the block fails; it is closed before the finally block runs.
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait until the dispatcher service has been created.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            this.jobGraphStore.globalCleanupAsync(JOB_GRAPH.getJobID(), cleanupExecutor).join();
            dispatcherLeaderProcess.onRemovedJobGraph(JOB_GRAPH.getJobID());

            Assertions.assertThat(terminateJobFuture.get()).isEqualTo(JOB_GRAPH.getJobID());
        } finally {
            // No task may still be queued on the cleanup executor.
            Assertions.assertThat(cleanupExecutor.shutdownNow()).isEmpty();
        }
    }

    /**
     * Verifies that a failing {@code onRemovedJobGraph} call on the dispatcher gateway service
     * is reported to the fatal error handler with the original exception as cause.
     *
     * @throws Exception if starting or closing the process fails
     */
    @Test
    void onRemovedJobGraph_failingRemovalCall_failsFatally() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");

        final TestingDispatcherGatewayService dispatcherGatewayService =
                TestingDispatcherGatewayService.newBuilder()
                        .setOnRemovedJobGraphFunction(
                                removedJobId -> FutureUtils.completedExceptionally(testException))
                        .build();
        this.dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(() -> dispatcherGatewayService);

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait until the dispatcher service has been created.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            dispatcherLeaderProcess.onRemovedJobGraph(JOB_GRAPH.getJobID());

            Assertions.assertThat(this.fatalErrorHandler.getErrorFuture().join())
                    .hasCause(testException);

            // The error was expected; clear it so that teardown() does not rethrow it.
            this.fatalErrorHandler.clearError();
        }
    }

    /**
     * Verifies that {@code onAddedJobGraph} leads to the stored job graph being submitted to
     * the dispatcher gateway.
     *
     * @throws Exception if starting, storing the job graph, or closing the process fails
     */
    @Test
    void onAddedJobGraph_submitsRecoveredJob() throws Exception {
        final CompletableFuture<JobGraph> submittedJobFuture = new CompletableFuture<>();
        final TestingDispatcherGateway dispatcherGateway =
                TestingDispatcherGateway.newBuilder()
                        .setSubmitFunction(
                                submittedJob -> {
                                    submittedJobFuture.complete(submittedJob);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .build();
        this.dispatcherServiceFactory =
                createFactoryBasedOnGenericSupplier(
                        () ->
                                TestingDispatcherGatewayService.newBuilder()
                                        .setDispatcherGateway(dispatcherGateway)
                                        .build());

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait until the dispatcher service has been created.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            this.jobGraphStore.putJobGraph(JOB_GRAPH);
            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());

            Assertions.assertThat(submittedJobFuture.get().getJobID())
                    .isEqualTo(JOB_GRAPH.getJobID());
        }
    }

    /**
     * Verifies that {@code onAddedJobGraph} does not trigger a job graph recovery once
     * {@code closeAsync()} has been called: the future completed by the store's recovery
     * function must not complete within 10 milliseconds.
     *
     * @throws Exception if starting, storing the job graph, or closing the process fails
     */
    @Test
    void onAddedJobGraph_ifNotRunning_isBeingIgnored() throws Exception {
        final CompletableFuture<JobID> recoveredJobFuture = new CompletableFuture<>();
        this.jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setRecoverJobGraphFunction(
                                (jobId, jobGraphs) -> {
                                    // Record every recovery attempt so the test can detect it.
                                    recoveredJobFuture.complete(jobId);
                                    return jobGraphs.get(jobId);
                                })
                        .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait until the dispatcher service has been created.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            this.jobGraphStore.putJobGraph(JOB_GRAPH);

            dispatcherLeaderProcess.closeAsync();
            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());

            // The callable previously had an empty body, so nothing could be thrown and the
            // assertion could never pass. Waiting on the recovery future is what produces the
            // expected TimeoutException when no recovery takes place.
            Assertions.assertThatThrownBy(
                            () -> recoveredJobFuture.get(10L, TimeUnit.MILLISECONDS),
                            "onAddedJobGraph should be ignored if the leader process is not running.")
                    .isInstanceOf(TimeoutException.class);
        }
    }

    /**
     * Verifies that a failing job graph recovery triggered by {@code onAddedJobGraph} is
     * reported to the fatal error handler and leaves the process in the {@code STOPPED} state.
     *
     * @throws Exception if starting, storing the job graph, or closing the process fails
     */
    @Test
    void onAddedJobGraph_failingRecovery_propagatesTheFailure() throws Exception {
        // Declared as FlinkException (not Throwable) so it can be thrown from the lambda below.
        final FlinkException expectedFailure = new FlinkException("Expected failure");
        this.jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setRecoverJobGraphFunction(
                                (jobId, jobGraphs) -> {
                                    throw expectedFailure;
                                })
                        .build();

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Wait until the dispatcher service has been created.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            this.jobGraphStore.putJobGraph(JOB_GRAPH);
            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());

            FlinkAssertions.assertThatFuture(this.fatalErrorHandler.getErrorFuture())
                    .eventuallySucceeds()
                    .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
                    .contains(expectedFailure);

            Assertions.assertThat(dispatcherLeaderProcess.getState())
                    .isEqualTo(AbstractDispatcherLeaderProcess.State.STOPPED);

            // The error was expected; clear it so that teardown() does not rethrow it.
            this.fatalErrorHandler.clearError();
        }
    }

    /**
     * Runs the recovery failure scenario with a job graph store whose recovery function
     * always throws while one job graph is initially stored.
     */
    @Test
    void recoverJobs_withRecoveryFailure_failsFatally() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");

        jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setRecoverJobGraphFunction(
                                (ignoredJobId, ignoredJobGraphs) -> {
                                    throw testException;
                                })
                        .setInitialJobGraphs(Collections.singleton(JOB_GRAPH))
                        .build();

        runJobRecoveryFailureTest(testException);
    }

    /**
     * Configures a job graph store whose job-ids function throws, then runs the shared
     * recovery-failure scenario with the thrown exception as the expected fatal error.
     */
    @Test
    void recoverJobs_withJobIdRecoveryFailure_failsFatally() throws Exception {
        final FlinkException expectedFailure = new FlinkException("Test exception");

        this.jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setJobIdsFunction(
                                ignored -> {
                                    throw expectedFailure;
                                })
                        .build();

        runJobRecoveryFailureTest(expectedFailure);
    }

    /**
     * Starts a freshly created leader process and asserts that the fatal error handler's
     * error future eventually completes with a throwable accepted by
     * {@link FlinkAssertions#anyCauseMatches(Class, String)} for the class and message of
     * the given exception.
     *
     * <p>The process is managed by try-with-resources, so it is closed exactly once on
     * both the success and the failure path, and a failure of {@code close()} is attached
     * as a suppressed exception to any primary failure.
     *
     * @param flinkException the exception whose class and message the reported fatal
     *     error is matched against
     * @throws Exception if starting, asserting on, or closing the process fails
     */
    private void runJobRecoveryFailureTest(FlinkException flinkException) throws Exception {
        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // The failure injected by the caller is expected to reach the fatal error handler.
            FlinkAssertions.assertThatFuture(this.fatalErrorHandler.getErrorFuture())
                    .eventuallySucceeds()
                    .satisfies(
                            error ->
                                    Assertions.assertThat(error)
                                            .satisfies(
                                                    FlinkAssertions.anyCauseMatches(
                                                            flinkException.getClass(),
                                                            flinkException.getMessage())));

            // NOTE(review): presumably clears the expected error so that a later check of
            // the handler (e.g. in test teardown) does not trip over it - confirm.
            this.fatalErrorHandler.clearError();
        }
    }

    /**
     * Uses a dispatcher gateway whose submit function returns a future failed with a
     * {@link JobSubmissionException} and verifies, via the shared on-added-job-graph
     * scenario, that this is reported as a fatal error.
     */
    @Test
    void onAddedJobGraph_failingRecoveredJobSubmission_failsFatally() throws Exception {
        final TestingDispatcherGateway failingGateway =
                TestingDispatcherGateway.newBuilder()
                        .setSubmitFunction(
                                submittedJobGraph ->
                                        FutureUtils.completedExceptionally(
                                                new JobSubmissionException(
                                                        submittedJobGraph.getJobID(),
                                                        "test exception")))
                        .build();

        runOnAddedJobGraphTest(failingGateway, this::verifyOnAddedJobGraphResultFailsFatally);
    }

    /**
     * Waits for the handler's error future, asserts that the chain of causes of the
     * reported error contains at least one {@link JobSubmissionException}, and then
     * clears the error on the handler.
     *
     * @param testingFatalErrorHandler the handler expected to have received the error
     */
    private void verifyOnAddedJobGraphResultFailsFatally(TestingFatalErrorHandler testingFatalErrorHandler) {
        final Throwable reportedError = testingFatalErrorHandler.getErrorFuture().join();

        Assertions.assertThat(reportedError)
                .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
                .hasAtLeastOneElementOfType(JobSubmissionException.class);

        testingFatalErrorHandler.clearError();
    }

    /**
     * Uses a dispatcher gateway whose submit function returns a future failed with a
     * {@link DuplicateJobSubmissionException} and verifies, via the shared
     * on-added-job-graph scenario, that no fatal error is reported.
     */
    @Test
    void onAddedJobGraph_duplicateJobSubmissionDueToFalsePositive_willBeIgnored() throws Exception {
        final TestingDispatcherGateway duplicateRejectingGateway =
                TestingDispatcherGateway.newBuilder()
                        .setSubmitFunction(
                                submittedJobGraph ->
                                        FutureUtils.completedExceptionally(
                                                DuplicateJobSubmissionException.of(
                                                        submittedJobGraph.getJobID())))
                        .build();

        runOnAddedJobGraphTest(
                duplicateRejectingGateway, this::verifyOnAddedJobGraphResultDidNotFail);
    }

    /**
     * Shared scenario for the {@code onAddedJobGraph} tests.
     *
     * <p>Installs a job graph store initially holding {@code JOB_GRAPH} and a dispatcher
     * service factory that asserts it is handed exactly that job graph and returns a
     * service wrapping the given gateway. It then starts a leader process, waits for its
     * dispatcher gateway future, signals {@code onAddedJobGraph} for {@code JOB_GRAPH}
     * and finally runs the supplied verification against the fatal error handler.
     *
     * <p>The process is managed by try-with-resources, so it is closed exactly once on
     * both the success and the failure path, and a failure of {@code close()} is attached
     * as a suppressed exception to any primary failure.
     *
     * @param testingDispatcherGateway gateway exposed by the created dispatcher service
     * @param throwingConsumer verification logic applied to the fatal error handler after
     *     the job graph has been signalled as added
     * @throws Exception if the scenario, the verification, or closing the process fails
     */
    private void runOnAddedJobGraphTest(TestingDispatcherGateway testingDispatcherGateway, org.apache.flink.util.function.ThrowingConsumer<TestingFatalErrorHandler, Exception> throwingConsumer) throws Exception {
        this.jobGraphStore =
                TestingJobGraphStore.newBuilder()
                        .setInitialJobGraphs(Collections.singleton(JOB_GRAPH))
                        .build();
        this.dispatcherServiceFactory =
                createFactoryBasedOnJobGraphs(
                        jobGraphs -> {
                            Assertions.assertThat(jobGraphs).containsExactlyInAnyOrder(JOB_GRAPH);
                            return TestingDispatcherGatewayService.newBuilder()
                                    .setDispatcherGateway(testingDispatcherGateway)
                                    .build();
                        });

        try (SessionDispatcherLeaderProcess dispatcherLeaderProcess =
                createDispatcherLeaderProcess()) {
            dispatcherLeaderProcess.start();

            // Block until the gateway future completes before signalling the added job graph.
            dispatcherLeaderProcess.getDispatcherGateway().get();

            dispatcherLeaderProcess.onAddedJobGraph(JOB_GRAPH.getJobID());

            throwingConsumer.accept(this.fatalErrorHandler);
        }
    }

    /**
     * Builds a dispatcher gateway service factory that ignores every argument except its
     * second one (the job graph collection) and delegates to the given function with it.
     *
     * @param function maps the job graph collection passed to the factory to the service
     *     to return
     * @return a factory delegating to {@code function}
     */
    private AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory createFactoryBasedOnJobGraphs(Function<Collection<JobGraph>, AbstractDispatcherLeaderProcess.DispatcherGatewayService> function) {
        return (ignoredDispatcherId,
                jobGraphs,
                ignoredCollection,
                ignoredJobGraphWriter,
                ignoredJobResultStore) -> function.apply(jobGraphs);
    }

    /**
     * Builds a dispatcher gateway service factory that ignores all of its arguments and
     * returns whatever the given supplier provides on each invocation.
     *
     * @param supplier source of the service to return
     * @return a factory delegating to {@code supplier}
     */
    private AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory createFactoryBasedOnGenericSupplier(Supplier<AbstractDispatcherLeaderProcess.DispatcherGatewayService> supplier) {
        return (ignoredDispatcherId,
                ignoredJobGraphs,
                ignoredCollection,
                ignoredJobGraphWriter,
                ignoredJobResultStore) -> supplier.get();
    }

    /**
     * Asserts that the handler's error future does not complete within 10 milliseconds,
     * i.e. that waiting on it ends in a {@link TimeoutException}.
     *
     * @param testingFatalErrorHandler the handler expected to have received no error
     */
    private void verifyOnAddedJobGraphResultDidNotFail(TestingFatalErrorHandler testingFatalErrorHandler) {
        Assertions.assertThatThrownBy(
                        () -> testingFatalErrorHandler.getErrorFuture().get(10L, TimeUnit.MILLISECONDS),
                        "Expected that duplicate job submissions due to false job recoveries are ignored.")
                .isInstanceOf(TimeoutException.class);
    }

    /**
     * Creates a {@link SessionDispatcherLeaderProcess} from the test's current leader
     * session id, dispatcher service factory, job graph store, job result store, I/O
     * executor and fatal error handler.
     *
     * @return the newly created process; this method does not start it
     */
    private SessionDispatcherLeaderProcess createDispatcherLeaderProcess() {
        return SessionDispatcherLeaderProcess.create(
                this.leaderSessionId,
                this.dispatcherServiceFactory,
                this.jobGraphStore,
                this.jobResultStore,
                ioExecutor,
                this.fatalErrorHandler);
    }
}
