package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.cleanup.CleanupRunnerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleaner;
import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.async.OperationResult;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingConsumer;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher.class */
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> implements DispatcherGateway {
    public static final String DISPATCHER_NAME = "dispatcher";
    private static final int INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY = 16;
    private final Configuration configuration;
    private final JobGraphWriter jobGraphWriter;
    private final JobResultStore jobResultStore;
    private final HighAvailabilityServices highAvailabilityServices;
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
    private final JobManagerSharedServices jobManagerSharedServices;
    private final HeartbeatServices heartbeatServices;
    private final BlobServer blobServer;
    private final FatalErrorHandler fatalErrorHandler;
    private final OnMainThreadJobManagerRunnerRegistry jobManagerRunnerRegistry;
    private final Collection<JobGraph> recoveredJobs;
    private final Collection<JobResult> recoveredDirtyJobs;
    private final DispatcherBootstrapFactory dispatcherBootstrapFactory;
    private final ExecutionGraphInfoStore executionGraphInfoStore;
    private final JobManagerRunnerFactory jobManagerRunnerFactory;
    private final CleanupRunnerFactory cleanupRunnerFactory;
    private final JobManagerMetricGroup jobManagerMetricGroup;
    private final HistoryServerArchivist historyServerArchivist;
    private final Executor ioExecutor;

    @Nullable
    private final String metricServiceQueryAddress;
    private final Map<JobID, CompletableFuture<Void>> jobManagerRunnerTerminationFutures;
    protected final CompletableFuture<ApplicationStatus> shutDownFuture;
    private DispatcherBootstrap dispatcherBootstrap;
    private final DispatcherCachedOperationsHandler dispatcherCachedOperationsHandler;
    private final ResourceCleaner localResourceCleaner;
    private final ResourceCleaner globalResourceCleaner;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher$CleanupJobState.class */
    public static class CleanupJobState {
        private final boolean globalCleanup;
        private final JobStatus jobStatus;

        public static CleanupJobState localCleanup(JobStatus jobStatus) {
            return new CleanupJobState(false, jobStatus);
        }

        public static CleanupJobState globalCleanup(JobStatus jobStatus) {
            return new CleanupJobState(true, jobStatus);
        }

        private CleanupJobState(boolean z, JobStatus jobStatus) {
            this.globalCleanup = z;
            this.jobStatus = jobStatus;
        }

        public boolean isGlobalCleanup() {
            return this.globalCleanup;
        }

        public JobStatus getJobStatus() {
            return this.jobStatus;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher$ExecutionType.class */
    public enum ExecutionType {
        SUBMISSION,
        RECOVERY
    }

    public Dispatcher(RpcService rpcService, DispatcherId dispatcherId, Collection<JobGraph> collection, Collection<JobResult> collection2, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception {
        this(rpcService, dispatcherId, collection, collection2, dispatcherBootstrapFactory, dispatcherServices, new DefaultJobManagerRunnerRegistry(16));
    }

    private Dispatcher(RpcService rpcService, DispatcherId dispatcherId, Collection<JobGraph> collection, Collection<JobResult> collection2, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices, JobManagerRunnerRegistry jobManagerRunnerRegistry) throws Exception {
        this(rpcService, dispatcherId, collection, collection2, dispatcherBootstrapFactory, dispatcherServices, jobManagerRunnerRegistry, new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    @VisibleForTesting
    public Dispatcher(RpcService rpcService, DispatcherId dispatcherId, Collection<JobGraph> collection, Collection<JobResult> collection2, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices, JobManagerRunnerRegistry jobManagerRunnerRegistry, ResourceCleanerFactory resourceCleanerFactory) throws Exception {
        super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), dispatcherId);
        assertRecoveredJobsAndDirtyJobResults(collection, collection2);
        this.configuration = dispatcherServices.getConfiguration();
        this.highAvailabilityServices = dispatcherServices.getHighAvailabilityServices();
        this.resourceManagerGatewayRetriever = dispatcherServices.getResourceManagerGatewayRetriever();
        this.heartbeatServices = dispatcherServices.getHeartbeatServices();
        this.blobServer = dispatcherServices.getBlobServer();
        this.fatalErrorHandler = dispatcherServices.getFatalErrorHandler();
        this.jobGraphWriter = dispatcherServices.getJobGraphWriter();
        this.jobResultStore = dispatcherServices.getJobResultStore();
        this.jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup();
        this.metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress();
        this.ioExecutor = dispatcherServices.getIoExecutor();
        this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(this.configuration, this.blobServer, this.fatalErrorHandler);
        this.jobManagerRunnerRegistry = new OnMainThreadJobManagerRunnerRegistry(jobManagerRunnerRegistry, getMainThreadExecutor());
        this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
        this.executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore();
        this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
        this.cleanupRunnerFactory = dispatcherServices.getCleanupRunnerFactory();
        this.jobManagerRunnerTerminationFutures = new HashMap(16);
        this.shutDownFuture = new CompletableFuture<>();
        this.dispatcherBootstrapFactory = (DispatcherBootstrapFactory) Preconditions.checkNotNull(dispatcherBootstrapFactory);
        this.recoveredJobs = new HashSet(collection);
        this.recoveredDirtyJobs = new HashSet(collection2);
        this.blobServer.retainJobs((Collection) collection.stream().map((v0) -> {
            return v0.getJobID();
        }).collect(Collectors.toSet()), dispatcherServices.getIoExecutor());
        this.dispatcherCachedOperationsHandler = new DispatcherCachedOperationsHandler(dispatcherServices.getOperationCaches(), this::triggerSavepointAndGetLocation, this::stopWithSavepointAndGetLocation);
        this.localResourceCleaner = resourceCleanerFactory.createLocalResourceCleaner(getMainThreadExecutor());
        this.globalResourceCleaner = resourceCleanerFactory.createGlobalResourceCleaner(getMainThreadExecutor());
    }

    public CompletableFuture<ApplicationStatus> getShutDownFuture() {
        return this.shutDownFuture;
    }

    public void onStart() throws Exception {
        try {
            startDispatcherServices();
            startCleanupRetries();
            startRecoveredJobs();
            this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create((DispatcherGateway) getSelfGateway(DispatcherGateway.class), getRpcService().getScheduledExecutor(), this::onFatalError);
        } catch (Throwable th) {
            FlinkException dispatcherException = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), th);
            onFatalError(dispatcherException);
            throw dispatcherException;
        }
    }

    private void startDispatcherServices() throws Exception {
        try {
            registerDispatcherMetrics(this.jobManagerMetricGroup);
        } catch (Exception e) {
            handleStartDispatcherServicesException(e);
        }
    }

    private static void assertRecoveredJobsAndDirtyJobResults(Collection<JobGraph> collection, Collection<JobResult> collection2) {
        Set set = (Set) collection2.stream().map((v0) -> {
            return v0.getJobId();
        }).collect(Collectors.toSet());
        Preconditions.checkArgument(collection.stream().noneMatch(jobGraph -> {
            return set.contains(jobGraph.getJobID());
        }), "There should be no overlap between the recovered JobGraphs and the passed dirty JobResults based on their job ID.");
    }

    private void startRecoveredJobs() {
        Iterator<JobGraph> it = this.recoveredJobs.iterator();
        while (it.hasNext()) {
            runRecoveredJob(it.next());
        }
        this.recoveredJobs.clear();
    }

    private void runRecoveredJob(JobGraph jobGraph) {
        Preconditions.checkNotNull(jobGraph);
        try {
            runJob(createJobMasterRunner(jobGraph), ExecutionType.RECOVERY);
        } catch (Throwable th) {
            onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", jobGraph.getJobID()), th));
        }
    }

    private void startCleanupRetries() {
        this.recoveredDirtyJobs.forEach(this::runCleanupRetry);
        this.recoveredDirtyJobs.clear();
    }

    private void runCleanupRetry(JobResult jobResult) {
        Preconditions.checkNotNull(jobResult);
        try {
            runJob(createJobCleanupRunner(jobResult), ExecutionType.RECOVERY);
        } catch (Throwable th) {
            onFatalError(new DispatcherException(String.format("Could not start cleanup retry for job %s.", jobResult.getJobId()), th));
        }
    }

    private void handleStartDispatcherServicesException(Exception exc) throws Exception {
        try {
            stopDispatcherServices();
        } catch (Exception e) {
            exc.addSuppressed(e);
        }
        throw exc;
    }

    public CompletableFuture<Void> onStop() {
        this.log.info("Stopping dispatcher {}.", getAddress());
        return FutureUtils.runAfterwards(terminateRunningJobsAndGetTerminationFuture(), () -> {
            this.dispatcherBootstrap.stop();
            stopDispatcherServices();
            this.log.info("Stopped dispatcher {}.", getAddress());
        });
    }

    private void stopDispatcherServices() throws Exception {
        Exception exc = null;
        try {
            this.jobManagerSharedServices.shutdown();
        } catch (Exception e) {
            exc = e;
        }
        this.jobManagerMetricGroup.close();
        ExceptionUtils.tryRethrowException(exc);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r11v1, types: [java.lang.Throwable] */
    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time time) {
        this.log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobGraph.getJobID());
        try {
            if (!isDuplicateJob(jobGraph.getJobID())) {
                return isPartialResourceConfigured(jobGraph) ? FutureUtils.completedExceptionally(new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have resources configured. The limitation will be removed in future versions.")) : internalSubmitJob(jobGraph);
            }
            if (isInGloballyTerminalState(jobGraph.getJobID())) {
                this.log.warn("Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.", new Object[]{jobGraph.getName(), jobGraph.getJobID(), Arrays.stream(JobStatus.values()).filter((v0) -> {
                    return v0.isGloballyTerminalState();
                }).map((v0) -> {
                    return v0.name();
                }).collect(Collectors.joining(", "))});
            }
            return FutureUtils.completedExceptionally(isInGloballyTerminalState(jobGraph.getJobID()) ? DuplicateJobSubmissionException.ofGloballyTerminated(jobGraph.getJobID()) : DuplicateJobSubmissionException.of(jobGraph.getJobID()));
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Acknowledge> submitFailedJob(JobID jobID, String str, Throwable th) {
        ExecutionGraphInfo executionGraphInfo = new ExecutionGraphInfo(ArchivedExecutionGraph.createSparseArchivedExecutionGraph(jobID, str, JobStatus.FAILED, th, null, System.currentTimeMillis()));
        writeToExecutionGraphInfoStore(executionGraphInfo);
        return archiveExecutionGraphToHistoryServer(executionGraphInfo);
    }

    private boolean isDuplicateJob(JobID jobID) throws FlinkException {
        return isInGloballyTerminalState(jobID) || this.jobManagerRunnerRegistry.isRegistered(jobID);
    }

    private boolean isInGloballyTerminalState(JobID jobID) throws FlinkException {
        try {
            return this.jobResultStore.hasJobResultEntry(jobID);
        } catch (IOException e) {
            throw new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobID), e);
        }
    }

    private boolean isPartialResourceConfigured(JobGraph jobGraph) {
        boolean z = false;
        boolean z2 = false;
        Iterator<JobVertex> it = jobGraph.getVertices().iterator();
        while (it.hasNext()) {
            if (it.next().getMinResources() == ResourceSpec.UNKNOWN) {
                z = true;
            } else {
                z2 = true;
            }
            if (z && z2) {
                return true;
            }
        }
        return false;
    }

    private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        this.log.info("Submitting job '{}' ({}).", jobGraph.getName(), jobGraph.getJobID());
        return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob).handle((r6, th) -> {
            return handleTermination(jobGraph.getJobID(), th);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Function.identity());
    }

    private CompletableFuture<Acknowledge> handleTermination(JobID jobID, @Nullable Throwable th) {
        return th != null ? this.globalResourceCleaner.cleanupAsync(jobID).handleAsync((r11, th2) -> {
            if (th2 != null) {
                this.log.warn("Cleanup didn't succeed after job submission failed for job {}.", jobID, th2);
                th.addSuppressed(th2);
            }
            ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(th);
            Throwable stripCompletionException = ExceptionUtils.stripCompletionException(th);
            this.log.error("Failed to submit job {}.", jobID, stripCompletionException);
            throw new CompletionException((Throwable) new JobSubmissionException(jobID, "Failed to submit job.", stripCompletionException));
        }, (Executor) getMainThreadExecutor()) : CompletableFuture.completedFuture(Acknowledge.get());
    }

    private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        this.jobGraphWriter.putJobGraph(jobGraph);
        runJob(createJobMasterRunner(jobGraph), ExecutionType.SUBMISSION);
    }

    private JobManagerRunner createJobMasterRunner(JobGraph jobGraph) throws Exception {
        Preconditions.checkState(!this.jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID()));
        return this.jobManagerRunnerFactory.createJobManagerRunner(jobGraph, this.configuration, getRpcService(), this.highAvailabilityServices, this.heartbeatServices, this.jobManagerSharedServices, new DefaultJobManagerJobMetricGroupFactory(this.jobManagerMetricGroup), this.fatalErrorHandler, System.currentTimeMillis());
    }

    private JobManagerRunner createJobCleanupRunner(JobResult jobResult) throws Exception {
        Preconditions.checkState(!this.jobManagerRunnerRegistry.isRegistered(jobResult.getJobId()));
        return this.cleanupRunnerFactory.create(jobResult, this.highAvailabilityServices.getCheckpointRecoveryFactory(), this.configuration, this.ioExecutor);
    }

    private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType) throws Exception {
        jobManagerRunner.start();
        this.jobManagerRunnerRegistry.register(jobManagerRunner);
        JobID jobID = jobManagerRunner.getJobID();
        CompletableFuture<Void> thenCompose = jobManagerRunner.getResultFuture().handleAsync((jobManagerRunnerResult, th) -> {
            Preconditions.checkState(this.jobManagerRunnerRegistry.isRegistered(jobID) && this.jobManagerRunnerRegistry.get(jobID) == jobManagerRunner, "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner.");
            return jobManagerRunnerResult != null ? handleJobManagerRunnerResult(jobManagerRunnerResult, executionType) : CompletableFuture.completedFuture(jobManagerRunnerFailed(jobID, JobStatus.FAILED, th));
        }, (Executor) getMainThreadExecutor()).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Function.identity()).thenCompose(cleanupJobState -> {
            return removeJob(jobID, cleanupJobState).exceptionally(th2 -> {
                return logCleanupErrorWarning(jobID, th2);
            });
        });
        FutureUtils.handleUncaughtException(thenCompose, (thread, th2) -> {
            this.fatalErrorHandler.onFatalError(th2);
        });
        registerJobManagerRunnerTerminationFuture(jobID, thenCompose);
    }

    @Nullable
    private Void logCleanupErrorWarning(JobID jobID, Throwable th) {
        this.log.warn("The cleanup of job {} failed. The job's artifacts in the different directories ('{}', '{}', '{}') and its JobResultStore entry in '{}' (in HA mode) should be checked for manual cleanup.", new Object[]{jobID, this.configuration.get(HighAvailabilityOptions.HA_STORAGE_PATH), this.configuration.get(BlobServerOptions.STORAGE_DIRECTORY), this.configuration.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY), this.configuration.get(JobResultStoreOptions.STORAGE_PATH), th});
        return null;
    }

    private CompletableFuture<CleanupJobState> handleJobManagerRunnerResult(JobManagerRunnerResult jobManagerRunnerResult, ExecutionType executionType) {
        return (jobManagerRunnerResult.isInitializationFailure() && executionType == ExecutionType.RECOVERY) ? CompletableFuture.completedFuture(jobManagerRunnerFailed(jobManagerRunnerResult.getExecutionGraphInfo().getJobId(), JobStatus.INITIALIZING, jobManagerRunnerResult.getInitializationFailure())) : jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
    }

    private CleanupJobState jobManagerRunnerFailed(JobID jobID, JobStatus jobStatus, Throwable th) {
        jobMasterFailed(jobID, th);
        return CleanupJobState.localCleanup(jobStatus);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Collection<JobID>> listJobs(Time time) {
        return CompletableFuture.completedFuture(Collections.unmodifiableSet(this.jobManagerRunnerRegistry.getRunningJobIds()));
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> disposeSavepoint(String str, Time time) {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        return CompletableFuture.supplyAsync(() -> {
            this.log.info("Disposing savepoint {}.", str);
            try {
                Checkpoints.disposeSavepoint(str, this.configuration, contextClassLoader, this.log);
                return Acknowledge.get();
            } catch (IOException | FlinkException e) {
                throw new CompletionException((Throwable) new FlinkException(String.format("Could not dispose savepoint %s.", str), e));
            }
        }, this.jobManagerSharedServices.getIoExecutor());
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> cancelJob(JobID jobID, Time time) {
        Optional<JobManagerRunner> jobManagerRunner = getJobManagerRunner(jobID);
        if (jobManagerRunner.isPresent()) {
            return jobManagerRunner.get().cancel(time);
        }
        ExecutionGraphInfo executionGraphInfo = this.executionGraphInfoStore.get(jobID);
        if (executionGraphInfo != null) {
            JobStatus state = executionGraphInfo.getArchivedExecutionGraph().getState();
            return state == JobStatus.CANCELED ? CompletableFuture.completedFuture(Acknowledge.get()) : FutureUtils.completedExceptionally(new FlinkJobTerminatedWithoutCancellationException(jobID, state));
        }
        this.log.debug("Dispatcher is unable to cancel job {}: not found", jobID);
        return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID));
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time time) {
        CompletableFuture runResourceManagerCommand = runResourceManagerCommand(resourceManagerGateway -> {
            return resourceManagerGateway.requestResourceOverview(time);
        });
        CompletableFuture thenApply = FutureUtils.combineAll(queryJobMastersForInformation(jobManagerRunner -> {
            return jobManagerRunner.requestJobStatus(time);
        })).thenApply(this::flattenOptionalCollection);
        JobsOverview storedJobsOverview = this.executionGraphInfoStore.getStoredJobsOverview();
        return thenApply.thenCombine((CompletionStage) runResourceManagerCommand, (collection, resourceOverview) -> {
            return new ClusterOverview(resourceOverview, JobsOverview.create(collection).combine(storedJobsOverview));
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time time) {
        CompletableFuture thenApply = FutureUtils.combineAll(queryJobMastersForInformation(jobManagerRunner -> {
            return jobManagerRunner.requestJobDetails(time);
        })).thenApply(this::flattenOptionalCollection);
        Collection<JobDetails> availableJobDetails = this.executionGraphInfoStore.getAvailableJobDetails();
        return thenApply.thenApply(collection -> {
            HashMap hashMap = new HashMap();
            availableJobDetails.forEach(jobDetails -> {
            });
            collection.forEach(jobDetails2 -> {
            });
            return new MultipleJobsDetails(new HashSet(hashMap.values()));
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<JobStatus> requestJobStatus(JobID jobID, Time time) {
        return (CompletableFuture) getJobManagerRunner(jobID).map(jobManagerRunner -> {
            return jobManagerRunner.requestJobStatus(time);
        }).orElseGet(() -> {
            JobDetails availableJobDetails = this.executionGraphInfoStore.getAvailableJobDetails(jobID);
            return availableJobDetails == null ? FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)) : CompletableFuture.completedFuture(availableJobDetails.getStatus());
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(JobID jobID, Time time) {
        return ((CompletableFuture) getJobManagerRunner(jobID).map(jobManagerRunner -> {
            return jobManagerRunner.requestJob(time);
        }).orElse(FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)))).exceptionally(th -> {
            ExecutionGraphInfo executionGraphInfo = this.executionGraphInfoStore.get(jobID);
            if (executionGraphInfo == null) {
                throw new CompletionException(ExceptionUtils.stripCompletionException(th));
            }
            return executionGraphInfo;
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<JobResult> requestJobResult(JobID jobID, Time time) {
        if (this.jobManagerRunnerRegistry.isRegistered(jobID)) {
            return this.jobManagerRunnerRegistry.get(jobID).getResultFuture().thenApply(jobManagerRunnerResult -> {
                return JobResult.createFrom(jobManagerRunnerResult.getExecutionGraphInfo().getArchivedExecutionGraph());
            });
        }
        ExecutionGraphInfo executionGraphInfo = this.executionGraphInfoStore.get(jobID);
        return executionGraphInfo == null ? FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)) : CompletableFuture.completedFuture(JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<String>> requestMetricQueryServiceAddresses(Time time) {
        return this.metricServiceQueryAddress != null ? CompletableFuture.completedFuture(Collections.singleton(this.metricServiceQueryAddress)) : CompletableFuture.completedFuture(Collections.emptyList());
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServiceAddresses(Time time) {
        return runResourceManagerCommand(resourceManagerGateway -> {
            return resourceManagerGateway.requestTaskManagerMetricQueryServiceAddresses(time);
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<ThreadDumpInfo> requestThreadDump(Time time) {
        return CompletableFuture.completedFuture(ThreadDumpInfo.dumpAndCreate(((Integer) this.configuration.get(ClusterOptions.THREAD_DUMP_STACKTRACE_MAX_DEPTH)).intValue()));
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Integer> getBlobServerPort(Time time) {
        return CompletableFuture.completedFuture(Integer.valueOf(this.blobServer.getPort()));
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<String> triggerCheckpoint(JobID jobID, Time time) {
        return performOperationOnJobMasterGateway(jobID, jobMasterGateway -> {
            return jobMasterGateway.triggerCheckpoint(time);
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> triggerSavepoint(AsynchronousJobOperationKey asynchronousJobOperationKey, String str, SavepointFormatType savepointFormatType, TriggerSavepointMode triggerSavepointMode, Time time) {
        return this.dispatcherCachedOperationsHandler.triggerSavepoint(asynchronousJobOperationKey, str, savepointFormatType, triggerSavepointMode, time);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<String> triggerSavepointAndGetLocation(JobID jobID, String str, SavepointFormatType savepointFormatType, TriggerSavepointMode triggerSavepointMode, Time time) {
        return performOperationOnJobMasterGateway(jobID, jobMasterGateway -> {
            return jobMasterGateway.triggerSavepoint(str, triggerSavepointMode.isTerminalMode(), savepointFormatType, time);
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<OperationResult<String>> getTriggeredSavepointStatus(AsynchronousJobOperationKey asynchronousJobOperationKey) {
        return this.dispatcherCachedOperationsHandler.getSavepointStatus(asynchronousJobOperationKey);
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> stopWithSavepoint(AsynchronousJobOperationKey asynchronousJobOperationKey, String str, SavepointFormatType savepointFormatType, TriggerSavepointMode triggerSavepointMode, Time time) {
        return this.dispatcherCachedOperationsHandler.stopWithSavepoint(asynchronousJobOperationKey, str, savepointFormatType, triggerSavepointMode, time);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<String> stopWithSavepointAndGetLocation(JobID jobID, String str, SavepointFormatType savepointFormatType, TriggerSavepointMode triggerSavepointMode, Time time) {
        return performOperationOnJobMasterGateway(jobID, jobMasterGateway -> {
            return jobMasterGateway.stopWithSavepoint(str, savepointFormatType, triggerSavepointMode.isTerminalMode(), time);
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> shutDownCluster() {
        return shutDownCluster(ApplicationStatus.SUCCEEDED);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {
        this.shutDownFuture.complete(applicationStatus);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(JobID jobID, OperatorID operatorID, SerializedValue<CoordinationRequest> serializedValue, Time time) {
        return performOperationOnJobMasterGateway(jobID, jobMasterGateway -> {
            return jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorID, serializedValue, time);
        });
    }

    private void registerJobManagerRunnerTerminationFuture(JobID jobID, CompletableFuture<Void> completableFuture) {
        Preconditions.checkState(!this.jobManagerRunnerTerminationFutures.containsKey(jobID));
        this.jobManagerRunnerTerminationFutures.put(jobID, completableFuture);
        completableFuture.thenRunAsync(() -> {
            CompletableFuture<Void> remove = this.jobManagerRunnerTerminationFutures.remove(jobID);
            if (remove == null || remove == completableFuture) {
                return;
            }
            this.jobManagerRunnerTerminationFutures.put(jobID, remove);
        }, (Executor) getMainThreadExecutor());
    }

    private CompletableFuture<Void> removeJob(JobID jobID, CleanupJobState cleanupJobState) {
        return cleanupJobState.isGlobalCleanup() ? this.globalResourceCleaner.cleanupAsync(jobID).thenRunAsync(() -> {
            markJobAsClean(jobID);
        }, this.ioExecutor).thenRunAsync(() -> {
            runPostJobGloballyTerminated(jobID, cleanupJobState.getJobStatus());
        }, (Executor) getMainThreadExecutor()) : this.localResourceCleaner.cleanupAsync(jobID);
    }

    private void markJobAsClean(JobID jobID) {
        try {
            this.jobResultStore.markResultAsClean(jobID);
            this.log.debug("Cleanup for the job '{}' has finished. Job has been marked as clean.", jobID);
        } catch (IOException e) {
            this.log.warn("Could not properly mark job {} result as clean.", jobID, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void runPostJobGloballyTerminated(JobID jobID, JobStatus jobStatus) {
    }

    private void terminateRunningJobs() {
        this.log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
        Iterator<JobID> it = this.jobManagerRunnerRegistry.getRunningJobIds().iterator();
        while (it.hasNext()) {
            terminateJob(it.next());
        }
    }

    private void terminateJob(JobID jobID) {
        if (this.jobManagerRunnerRegistry.isRegistered(jobID)) {
            this.jobManagerRunnerRegistry.get(jobID).closeAsync();
        }
    }

    private CompletableFuture<Void> terminateRunningJobsAndGetTerminationFuture() {
        terminateRunningJobs();
        return FutureUtils.completeAll(this.jobManagerRunnerTerminationFutures.values());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onFatalError(Throwable th) {
        this.fatalErrorHandler.onFatalError(th);
    }

    @VisibleForTesting
    protected CompletableFuture<CleanupJobState> jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        JobStatus state = archivedExecutionGraph.getState();
        Preconditions.checkArgument(state.isTerminalState(), "Job %s is in state %s which is not terminal.", new Object[]{archivedExecutionGraph.getJobID(), state});
        boolean z = state == JobStatus.SUSPENDED || state == JobStatus.FAILED;
        if (archivedExecutionGraph.getFailureInfo() == null || !z) {
            this.log.info("Job {} reached terminal state {}.", archivedExecutionGraph.getJobID(), state);
        } else {
            this.log.info("Job {} reached terminal state {}.\n{}", new Object[]{archivedExecutionGraph.getJobID(), state, archivedExecutionGraph.getFailureInfo().getExceptionAsString().trim()});
        }
        writeToExecutionGraphInfoStore(executionGraphInfo);
        return !state.isGloballyTerminalState() ? CompletableFuture.completedFuture(CleanupJobState.localCleanup(state)) : archiveExecutionGraphToHistoryServer(executionGraphInfo).thenCompose(acknowledge -> {
            return registerGloballyTerminatedJobInJobResultStore(executionGraphInfo);
        });
    }

    private CompletableFuture<CleanupJobState> registerGloballyTerminatedJobInJobResultStore(ExecutionGraphInfo executionGraphInfo) {
        CompletableFuture completableFuture = new CompletableFuture();
        JobID jobId = executionGraphInfo.getJobId();
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        JobStatus state = archivedExecutionGraph.getState();
        Preconditions.checkArgument(state.isGloballyTerminalState(), "Job %s is in state %s which is not globally terminal.", new Object[]{jobId, state});
        this.ioExecutor.execute(() -> {
            try {
                if (this.jobResultStore.hasCleanJobResultEntry(jobId)) {
                    this.log.warn("Job {} is already marked as clean but clean up was triggered again.", jobId);
                } else if (!this.jobResultStore.hasDirtyJobResultEntry(jobId)) {
                    this.jobResultStore.createDirtyResult(new JobResultEntry(JobResult.createFrom(archivedExecutionGraph)));
                    this.log.info("Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.", jobId);
                }
                completableFuture.complete(null);
            } catch (IOException e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture.handleAsync((r12, th) -> {
            if (th != null) {
                this.fatalErrorHandler.onFatalError(new FlinkException(String.format("The job %s couldn't be marked as pre-cleanup finished in JobResultStore.", executionGraphInfo.getJobId()), th));
            }
            return CleanupJobState.globalCleanup(state);
        }, (Executor) getMainThreadExecutor());
    }

    private void writeToExecutionGraphInfoStore(ExecutionGraphInfo executionGraphInfo) {
        try {
            this.executionGraphInfoStore.put(executionGraphInfo);
        } catch (IOException e) {
            this.log.info("Could not store completed job {}({}).", new Object[]{executionGraphInfo.getArchivedExecutionGraph().getJobName(), executionGraphInfo.getArchivedExecutionGraph().getJobID(), e});
        }
    }

    private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(ExecutionGraphInfo executionGraphInfo) {
        return this.historyServerArchivist.archiveExecutionGraph(executionGraphInfo).handleAsync((acknowledge, th) -> {
            if (th != null) {
                this.log.info("Could not archive completed job {}({}) to the history server.", new Object[]{executionGraphInfo.getArchivedExecutionGraph().getJobName(), executionGraphInfo.getArchivedExecutionGraph().getJobID(), th});
            }
            return Acknowledge.get();
        }, (Executor) getMainThreadExecutor());
    }

    private void jobMasterFailed(JobID jobID, Throwable th) {
        onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobID), th));
    }

    private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobID) {
        if (!this.jobManagerRunnerRegistry.isRegistered(jobID)) {
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID));
        }
        JobManagerRunner jobManagerRunner = this.jobManagerRunnerRegistry.get(jobID);
        return !jobManagerRunner.isInitialized() ? FutureUtils.completedExceptionally(new UnavailableDispatcherOperationException("Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing.")) : jobManagerRunner.getJobMasterGateway();
    }

    private <T> CompletableFuture<T> performOperationOnJobMasterGateway(JobID jobID, Function<JobMasterGateway, CompletableFuture<T>> function) {
        return (CompletableFuture<T>) getJobMasterGateway(jobID).thenCompose((Function<? super JobMasterGateway, ? extends CompletionStage<U>>) function);
    }

    private CompletableFuture<ResourceManagerGateway> getResourceManagerGateway() {
        return this.resourceManagerGatewayRetriever.getFuture();
    }

    private Optional<JobManagerRunner> getJobManagerRunner(JobID jobID) {
        return this.jobManagerRunnerRegistry.isRegistered(jobID) ? Optional.of(this.jobManagerRunnerRegistry.get(jobID)) : Optional.empty();
    }

    private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> function) {
        return getResourceManagerGateway().thenApply((Function<? super ResourceManagerGateway, ? extends U>) function).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Function.identity());
    }

    private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> collection) {
        return (List) collection.stream().filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).collect(Collectors.toList());
    }

    @Nonnull
    private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<JobManagerRunner, CompletableFuture<T>> function) {
        ArrayList arrayList = new ArrayList(this.jobManagerRunnerRegistry.size());
        Iterator<JobManagerRunner> it = this.jobManagerRunnerRegistry.getJobManagerRunners().iterator();
        while (it.hasNext()) {
            arrayList.add(function.apply(it.next()).handle((BiFunction) (obj, th) -> {
                return Optional.ofNullable(obj);
            }));
        }
        return arrayList;
    }

    private CompletableFuture<Void> waitForTerminatingJob(JobID jobID, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> throwingConsumer) {
        return getJobTerminationFuture(jobID).exceptionally(th -> {
            throw new CompletionException((Throwable) new DispatcherException(String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobID), th));
        }).thenAcceptAsync(FunctionUtils.uncheckedConsumer(r7 -> {
            this.jobManagerRunnerTerminationFutures.remove(jobID);
            throwingConsumer.accept(jobGraph);
        }), (Executor) getMainThreadExecutor());
    }

    @VisibleForTesting
    CompletableFuture<Void> getJobTerminationFuture(JobID jobID) {
        return this.jobManagerRunnerTerminationFutures.getOrDefault(jobID, CompletableFuture.completedFuture(null));
    }

    private void registerDispatcherMetrics(MetricGroup metricGroup) {
        metricGroup.gauge(MetricNames.NUM_RUNNING_JOBS, () -> {
            return Long.valueOf(this.jobManagerRunnerRegistry.m106getWrappedDelegate().size());
        });
    }

    public CompletableFuture<Void> onRemovedJobGraph(JobID jobID) {
        return CompletableFuture.runAsync(() -> {
            terminateJob(jobID);
        }, getMainThreadExecutor());
    }
}
