package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.JobStatusProvider;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics;
import org.apache.flink.runtime.scheduler.metrics.JobStatusMetrics;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.util.BoundedFIFOQueue;
import org.apache.flink.runtime.util.IntArrayList;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/SchedulerBase.class */
public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling {
    private final Logger log;
    private final JobGraph jobGraph;
    private final ExecutionGraph executionGraph;
    private final SchedulingTopology schedulingTopology;
    protected final StateLocationRetriever stateLocationRetriever = executionVertexID -> {
        return getExecutionVertex(executionVertexID).getPreferredLocationBasedOnState();
    };
    protected final InputsLocationsRetriever inputsLocationsRetriever;
    private final CompletedCheckpointStore completedCheckpointStore;
    private final CheckpointsCleaner checkpointsCleaner;
    private final CheckpointIDCounter checkpointIdCounter;
    protected final JobManagerJobMetricGroup jobManagerJobMetricGroup;
    protected final ExecutionVertexVersioner executionVertexVersioner;
    private final KvStateHandler kvStateHandler;
    private final ExecutionGraphHandler executionGraphHandler;
    protected final OperatorCoordinatorHandler operatorCoordinatorHandler;
    private final ComponentMainThreadExecutor mainThreadExecutor;
    private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;
    private final ExecutionGraphFactory executionGraphFactory;
    private final MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings;
    private final DeploymentStateTimeMetrics deploymentStateTimeMetrics;

    public SchedulerBase(Logger logger, JobGraph jobGraph, Executor executor, Configuration configuration, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, ExecutionVertexVersioner executionVertexVersioner, long j, ComponentMainThreadExecutor componentMainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, VertexParallelismStore vertexParallelismStore) throws Exception {
        this.log = (Logger) Preconditions.checkNotNull(logger);
        this.jobGraph = (JobGraph) Preconditions.checkNotNull(jobGraph);
        this.executionGraphFactory = executionGraphFactory;
        this.jobManagerJobMetricGroup = (JobManagerJobMetricGroup) Preconditions.checkNotNull(jobManagerJobMetricGroup);
        this.executionVertexVersioner = (ExecutionVertexVersioner) Preconditions.checkNotNull(executionVertexVersioner);
        this.mainThreadExecutor = componentMainThreadExecutor;
        this.checkpointsCleaner = checkpointsCleaner;
        this.completedCheckpointStore = SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(jobGraph, configuration, (CheckpointRecoveryFactory) Preconditions.checkNotNull(checkpointRecoveryFactory), executor, logger);
        this.checkpointIdCounter = SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(jobGraph, (CheckpointRecoveryFactory) Preconditions.checkNotNull(checkpointRecoveryFactory));
        this.jobStatusMetricsSettings = MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration);
        this.deploymentStateTimeMetrics = new DeploymentStateTimeMetrics(jobGraph.getJobType(), this.jobStatusMetricsSettings);
        this.executionGraph = createAndRestoreExecutionGraph(this.completedCheckpointStore, checkpointsCleaner, this.checkpointIdCounter, j, componentMainThreadExecutor, jobStatusListener, vertexParallelismStore);
        this.schedulingTopology = this.executionGraph.getSchedulingTopology();
        this.inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(this.executionGraph);
        this.kvStateHandler = new KvStateHandler(this.executionGraph);
        this.executionGraphHandler = new ExecutionGraphHandler(this.executionGraph, logger, executor, this.mainThreadExecutor);
        this.operatorCoordinatorHandler = new DefaultOperatorCoordinatorHandler(this.executionGraph, this::handleGlobalFailure);
        this.operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor);
        this.exceptionHistory = new BoundedFIFOQueue<>(configuration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
    }

    private void shutDownCheckpointServices(JobStatus jobStatus) {
        Exception exc = null;
        try {
            this.completedCheckpointStore.shutdown(jobStatus, this.checkpointsCleaner);
        } catch (Exception e) {
            exc = e;
        }
        try {
            this.checkpointIdCounter.shutdown(jobStatus).get();
        } catch (Exception e2) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
        }
        if (exc != null) {
            this.log.error("Error while shutting down checkpoint services.", exc);
        }
    }

    private static int normalizeParallelism(int i) {
        if (i == -1) {
            return 1;
        }
        return i;
    }

    public static int getDefaultMaxParallelism(JobVertex jobVertex) {
        return KeyGroupRangeAssignment.computeDefaultMaxParallelism(normalizeParallelism(jobVertex.getParallelism()));
    }

    public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> iterable, Function<JobVertex, Integer> function) {
        return computeVertexParallelismStore(iterable, function, (v0) -> {
            return normalizeParallelism(v0);
        });
    }

    public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> iterable, Function<JobVertex, Integer> function, Function<Integer, Integer> function2) {
        boolean z;
        DefaultVertexParallelismStore defaultVertexParallelismStore = new DefaultVertexParallelismStore();
        for (JobVertex jobVertex : iterable) {
            int intValue = function2.apply(Integer.valueOf(jobVertex.getParallelism())).intValue();
            int maxParallelism = jobVertex.getMaxParallelism();
            if (maxParallelism == -1) {
                maxParallelism = function.apply(jobVertex).intValue();
                z = true;
            } else {
                z = false;
            }
            boolean z2 = z;
            defaultVertexParallelismStore.setParallelismInfo(jobVertex.getID(), new DefaultVertexParallelismInfo(intValue, maxParallelism, num -> {
                return z2 ? Optional.empty() : Optional.of("Cannot override a configured max parallelism.");
            }));
        }
        return defaultVertexParallelismStore;
    }

    public static VertexParallelismStore computeVertexParallelismStore(Iterable<JobVertex> iterable) {
        return computeVertexParallelismStore(iterable, SchedulerBase::getDefaultMaxParallelism);
    }

    public static VertexParallelismStore computeVertexParallelismStore(JobGraph jobGraph) {
        return computeVertexParallelismStore(jobGraph.getVertices());
    }

    private ExecutionGraph createAndRestoreExecutionGraph(CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIDCounter, long j, ComponentMainThreadExecutor componentMainThreadExecutor, JobStatusListener jobStatusListener, VertexParallelismStore vertexParallelismStore) throws Exception {
        ExecutionGraph createAndRestoreExecutionGraph = this.executionGraphFactory.createAndRestoreExecutionGraph(this.jobGraph, completedCheckpointStore, checkpointsCleaner, checkpointIDCounter, TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(this.jobGraph.getJobType()), j, new DefaultVertexAttemptNumberStore(), vertexParallelismStore, this.deploymentStateTimeMetrics, this.log);
        createAndRestoreExecutionGraph.setInternalTaskFailuresListener(new UpdateSchedulerNgOnInternalFailuresListener(this));
        createAndRestoreExecutionGraph.registerJobStatusListener(jobStatusListener);
        createAndRestoreExecutionGraph.start(componentMainThreadExecutor);
        return createAndRestoreExecutionGraph;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resetForNewExecutions(Collection<ExecutionVertexID> collection) {
        collection.stream().forEach(this::resetForNewExecution);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resetForNewExecution(ExecutionVertexID executionVertexID) {
        getExecutionVertex(executionVertexID).resetForNewExecution();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void restoreState(Set<ExecutionVertexID> set, boolean z) throws Exception {
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            if (z) {
                notifyCoordinatorsOfEmptyGlobalRestore();
                return;
            } else {
                notifyCoordinatorsOfSubtaskRestore(getInvolvedExecutionJobVerticesAndSubtasks(set), -1L);
                return;
            }
        }
        checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
        if (z) {
            checkpointCoordinator.restoreLatestCheckpointedStateToAll(getInvolvedExecutionJobVertices(set), true);
        } else {
            Map<ExecutionJobVertex, IntArrayList> involvedExecutionJobVerticesAndSubtasks = getInvolvedExecutionJobVerticesAndSubtasks(set);
            notifyCoordinatorsOfSubtaskRestore(involvedExecutionJobVerticesAndSubtasks, checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(involvedExecutionJobVerticesAndSubtasks.keySet()).orElse(-1L));
        }
    }

    private void notifyCoordinatorsOfSubtaskRestore(Map<ExecutionJobVertex, IntArrayList> map, long j) {
        for (Map.Entry<ExecutionJobVertex, IntArrayList> entry : map.entrySet()) {
            ExecutionJobVertex key = entry.getKey();
            IntArrayList value = entry.getValue();
            Collection<OperatorCoordinatorHolder> operatorCoordinators = key.getOperatorCoordinators();
            if (!operatorCoordinators.isEmpty()) {
                while (!value.isEmpty()) {
                    int removeLast = value.removeLast();
                    Iterator<OperatorCoordinatorHolder> it = operatorCoordinators.iterator();
                    while (it.hasNext()) {
                        it.next().subtaskReset(removeLast, j);
                    }
                }
            }
        }
    }

    private void notifyCoordinatorsOfEmptyGlobalRestore() throws Exception {
        for (ExecutionJobVertex executionJobVertex : getExecutionGraph().getAllVertices().values()) {
            if (executionJobVertex.isInitialized()) {
                Iterator<OperatorCoordinatorHolder> it = executionJobVertex.getOperatorCoordinators().iterator();
                while (it.hasNext()) {
                    it.next().resetToCheckpoint(-1L, null);
                }
            }
        }
    }

    private Set<ExecutionJobVertex> getInvolvedExecutionJobVertices(Set<ExecutionVertexID> set) {
        HashSet hashSet = new HashSet();
        Iterator<ExecutionVertexID> it = set.iterator();
        while (it.hasNext()) {
            hashSet.add(getExecutionVertex(it.next()).getJobVertex());
        }
        return hashSet;
    }

    private Map<ExecutionJobVertex, IntArrayList> getInvolvedExecutionJobVerticesAndSubtasks(Set<ExecutionVertexID> set) {
        HashMap hashMap = new HashMap();
        Iterator<ExecutionVertexID> it = set.iterator();
        while (it.hasNext()) {
            ExecutionVertex executionVertex = getExecutionVertex(it.next());
            ((IntArrayList) hashMap.computeIfAbsent(executionVertex.getJobVertex(), executionJobVertex -> {
                return new IntArrayList(32);
            })).add(executionVertex.getParallelSubtaskIndex());
        }
        return hashMap;
    }

    protected void transitionToScheduled(List<ExecutionVertexID> list) {
        list.forEach(executionVertexID -> {
            getExecutionVertex(executionVertexID).getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setGlobalFailureCause(@Nullable Throwable th, long j) {
        if (th != null) {
            this.executionGraph.initFailureCause(th, j);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ComponentMainThreadExecutor getMainThreadExecutor() {
        return this.mainThreadExecutor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void failJob(Throwable th, long j) {
        incrementVersionsOfAllVertices();
        cancelAllPendingSlotRequestsInternal();
        this.executionGraph.failJob(th, j);
        getJobTerminationFuture().thenRun(() -> {
            archiveGlobalFailure(th);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final SchedulingTopology getSchedulingTopology() {
        return this.schedulingTopology;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker() {
        return this.executionGraph.getResultPartitionAvailabilityChecker();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void transitionToRunning() {
        this.executionGraph.transitionToRunning();
    }

    public ExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexID) {
        return this.executionGraph.getAllVertices().get(executionVertexID.getJobVertexId()).getTaskVertices()[executionVertexID.getSubtaskIndex()];
    }

    public ExecutionJobVertex getExecutionJobVertex(JobVertexID jobVertexID) {
        return this.executionGraph.getAllVertices().get(jobVertexID);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JobGraph getJobGraph() {
        return this.jobGraph;
    }

    protected abstract long getNumberOfRestarts();

    private Map<ExecutionVertexID, ExecutionVertexVersion> incrementVersionsOfAllVertices() {
        return this.executionVertexVersioner.recordVertexModifications((Collection) IterableUtils.toStream(this.schedulingTopology.getVertices()).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet()));
    }

    protected abstract void cancelAllPendingSlotRequestsInternal();

    /* JADX INFO: Access modifiers changed from: protected */
    public void transitionExecutionGraphState(JobStatus jobStatus, JobStatus jobStatus2) {
        this.executionGraph.transitionState(jobStatus, jobStatus2);
    }

    @VisibleForTesting
    CheckpointCoordinator getCheckpointCoordinator() {
        return this.executionGraph.getCheckpointCoordinator();
    }

    @VisibleForTesting
    public ExecutionGraph getExecutionGraph() {
        return this.executionGraph;
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public final void startScheduling() {
        this.mainThreadExecutor.assertRunningInMainThread();
        JobManagerJobMetricGroup jobManagerJobMetricGroup = this.jobManagerJobMetricGroup;
        ExecutionGraph executionGraph = this.executionGraph;
        Gauge gauge = this::getNumberOfRestarts;
        DeploymentStateTimeMetrics deploymentStateTimeMetrics = this.deploymentStateTimeMetrics;
        ExecutionGraph executionGraph2 = this.executionGraph;
        executionGraph2.getClass();
        registerJobMetrics(jobManagerJobMetricGroup, executionGraph, gauge, deploymentStateTimeMetrics, executionGraph2::registerJobStatusListener, this.executionGraph.getStatusTimestamp(JobStatus.INITIALIZING), this.jobStatusMetricsSettings);
        this.operatorCoordinatorHandler.startAllOperatorCoordinators();
        startSchedulingInternal();
    }

    public static void registerJobMetrics(MetricGroup metricGroup, JobStatusProvider jobStatusProvider, Gauge<Long> gauge, DeploymentStateTimeMetrics deploymentStateTimeMetrics, Consumer<JobStatusListener> consumer, long j, MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings) {
        metricGroup.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(jobStatusProvider));
        metricGroup.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(jobStatusProvider));
        gauge.getClass();
        metricGroup.gauge(MetricNames.NUM_RESTARTS, gauge::getValue);
        gauge.getClass();
        metricGroup.gauge(MetricNames.FULL_RESTARTS, gauge::getValue);
        JobStatusMetrics jobStatusMetrics = new JobStatusMetrics(j, jobStatusMetricsSettings);
        jobStatusMetrics.registerMetrics(metricGroup);
        consumer.accept(jobStatusMetrics);
        deploymentStateTimeMetrics.registerMetrics(metricGroup);
    }

    protected abstract void startSchedulingInternal();

    public CompletableFuture<Void> closeAsync() {
        this.mainThreadExecutor.assertRunningInMainThread();
        Throwable flinkException = new FlinkException("Scheduler is being stopped.");
        CompletableFuture<Void> thenAcceptAsync = this.executionGraph.getTerminationFuture().thenAcceptAsync(this::shutDownCheckpointServices, (Executor) getMainThreadExecutor());
        CheckpointsCleaner checkpointsCleaner = this.checkpointsCleaner;
        checkpointsCleaner.getClass();
        CompletableFuture<Void> composeAfterwards = FutureUtils.composeAfterwards(thenAcceptAsync, checkpointsCleaner::closeAsync);
        FutureUtils.assertNoException(composeAfterwards);
        incrementVersionsOfAllVertices();
        cancelAllPendingSlotRequestsInternal();
        this.executionGraph.suspend(flinkException);
        this.operatorCoordinatorHandler.disposeAllOperatorCoordinators();
        return composeAfterwards;
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void cancel() {
        this.mainThreadExecutor.assertRunningInMainThread();
        incrementVersionsOfAllVertices();
        cancelAllPendingSlotRequestsInternal();
        this.executionGraph.cancel();
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<JobStatus> getJobTerminationFuture() {
        return this.executionGraph.getTerminationFuture();
    }

    protected final void archiveGlobalFailure(Throwable th) {
        archiveGlobalFailure(th, this.executionGraph.getStatusTimestamp(JobStatus.FAILED), (Iterable) StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map((v0) -> {
            return v0.getCurrentExecutionAttempt();
        }).collect(Collectors.toSet()));
    }

    private void archiveGlobalFailure(Throwable th, long j, Iterable<Execution> iterable) {
        this.exceptionHistory.add(RootExceptionHistoryEntry.fromGlobalFailure(th, j, iterable));
        this.log.debug("Archive global failure.", th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void archiveFromFailureHandlingResult(FailureHandlingResultSnapshot failureHandlingResultSnapshot) {
        if (!failureHandlingResultSnapshot.getRootCauseExecution().isPresent()) {
            archiveGlobalFailure(failureHandlingResultSnapshot.getRootCause(), failureHandlingResultSnapshot.getTimestamp(), failureHandlingResultSnapshot.getConcurrentlyFailedExecution());
            return;
        }
        Execution execution = failureHandlingResultSnapshot.getRootCauseExecution().get();
        RootExceptionHistoryEntry fromFailureHandlingResultSnapshot = RootExceptionHistoryEntry.fromFailureHandlingResultSnapshot(failureHandlingResultSnapshot);
        this.exceptionHistory.add(fromFailureHandlingResultSnapshot);
        this.log.debug("Archive local failure causing attempt {} to fail: {}", execution.getAttemptId(), fromFailureHandlingResultSnapshot.getExceptionAsString());
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public final boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
        Execution execution = this.executionGraph.getRegisteredExecutions().get(taskExecutionStateTransition.getID());
        if (execution == null || !this.executionGraph.updateState(taskExecutionStateTransition)) {
            return false;
        }
        onTaskExecutionStateUpdate(execution, taskExecutionStateTransition);
        return true;
    }

    private void onTaskExecutionStateUpdate(Execution execution, TaskExecutionStateTransition taskExecutionStateTransition) {
        if (execution.getState() != taskExecutionStateTransition.getExecutionState()) {
            return;
        }
        switch (taskExecutionStateTransition.getExecutionState()) {
            case FINISHED:
                onTaskFinished(execution);
                return;
            case FAILED:
                onTaskFailed(execution);
                return;
            default:
                return;
        }
    }

    protected abstract void onTaskFinished(Execution execution);

    protected abstract void onTaskFailed(Execution execution);

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public SerializedInputSplit requestNextInputSplit(JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID) throws IOException {
        this.mainThreadExecutor.assertRunningInMainThread();
        return this.executionGraphHandler.requestNextInputSplit(jobVertexID, executionAttemptID);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public ExecutionState requestPartitionState(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID) throws PartitionProducerDisposedException {
        this.mainThreadExecutor.assertRunningInMainThread();
        return this.executionGraphHandler.requestPartitionState(intermediateDataSetID, resultPartitionID);
    }

    @VisibleForTesting
    public Iterable<RootExceptionHistoryEntry> getExceptionHistory() {
        return this.exceptionHistory.toArrayList();
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public ExecutionGraphInfo requestJob() {
        this.mainThreadExecutor.assertRunningInMainThread();
        return new ExecutionGraphInfo(ArchivedExecutionGraph.createFrom(this.executionGraph), getExceptionHistory());
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public JobStatus requestJobStatus() {
        return this.executionGraph.getState();
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public JobDetails requestJobDetails() {
        this.mainThreadExecutor.assertRunningInMainThread();
        return JobDetails.createDetailsForJob(this.executionGraph);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public KvStateLocation requestKvStateLocation(JobID jobID, String str) throws UnknownKvStateLocation, FlinkJobNotFoundException {
        this.mainThreadExecutor.assertRunningInMainThread();
        return this.kvStateHandler.requestKvStateLocation(jobID, str);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void notifyKvStateRegistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str, KvStateID kvStateID, InetSocketAddress inetSocketAddress) throws FlinkJobNotFoundException {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.kvStateHandler.notifyKvStateRegistered(jobID, jobVertexID, keyGroupRange, str, kvStateID, inetSocketAddress);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void notifyKvStateUnregistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str) throws FlinkJobNotFoundException {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.kvStateHandler.notifyKvStateUnregistered(jobID, jobVertexID, keyGroupRange, str);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.executionGraph.updateAccumulators(accumulatorSnapshot);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<String> triggerSavepoint(String str, boolean z, SavepointFormatType savepointFormatType) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        StopWithSavepointTerminationManager.checkSavepointActionPreconditions(checkpointCoordinator, str, getJobId(), this.log);
        this.log.info("Triggering {}savepoint for job {}.", z ? "cancel-with-" : "", this.jobGraph.getJobID());
        if (z) {
            stopCheckpointScheduler();
        }
        return checkpointCoordinator.triggerSavepoint(str, savepointFormatType).thenApply((v0) -> {
            return v0.getExternalPointer();
        }).handleAsync((BiFunction<? super U, Throwable, ? extends U>) (str2, th) -> {
            if (th != null) {
                if (z) {
                    startCheckpointScheduler();
                }
                throw new CompletionException(th);
            }
            if (z) {
                this.log.info("Savepoint stored in {}. Now cancelling {}.", str2, this.jobGraph.getJobID());
                cancel();
            }
            return str2;
        }, (Executor) this.mainThreadExecutor);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<String> triggerCheckpoint() {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        JobID jobID = this.jobGraph.getJobID();
        if (checkpointCoordinator == null) {
            throw new IllegalStateException(String.format("Job %s is not a streaming job.", jobID));
        }
        this.log.info("Triggering a manual checkpoint for job {}.", jobID);
        return checkpointCoordinator.triggerCheckpoint(false).thenApply((v0) -> {
            return v0.getExternalPointer();
        }).handleAsync((BiFunction<? super U, Throwable, ? extends U>) (str, th) -> {
            if (th != null) {
                throw new CompletionException(th);
            }
            return str;
        }, (Executor) this.mainThreadExecutor);
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointScheduling
    public void stopCheckpointScheduler() {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            this.log.info("Periodic checkpoint scheduling could not be stopped due to the CheckpointCoordinator being shutdown.");
        } else {
            checkpointCoordinator.stopCheckpointScheduler();
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointScheduling
    public void startCheckpointScheduler() {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            this.log.info("Periodic checkpoint scheduling could not be started due to the CheckpointCoordinator being shutdown.");
        } else if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            try {
                checkpointCoordinator.startCheckpointScheduler();
            } catch (IllegalStateException e) {
            }
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
        this.executionGraphHandler.acknowledgeCheckpoint(jobID, executionAttemptID, j, checkpointMetrics, taskStateSnapshot);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
        this.executionGraphHandler.declineCheckpoint(declineCheckpoint);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void reportCheckpointMetrics(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics) {
        this.executionGraphHandler.reportCheckpointMetrics(executionAttemptID, j, checkpointMetrics);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<String> stopWithSavepoint(@Nullable String str, boolean z, SavepointFormatType savepointFormatType) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        StopWithSavepointTerminationManager.checkSavepointActionPreconditions(checkpointCoordinator, str, this.executionGraph.getJobID(), this.log);
        this.log.info("Triggering stop-with-savepoint for job {}.", this.jobGraph.getJobID());
        stopCheckpointScheduler();
        CompletableFuture<Collection<ExecutionState>> combinedExecutionTerminationFuture = getCombinedExecutionTerminationFuture();
        return new StopWithSavepointTerminationManager(new StopWithSavepointTerminationHandlerImpl(this.jobGraph.getJobID(), this, this.log)).stopWithSavepoint(checkpointCoordinator.triggerSynchronousSavepoint(z, str, savepointFormatType), combinedExecutionTerminationFuture, this.mainThreadExecutor);
    }

    private CompletableFuture<Collection<ExecutionState>> getCombinedExecutionTerminationFuture() {
        return FutureUtils.combineAll((Collection) StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map((v0) -> {
            return v0.getCurrentExecutionAttempt();
        }).map((v0) -> {
            return v0.getTerminalStateFuture();
        }).collect(Collectors.toList()));
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void deliverOperatorEventToCoordinator(ExecutionAttemptID executionAttemptID, OperatorID operatorID, OperatorEvent operatorEvent) throws FlinkException {
        this.operatorCoordinatorHandler.deliverOperatorEventToCoordinator(executionAttemptID, operatorID, operatorEvent);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operatorID, CoordinationRequest coordinationRequest) throws FlinkException {
        return this.operatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(operatorID, coordinationRequest);
    }

    @VisibleForTesting
    JobID getJobId() {
        return this.jobGraph.getJobID();
    }
}
