/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.ExecutionAttemptMappingProvider;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AllVerticesIterator;
import org.apache.flink.runtime.executiongraph.EdgeManager;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext;
import org.apache.flink.runtime.executiongraph.ExecutionGraphResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.InternalFailuresListener;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultExecutionGraph
implements ExecutionGraph,
InternalExecutionGraphAccessor {
    // Logger shared by this class; note that it is registered under ExecutionGraph.class.
    static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
    private final JobInformation jobInformation;
    // Result of BlobWriter.serializeAndTryOffload(...) for the job information, computed in the constructor.
    private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
    private final ScheduledExecutorService futureExecutor;
    private final Executor ioExecutor;
    // Starts out as a dummy executor (see constructor) and is replaced by start(...).
    @Nonnull
    private ComponentMainThreadExecutor jobMasterMainThreadExecutor;
    // Cleared by attachJobGraph(...) when an attached input vertex is not stoppable.
    private boolean isStoppable = true;
    // Job vertices keyed by their id; filled by attachJobGraph(...).
    private final Map<JobVertexID, ExecutionJobVertex> tasks;
    // Job vertices in the order in which attachJobGraph(...) added them.
    private final List<ExecutionJobVertex> verticesInCreationOrder;
    // Intermediate results keyed by data set id; filled by attachJobGraph(...).
    private final Map<IntermediateDataSetID, IntermediateResult> intermediateResults;
    private final Map<ExecutionAttemptID, Execution> currentExecutions;
    private final List<JobStatusListener> jobStatusListeners;
    // One timestamp slot per JobStatus, indexed by JobStatus.ordinal().
    private final long[] stateTimestamps;
    private final Time rpcTimeout;
    private final ClassLoader userClassLoader;
    private final KvStateLocationRegistry kvStateLocationRegistry;
    private final BlobWriter blobWriter;
    // Incremented once per vertex attached in attachJobGraph(...).
    private int numJobVerticesTotal;
    private final PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory;
    // Created from the factory at the end of attachJobGraph(...).
    private PartitionGroupReleaseStrategy partitionGroupReleaseStrategy;
    // Built from this graph at the end of attachJobGraph(...); null before that.
    private DefaultExecutionTopology executionTopology;
    // Set at most once via setInternalTaskFailuresListener(...).
    @Nullable
    private InternalFailuresListener internalTaskFailuresListener;
    private final Counter numberOfRestartsCounter = new SimpleCounter();
    private final TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint;
    private final int maxPriorAttemptsHistoryLength;
    private int numFinishedJobVertices;
    // Current job status; volatile, initial value CREATED.
    private volatile JobStatus state = JobStatus.CREATED;
    private final CompletableFuture<JobStatus> terminationFuture = new CompletableFuture();
    private Throwable failureCause;
    private ErrorInfo failureInfo;
    private final JobMasterPartitionTracker partitionTracker;
    private final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
    // Cancelled (without interruption) by cancel() and suspend(...) when non-null.
    @Nullable
    private CompletableFuture<Void> schedulingFuture;
    private final VertexAttemptNumberStore initialAttemptCounts;
    private final VertexParallelismStore parallelismStore;
    // The checkpoint-related fields below are populated by enableCheckpointing(...).
    @Nullable
    private CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration;
    @Nullable
    private CheckpointCoordinator checkpointCoordinator;
    // Single-threaded scheduler created in enableCheckpointing(...) with thread name "Checkpoint Timer".
    @Nullable
    private ScheduledExecutorService checkpointCoordinatorTimer;
    private CheckpointStatsTracker checkpointStatsTracker;
    // Simple class name of the state backend passed to enableCheckpointing(...).
    @Nullable
    private String stateBackendName;
    // Simple class name of the checkpoint storage passed to enableCheckpointing(...).
    @Nullable
    private String checkpointStorageName;
    private String jsonPlan;
    private final ShuffleMaster<?> shuffleMaster;
    private final ExecutionDeploymentListener executionDeploymentListener;
    private final ExecutionStateUpdateListener executionStateUpdateListener;
    private final EdgeManager edgeManager;
    // Lookup tables backing getExecutionVertexOrThrow(...) and getResultPartitionOrThrow(...).
    private final Map<ExecutionVertexID, ExecutionVertex> executionVerticesById;
    private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionsById;

    /**
     * Creates an execution graph with empty vertex and result collections. The job status
     * field starts at {@code CREATED} through its field initializer.
     *
     * <p>{@code jobInformation}, {@code blobWriter}, {@code partitionLocationConstraint},
     * {@code futureExecutor}, {@code ioExecutor}, {@code userClassLoader}, {@code rpcTimeout},
     * {@code partitionGroupReleaseStrategyFactory}, {@code shuffleMaster} and
     * {@code partitionTracker} are null-checked. The two listeners, {@code initialAttemptCounts}
     * and {@code vertexParallelismStore} are stored without a null check.
     *
     * @param initializationTimestamp stored as the timestamp of the {@code INITIALIZING} status;
     *     the {@code CREATED} timestamp is taken from {@code System.currentTimeMillis()}
     * @throws IOException declared by this constructor; presumably raised while serializing or
     *     offloading the job information (NOTE(review): confirm against BlobWriter)
     */
    public DefaultExecutionGraph(JobInformation jobInformation, ScheduledExecutorService futureExecutor, Executor ioExecutor, Time rpcTimeout, int maxPriorAttemptsHistoryLength, ClassLoader userClassLoader, BlobWriter blobWriter, PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory, ShuffleMaster<?> shuffleMaster, JobMasterPartitionTracker partitionTracker, TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint, ExecutionDeploymentListener executionDeploymentListener, ExecutionStateUpdateListener executionStateUpdateListener, long initializationTimestamp, VertexAttemptNumberStore initialAttemptCounts, VertexParallelismStore vertexParallelismStore) throws IOException {
        this.jobInformation = (JobInformation)Preconditions.checkNotNull((Object)jobInformation);
        this.blobWriter = (BlobWriter)Preconditions.checkNotNull((Object)blobWriter);
        this.partitionLocationConstraint = (TaskDeploymentDescriptorFactory.PartitionLocationConstraint)((Object)Preconditions.checkNotNull((Object)((Object)partitionLocationConstraint)));
        this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);
        this.futureExecutor = (ScheduledExecutorService)Preconditions.checkNotNull((Object)futureExecutor);
        this.ioExecutor = (Executor)Preconditions.checkNotNull((Object)ioExecutor);
        this.userClassLoader = (ClassLoader)Preconditions.checkNotNull((Object)userClassLoader, (String)"userClassLoader");
        this.tasks = new HashMap<JobVertexID, ExecutionJobVertex>(16);
        this.intermediateResults = new HashMap<IntermediateDataSetID, IntermediateResult>(16);
        this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>(16);
        this.currentExecutions = new HashMap<ExecutionAttemptID, Execution>(16);
        this.jobStatusListeners = new ArrayList<JobStatusListener>();
        // One slot per JobStatus constant, addressed by ordinal.
        this.stateTimestamps = new long[JobStatus.values().length];
        this.stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
        this.rpcTimeout = (Time)Preconditions.checkNotNull((Object)rpcTimeout);
        this.partitionGroupReleaseStrategyFactory = (PartitionGroupReleaseStrategy.Factory)Preconditions.checkNotNull((Object)partitionGroupReleaseStrategyFactory);
        // getAllVertices() wraps this.tasks, so this line must stay after the tasks map is assigned.
        this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), this.getAllVertices());
        this.maxPriorAttemptsHistoryLength = maxPriorAttemptsHistoryLength;
        this.schedulingFuture = null;
        // Placeholder executor until start(...) supplies the real one.
        this.jobMasterMainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor("ExecutionGraph is not initialized with proper main thread executor. Call to ExecutionGraph.start(...) required.");
        this.shuffleMaster = (ShuffleMaster)Preconditions.checkNotNull(shuffleMaster);
        this.partitionTracker = (JobMasterPartitionTracker)Preconditions.checkNotNull((Object)partitionTracker);
        this.resultPartitionAvailabilityChecker = new ExecutionGraphResultPartitionAvailabilityChecker(this::createResultPartitionId, partitionTracker);
        this.executionDeploymentListener = executionDeploymentListener;
        this.executionStateUpdateListener = executionStateUpdateListener;
        this.initialAttemptCounts = initialAttemptCounts;
        this.parallelismStore = vertexParallelismStore;
        this.edgeManager = new EdgeManager();
        this.executionVerticesById = new HashMap<ExecutionVertexID, ExecutionVertex>();
        this.resultPartitionsById = new HashMap<IntermediateResultPartitionID, IntermediateResultPartition>();
    }

    /**
     * Installs the job master's main-thread executor, replacing the executor currently held
     * by this graph.
     *
     * @param jobMasterMainThreadExecutor executor to use from now on
     */
    @Override
    public void start(@Nonnull ComponentMainThreadExecutor jobMasterMainThreadExecutor) {
        this.jobMasterMainThreadExecutor = jobMasterMainThreadExecutor;
    }

    /**
     * Returns the execution topology assigned by {@code attachJobGraph}.
     *
     * @return the current topology, or {@code null} if none has been assigned yet
     */
    @Override
    public SchedulingTopology getSchedulingTopology() {
        return executionTopology;
    }

    /** Returns the partition location constraint supplied at construction time. */
    @Override
    public TaskDeploymentDescriptorFactory.PartitionLocationConstraint getPartitionLocationConstraint() {
        return partitionLocationConstraint;
    }

    /**
     * Returns the main-thread executor currently held by this graph: the dummy executor set in
     * the constructor until {@code start} replaces it.
     */
    @Override
    @Nonnull
    public ComponentMainThreadExecutor getJobMasterMainThreadExecutor() {
        return jobMasterMainThreadExecutor;
    }

    /**
     * Returns the state backend name recorded by {@code enableCheckpointing}.
     *
     * @return the name, or an empty {@code Optional} when none has been recorded
     */
    @Override
    public Optional<String> getStateBackendName() {
        return Optional.ofNullable(stateBackendName);
    }

    /**
     * Returns the checkpoint storage name recorded by {@code enableCheckpointing}.
     *
     * @return the name, or an empty {@code Optional} when none has been recorded
     */
    @Override
    public Optional<String> getCheckpointStorageName() {
        return Optional.ofNullable(checkpointStorageName);
    }

    /**
     * Sets up checkpointing for this graph: creates the failure manager, the "Checkpoint Timer"
     * scheduler and the {@link CheckpointCoordinator}, registers the given master hooks and
     * records the simple class names of the state backend and the checkpoint storage.
     *
     * <p>A hook rejected by {@code addMasterHook} is only reported with a warning. When the
     * coordinator reports periodic checkpointing as configured, its activator/deactivator is
     * registered as a job status listener.
     *
     * @throws IllegalStateException if the job is not in state {@code CREATED}, if a checkpoint
     *     coordinator already exists, or if the checkpoint timer already exists
     * @throws NullPointerException if {@code statsTracker} or {@code chkConfig} is {@code null}
     */
    @Override
    public void enableCheckpointing(CheckpointCoordinatorConfiguration chkConfig, List<MasterTriggerRestoreHook<?>> masterHooks, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, CheckpointStorage checkpointStorage, CheckpointStatsTracker statsTracker, CheckpointsCleaner checkpointsCleaner) {
        Preconditions.checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
        Preconditions.checkState(checkpointCoordinator == null, "checkpointing already enabled");

        final Collection<OperatorCoordinatorCheckpointContext> coordinatorContexts = buildOpCoordinatorCheckpointContexts();
        checkpointStatsTracker = Preconditions.checkNotNull(statsTracker, "CheckpointStatsTracker");
        checkpointCoordinatorConfiguration = Preconditions.checkNotNull(chkConfig, "CheckpointCoordinatorConfiguration");

        // Both callbacks hand the failure over to the job master main thread.
        final int tolerableFailures = chkConfig.getTolerableCheckpointFailureNumber();
        final CheckpointFailureManager.FailJobCallback failJobCallback = new CheckpointFailureManager.FailJobCallback() {

            @Override
            public void failJob(Throwable cause) {
                DefaultExecutionGraph.this.getJobMasterMainThreadExecutor().execute(() -> DefaultExecutionGraph.this.failGlobal(cause));
            }

            @Override
            public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                DefaultExecutionGraph.this.getJobMasterMainThreadExecutor().execute(() -> DefaultExecutionGraph.this.failGlobalIfExecutionIsStillRunning(cause, failingTask));
            }
        };
        final CheckpointFailureManager failureManager = new CheckpointFailureManager(tolerableFailures, failJobCallback);

        Preconditions.checkState(checkpointCoordinatorTimer == null);
        checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(
                new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

        checkpointCoordinator = new CheckpointCoordinator(
                jobInformation.getJobId(),
                chkConfig,
                coordinatorContexts,
                checkpointIDCounter,
                checkpointStore,
                checkpointStorage,
                ioExecutor,
                checkpointsCleaner,
                (ScheduledExecutor) new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                SharedStateRegistry.DEFAULT_FACTORY,
                failureManager,
                createCheckpointPlanCalculator(chkConfig.isEnableCheckpointsAfterTasksFinish()),
                new ExecutionAttemptMappingProvider(getAllExecutionVertices()),
                checkpointStatsTracker);

        for (MasterTriggerRestoreHook<?> masterHook : masterHooks) {
            if (!checkpointCoordinator.addMasterHook(masterHook)) {
                LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", masterHook.getIdentifier());
            }
        }

        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
        }

        stateBackendName = checkpointStateBackend.getClass().getSimpleName();
        checkpointStorageName = checkpointStorage.getClass().getSimpleName();
    }

    /**
     * Builds the checkpoint plan calculator for this graph from the job id, a calculator
     * context wrapping this graph and the job vertices returned by
     * {@code getVerticesTopologically()}.
     *
     * @param enableCheckpointsAfterTasksFinish flag passed through to the calculator
     */
    private CheckpointPlanCalculator createCheckpointPlanCalculator(boolean enableCheckpointsAfterTasksFinish) {
        final JobID jobId = getJobID();
        final ExecutionGraphCheckpointPlanCalculatorContext calculatorContext = new ExecutionGraphCheckpointPlanCalculatorContext(this);
        final Iterable<ExecutionJobVertex> jobVertices = getVerticesTopologically();
        return new DefaultCheckpointPlanCalculator(jobId, calculatorContext, jobVertices, enableCheckpointsAfterTasksFinish);
    }

    /**
     * Returns the checkpoint coordinator created by {@code enableCheckpointing}.
     *
     * @return the coordinator, or {@code null} when checkpointing has not been enabled
     */
    @Override
    @Nullable
    public CheckpointCoordinator getCheckpointCoordinator() {
        return checkpointCoordinator;
    }

    /** Returns the KvState location registry created in the constructor. */
    @Override
    public KvStateLocationRegistry getKvStateLocationRegistry() {
        return kvStateLocationRegistry;
    }

    /**
     * Returns the checkpoint coordinator configuration stored by {@code enableCheckpointing}.
     *
     * @return the configuration, or {@code null} when none has been stored
     */
    @Override
    public CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration() {
        // Returning the field directly yields null in exactly the case the explicit branch covered.
        return checkpointCoordinatorConfiguration;
    }

    /**
     * Creates a snapshot of the checkpoint statistics.
     *
     * @return a fresh snapshot from the stats tracker, or {@code null} when no tracker is set
     */
    @Override
    public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
        return checkpointStatsTracker == null ? null : checkpointStatsTracker.createSnapshot();
    }

    /**
     * Collects the operator coordinators of all job vertices, in vertex creation order, into a
     * single list that is trimmed to its final size.
     */
    private Collection<OperatorCoordinatorCheckpointContext> buildOpCoordinatorCheckpointContexts() {
        final ArrayList<OperatorCoordinatorCheckpointContext> collected = new ArrayList<>();
        verticesInCreationOrder.forEach(jobVertex -> collected.addAll(jobVertex.getOperatorCoordinators()));
        collected.trimToSize();
        return collected;
    }

    /**
     * Stores the JSON plan string, replacing any previously stored value.
     *
     * @param jsonPlan the plan to store
     */
    @Override
    public void setJsonPlan(String jsonPlan) {
        this.jsonPlan = jsonPlan;
    }

    /** Returns the JSON plan last stored through {@code setJsonPlan}, or {@code null} if none was stored. */
    @Override
    public String getJsonPlan() {
        return jsonPlan;
    }

    /** Returns the serialized job information or its blob key, as computed in the constructor. */
    @Override
    public Either<SerializedValue<JobInformation>, PermanentBlobKey> getJobInformationOrBlobKey() {
        return jobInformationOrBlobKey;
    }

    /** Returns the job id taken from the job information. */
    @Override
    public JobID getJobID() {
        return jobInformation.getJobId();
    }

    /** Returns the job name taken from the job information. */
    @Override
    public String getJobName() {
        return jobInformation.getJobName();
    }

    /**
     * Reports whether the job is stoppable.
     *
     * @return {@code true} unless {@code attachJobGraph} encountered an input vertex that is
     *     not stoppable
     */
    @Override
    public boolean isStoppable() {
        return isStoppable;
    }

    /** Returns the job configuration taken from the job information. */
    @Override
    public Configuration getJobConfiguration() {
        return jobInformation.getJobConfiguration();
    }

    /** Returns the user class loader supplied at construction time. */
    @Override
    public ClassLoader getUserClassLoader() {
        return userClassLoader;
    }

    /** Returns the current job status, read from the volatile {@code state} field. */
    @Override
    public JobStatus getState() {
        return state;
    }

    /** Returns the stored failure cause, or {@code null} if none has been stored. */
    @Override
    public Throwable getFailureCause() {
        return failureCause;
    }

    /** Returns the stored failure info, or {@code null} if none has been stored. */
    @Override
    public ErrorInfo getFailureInfo() {
        return failureInfo;
    }

    /** Returns the current value of the restart counter. */
    @Override
    public long getNumberOfRestarts() {
        return numberOfRestartsCounter.getCount();
    }

    /**
     * Sums {@code getNumExecutionVertexFinished()} over all job vertices returned by
     * {@code getVerticesTopologically()}.
     *
     * @return the total over all job vertices
     */
    @Override
    public int getNumFinishedVertices() {
        int finishedTotal = 0;
        for (ExecutionJobVertex jobVertex : getVerticesTopologically()) {
            finishedTotal += jobVertex.getNumExecutionVertexFinished();
        }
        return finishedTotal;
    }

    /**
     * Looks up the execution job vertex registered under the given id.
     *
     * @param id id of the job vertex
     * @return the vertex, or {@code null} when no vertex is registered under {@code id}
     */
    @Override
    public ExecutionJobVertex getJobVertex(JobVertexID id) {
        return tasks.get(id);
    }

    /** Returns an unmodifiable view of the job vertices keyed by their id. */
    @Override
    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        return Collections.unmodifiableMap(tasks);
    }

    /**
     * Returns an iterable over the job vertices in creation order.
     *
     * <p>The number of vertices is captured when this method is called, so every iterator
     * obtained from the result yields at most that many elements. Iterators do not support
     * {@code remove()}.
     */
    @Override
    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        // Freeze the element count now; iterators created later are bounded by it.
        final int snapshotSize = verticesInCreationOrder.size();
        return () -> new Iterator<ExecutionJobVertex>() {
            private int cursor = 0;

            @Override
            public boolean hasNext() {
                return cursor < snapshotSize;
            }

            @Override
            public ExecutionJobVertex next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return verticesInCreationOrder.get(cursor++);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /** Returns an unmodifiable view of the intermediate results keyed by data set id. */
    @Override
    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        return Collections.unmodifiableMap(intermediateResults);
    }

    /**
     * Returns an iterable over all execution vertices. Each call to {@code iterator()} wraps a
     * fresh iterator from {@code getVerticesTopologically()} in an {@code AllVerticesIterator}.
     */
    @Override
    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return new Iterable<ExecutionVertex>() {

            @Override
            public Iterator<ExecutionVertex> iterator() {
                final Iterator<ExecutionJobVertex> jobVertices = getVerticesTopologically().iterator();
                return new AllVerticesIterator(jobVertices);
            }
        };
    }

    /** Returns the edge manager created in the constructor. */
    @Override
    public EdgeManager getEdgeManager() {
        return edgeManager;
    }

    /**
     * Looks up the execution vertex registered under the given id.
     *
     * @param id id of the execution vertex
     * @return the registered vertex, never {@code null}
     * @throws NullPointerException if no vertex is registered under {@code id}; the message
     *     names the id that was looked up
     */
    @Override
    public ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID id) {
        final ExecutionVertex vertex = executionVerticesById.get(id);
        if (vertex == null) {
            // Same exception type as before; the message is only built on the failure path.
            throw new NullPointerException("No execution vertex registered for id " + id);
        }
        return vertex;
    }

    /**
     * Looks up the intermediate result partition registered under the given id.
     *
     * @param id id of the result partition
     * @return the registered partition, never {@code null}
     * @throws NullPointerException if no partition is registered under {@code id}; the message
     *     names the id that was looked up
     */
    @Override
    public IntermediateResultPartition getResultPartitionOrThrow(IntermediateResultPartitionID id) {
        final IntermediateResultPartition partition = resultPartitionsById.get(id);
        if (partition == null) {
            // Same exception type as before; the message is only built on the failure path.
            throw new NullPointerException("No result partition registered for id " + id);
        }
        return partition;
    }

    /**
     * Returns the timestamp stored for the given status.
     *
     * @param status status whose slot is read (addressed by {@code status.ordinal()})
     * @return the stored timestamp; {@code 0} when nothing has been stored for that status
     */
    @Override
    public long getStatusTimestamp(JobStatus status) {
        final int slot = status.ordinal();
        return stateTimestamps[slot];
    }

    /** Returns the blob writer supplied at construction time. */
    @Override
    public final BlobWriter getBlobWriter() {
        return blobWriter;
    }

    /** Returns the future executor supplied at construction time. */
    @Override
    public Executor getFutureExecutor() {
        return futureExecutor;
    }

    /**
     * Merges the user accumulators of the current execution attempt of every execution vertex
     * into one map. Vertices whose attempt reports no accumulators ({@code null}) are skipped.
     *
     * @return a new map holding the merged accumulators
     */
    @Override
    public Map<String, OptionalFailure<Accumulator<?, ?>>> aggregateUserAccumulators() {
        final Map<String, OptionalFailure<Accumulator<?, ?>>> merged = new HashMap<>();
        for (ExecutionVertex executionVertex : getAllExecutionVertices()) {
            final Map<String, Accumulator<?, ?>> attemptAccumulators = executionVertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (attemptAccumulators != null) {
                AccumulatorHelper.mergeInto(merged, attemptAccumulators);
            }
        }
        return merged;
    }

    /**
     * Aggregates the user accumulators and serializes each one with
     * {@code serializeAccumulator}.
     *
     * @return a map from accumulator name to its serialized value
     */
    @Override
    public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorsSerialized() {
        final Map<String, OptionalFailure<Accumulator<?, ?>>> aggregated = aggregateUserAccumulators();
        return aggregated.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> serializeAccumulator(entry.getKey(), entry.getValue())));
    }

    /**
     * Serializes one accumulator result. A failed accumulator is serialized as a failure
     * carrying its cause; otherwise the accumulator's local value is serialized.
     *
     * <p>If serialization throws an {@link IOException}, the error is logged and the exception
     * itself is serialized as a failure instead.
     *
     * @param name accumulator name, used only in the log message
     * @param accumulator accumulator or failure to serialize
     * @throws RuntimeException if even the {@code IOException} cannot be serialized
     */
    private static SerializedValue<OptionalFailure<Object>> serializeAccumulator(String name, OptionalFailure<Accumulator<?, ?>> accumulator) {
        try {
            if (!accumulator.isFailure()) {
                return new SerializedValue<>(OptionalFailure.of(accumulator.getUnchecked().getLocalValue()));
            }
            return new SerializedValue<>(OptionalFailure.ofFailure(accumulator.getFailureCause()));
        } catch (IOException serializationFailure) {
            LOG.error("Could not serialize accumulator " + name + '.', serializationFailure);
            try {
                return new SerializedValue<>(OptionalFailure.ofFailure(serializationFailure));
            } catch (IOException nested) {
                throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", nested);
            }
        }
    }

    /** Aggregates the user accumulators and returns their stringified form. */
    @Override
    public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(aggregateUserAccumulators());
    }

    /**
     * Registers the listener that {@code failGlobal} notifies. It can be set only once.
     *
     * @param internalTaskFailuresListener listener to register
     * @throws NullPointerException if the listener is {@code null}
     * @throws IllegalStateException if a listener has already been registered
     */
    @Override
    public void setInternalTaskFailuresListener(InternalFailuresListener internalTaskFailuresListener) {
        Preconditions.checkNotNull(internalTaskFailuresListener);
        final boolean notYetSet = this.internalTaskFailuresListener == null;
        Preconditions.checkState(notYetSet, "internalTaskFailuresListener can be only set once");
        this.internalTaskFailuresListener = internalTaskFailuresListener;
    }

    /**
     * Attaches the given job vertices to this graph. For each vertex an
     * {@link ExecutionJobVertex} is created, connected to its predecessors and registered
     * together with the intermediate results it produces. Afterwards the execution vertices and
     * result partitions are registered, and the execution topology and the partition group
     * release strategy are rebuilt.
     *
     * <p>Must be called from the job master main thread. The graph is marked as not stoppable
     * when an input vertex is not stoppable.
     *
     * @param topologicallySorted job vertices to attach, in the order they should be processed
     * @throws JobException if a job vertex id or an intermediate data set id is already
     *     registered in this graph
     */
    @Override
    public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {
        assertRunningInJobMasterMainThread();
        LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} vertices and {} intermediate results.", topologicallySorted.size(), tasks.size(), intermediateResults.size());

        // All vertices attached in this call share one creation timestamp.
        final long createTimestamp = System.currentTimeMillis();
        for (JobVertex jobVertex : topologicallySorted) {
            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                isStoppable = false;
            }

            final VertexParallelismInformation parallelismInfo = parallelismStore.getParallelismInfo(jobVertex.getID());
            final ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, maxPriorAttemptsHistoryLength, rpcTimeout, createTimestamp, parallelismInfo, initialAttemptCounts.getAttemptCounts(jobVertex.getID()));
            ejv.connectToPredecessors(intermediateResults);

            final ExecutionJobVertex previousTask = tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                // Argument order matches the placeholders: the registered vertex is "previous".
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), previousTask, ejv));
            }

            for (IntermediateResult res : ejv.getProducedDataSets()) {
                final IntermediateResult previousDataSet = intermediateResults.putIfAbsent(res.getId(), res);
                if (previousDataSet != null) {
                    // Argument order matches the placeholders: the registered result is "previous".
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", res.getId(), previousDataSet, res));
                }
            }

            verticesInCreationOrder.add(ejv);
            ++numJobVerticesTotal;
        }

        registerExecutionVerticesAndResultPartitions(verticesInCreationOrder);
        executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
        partitionGroupReleaseStrategy = partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
    }

    /**
     * Switches the job from {@code CREATED} to {@code RUNNING}.
     *
     * @throws IllegalStateException if the state transition does not succeed
     */
    @Override
    public void transitionToRunning() {
        final boolean switched = transitionState(JobStatus.CREATED, JobStatus.RUNNING);
        if (switched) {
            return;
        }
        throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
    }

    /*
     * Unable to fully structure code
     */
    @Override
    public void cancel() {
        block2: {
            this.assertRunningInJobMasterMainThread();
            do lbl-1000:
            // 3 sources

            {
                block3: {
                    if ((current = this.state) != JobStatus.RUNNING && current != JobStatus.CREATED && current != JobStatus.RESTARTING) break block3;
                    if (!this.transitionState(current, JobStatus.CANCELLING)) ** GOTO lbl-1000
                    this.incrementRestarts();
                    ongoingSchedulingFuture = this.schedulingFuture;
                    if (ongoingSchedulingFuture != null) {
                        ongoingSchedulingFuture.cancel(false);
                    }
                    allTerminal = this.cancelVerticesAsync();
                    allTerminal.whenComplete((BiConsumer<Void, Throwable>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;Ljava/lang/Object;)V, lambda$cancel$2(java.lang.Void java.lang.Throwable ), (Ljava/lang/Void;Ljava/lang/Throwable;)V)((DefaultExecutionGraph)this));
                    return;
                }
                if (current != JobStatus.FAILING) break block2;
            } while (!this.transitionState(current, JobStatus.CANCELLING));
            return;
        }
    }

    /**
     * Calls {@code cancelWithFuture()} on every job vertex in creation order.
     *
     * @return a future combining the cancellation futures of all job vertices
     */
    @VisibleForTesting
    protected FutureUtils.ConjunctFuture<Void> cancelVerticesAsync() {
        final List<CompletableFuture<Void>> cancellations = verticesInCreationOrder.stream()
                .map(ExecutionJobVertex::cancelWithFuture)
                .collect(Collectors.toList());
        return FutureUtils.waitForAll(cancellations);
    }

    /**
     * Suspends the job. Must be called from the job master main thread; does nothing when the
     * job is already in a terminal state.
     *
     * <p>After switching to {@code SUSPENDED}, the failure cause is initialised, the restart
     * counter is incremented, an ongoing scheduling future is cancelled and every job vertex is
     * suspended. The combined termination future must already be done at that point; its
     * callback runs the terminal-state handling and logs the suspension.
     *
     * @param suspensionCause cause recorded for the suspension
     * @throws IllegalStateException if the transition to {@code SUSPENDED} fails, or if the
     *     vertices have not all terminated by the time the check runs
     */
    @Override
    public void suspend(Throwable suspensionCause) {
        assertRunningInJobMasterMainThread();

        if (state.isTerminalState()) {
            return;
        }
        if (!transitionState(state, JobStatus.SUSPENDED, suspensionCause)) {
            throw new IllegalStateException(String.format("Could not suspend because transition from %s to %s failed.", state, JobStatus.SUSPENDED));
        }

        initFailureCause(suspensionCause, System.currentTimeMillis());
        incrementRestarts();
        if (schedulingFuture != null) {
            schedulingFuture.cancel(false);
        }

        final ArrayList<CompletableFuture<Void>> vertexTerminations = new ArrayList<>(verticesInCreationOrder.size());
        for (ExecutionJobVertex jobVertex : verticesInCreationOrder) {
            vertexTerminations.add(jobVertex.suspend());
        }

        final FutureUtils.ConjunctFuture<Void> allSuspended = FutureUtils.waitForAll(vertexTerminations);
        Preconditions.checkState(allSuspended.isDone(), "Suspend needs to happen atomically");

        allSuspended.whenComplete((ignored, failure) -> {
            if (failure != null) {
                LOG.debug("Could not properly suspend the execution graph.", failure);
            }
            onTerminalState(state);
            LOG.info("Job {} has been suspended.", getJobID());
        });
    }

    /**
     * Fails the job globally, but only if the given attempt is a current execution in state
     * {@code RUNNING} or {@code INITIALIZING}. Otherwise the request is only logged.
     *
     * @param cause failure to report
     * @param failingAttempt attempt id of the execution that triggered the failure
     */
    void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID failingAttempt) {
        final Execution execution = currentExecutions.get(failingAttempt);
        final boolean stillActive = execution != null
                && (execution.getState() == ExecutionState.RUNNING || execution.getState() == ExecutionState.INITIALIZING);

        if (!stillActive) {
            LOG.debug("The failing attempt {} belongs to an already not running task thus won't fail the job", failingAttempt);
            return;
        }
        failGlobal(cause);
    }

    /**
     * Reports a global failure to the registered internal failures listener.
     *
     * @param t failure to report
     * @throws IllegalStateException if no listener has been registered
     */
    @Override
    public void failGlobal(Throwable t) {
        final boolean listenerPresent = internalTaskFailuresListener != null;
        Preconditions.checkState(listenerPresent);
        internalTaskFailuresListener.notifyGlobalFailure(t);
    }

    /**
     * Deserializes the job's execution config with the user class loader and returns its archived
     * form.
     *
     * @return the archived config, or {@code null} if the deserialized config is {@code null} or
     *     deserialization fails (the failure is logged, not rethrown)
     */
    @Override
    public ArchivedExecutionConfig getArchivedExecutionConfig() {
        try {
            final ExecutionConfig deserialized = (ExecutionConfig)this.jobInformation.getSerializedExecutionConfig().deserializeValue(this.userClassLoader);
            return deserialized == null ? null : deserialized.archive();
        }
        catch (IOException | ClassNotFoundException e) {
            LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", this.getJobID(), e);
        }
        return null;
    }

    /**
     * Returns the future that {@code onTerminalState} completes with the job's terminal status.
     *
     * @return the termination future held by this graph
     */
    @Override
    public CompletableFuture<JobStatus> getTerminationFuture() {
        return terminationFuture;
    }

    /**
     * Blocks until the termination future completes and returns its value.
     *
     * @return the status the termination future was completed with
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws RuntimeException wrapping the {@link ExecutionException} if the future completed
     *     exceptionally
     */
    @Override
    @VisibleForTesting
    public JobStatus waitUntilTerminal() throws InterruptedException {
        final JobStatus terminalStatus;
        try {
            terminalStatus = this.terminationFuture.get();
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        return terminalStatus;
    }

    /**
     * Attempts the transition {@code current -> newState} without an associated error.
     *
     * @param current the state the job is expected to be in
     * @param newState the state to switch to
     * @return {@code true} if the transition was performed, {@code false} if the job was not in
     *     {@code current}
     */
    @Override
    public boolean transitionState(JobStatus current, JobStatus newState) {
        final Throwable noError = null;
        return transitionState(current, newState, noError);
    }

    /**
     * Switches from whatever state the job is in right now to {@code newState}, discarding the
     * boolean outcome of the underlying transition.
     *
     * @param newState the state to switch to
     * @param error the error handed to the three-argument transition (used there for logging)
     */
    private void transitionState(JobStatus newState, Throwable error) {
        final JobStatus presentState = this.state;
        transitionState(presentState, newState, error);
    }

    /**
     * Core state transition: switches to {@code newState} only if the job is currently in
     * {@code current}.
     *
     * <p>On success the new state is stored, the switch is logged (with {@code error} passed as the
     * trailing logger argument), the timestamp slot of the new state is updated and the job status
     * listeners are notified.
     *
     * @param current the state the job is expected to be in; must not be a terminal state
     * @param newState the state to switch to
     * @param error optional error associated with the transition, may be {@code null}
     * @return {@code true} if the transition happened, {@code false} if the job was not in
     *     {@code current}
     * @throws IllegalStateException if {@code current} is a terminal state
     */
    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        this.assertRunningInJobMasterMainThread();
        if (current.isTerminalState()) {
            final String complaint = "Job is trying to leave terminal state " + current;
            LOG.error(complaint);
            throw new IllegalStateException(complaint);
        }
        if (this.state != current) {
            // Somebody else already moved the job on; nothing to do.
            return false;
        }
        this.state = newState;
        LOG.info("Job {} ({}) switched from state {} to {}.", this.getJobName(), this.getJobID(), current, newState, error);
        this.stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
        this.notifyJobStatusChange(newState);
        return true;
    }

    /** Bumps the counter that tracks the number of restarts by one. */
    @Override
    public void incrementRestarts() {
        numberOfRestartsCounter.inc();
    }

    /**
     * Records the failure cause together with an {@link ErrorInfo} built from it.
     *
     * @param t the failure cause to store
     * @param timestamp the timestamp stored alongside the cause in the {@link ErrorInfo}
     */
    @Override
    public void initFailureCause(Throwable t, long timestamp) {
        final ErrorInfo info = new ErrorInfo(t, timestamp);
        this.failureCause = t;
        this.failureInfo = info;
    }

    /**
     * Counts one more finished job vertex. Once the count reaches the total number of job vertices
     * while the job is {@code RUNNING}, every job vertex is finalized on the master and the job is
     * moved to {@code FINISHED}.
     *
     * <p>If finalization throws, fatal errors are rethrown; any other failure is reported through
     * {@link #failGlobal(Throwable)} and the job is not switched to {@code FINISHED}.
     */
    @Override
    public void jobVertexFinished() {
        this.assertRunningInJobMasterMainThread();
        final int finishedSoFar = ++this.numFinishedJobVertices;
        final boolean everythingFinished = finishedSoFar == this.numJobVerticesTotal;
        if (!everythingFinished || this.state != JobStatus.RUNNING) {
            return;
        }
        try {
            for (ExecutionJobVertex vertex : this.verticesInCreationOrder) {
                vertex.getJobVertex().finalizeOnMaster(this.getUserClassLoader());
            }
        }
        catch (Throwable finalizationFailure) {
            ExceptionUtils.rethrowIfFatalError(finalizationFailure);
            ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(finalizationFailure);
            this.failGlobal(new Exception("Failed to finalize execution on master", finalizationFailure));
            return;
        }
        final boolean switched = this.transitionState(JobStatus.RUNNING, JobStatus.FINISHED);
        if (switched) {
            this.onTerminalState(JobStatus.FINISHED);
        }
    }

    /** Reverts one finished-vertex count; the counterpart of {@link #jobVertexFinished()}. */
    @Override
    public void jobVertexUnFinished() {
        assertRunningInJobMasterMainThread();
        this.numFinishedJobVertices--;
    }

    /**
     * Decides what happens to the job once all vertices have reached a terminal state, based on the
     * job status observed at that point:
     *
     * <ul>
     *   <li>{@code RUNNING}: a global failure is reported and the status is re-read.
     *   <li>{@code CANCELLING}: the job is switched to {@code CANCELED}; if the switch does not
     *       succeed the status is re-read and evaluated again.
     *   <li>{@code FAILING}: nothing is done here.
     *   <li>globally terminal: a warning is logged.
     *   <li>anything else: a global failure is reported.
     * </ul>
     */
    private void allVerticesInTerminalState() {
        this.assertRunningInJobMasterMainThread();
        while (true) {
            final JobStatus observed = this.state;
            if (observed == JobStatus.RUNNING) {
                // NOTE(review): relies on failGlobal eventually moving the job out of RUNNING,
                // otherwise this loop keeps spinning — confirm against the listener implementation.
                this.failGlobal(new Exception("ExecutionGraph went into allVerticesInTerminalState() from RUNNING"));
                continue;
            }
            if (observed == JobStatus.CANCELLING) {
                if (this.transitionState(observed, JobStatus.CANCELED)) {
                    this.onTerminalState(JobStatus.CANCELED);
                    return;
                }
                // The state changed underneath us; look at it again.
                continue;
            }
            if (observed == JobStatus.FAILING) {
                return;
            }
            if (observed.isGloballyTerminalState()) {
                LOG.warn("Job has entered globally terminal state without waiting for all job vertices to reach final state.");
            } else {
                this.failGlobal(new Exception("ExecutionGraph went into final state from state " + observed));
            }
            return;
        }
    }

    /**
     * Moves the job to {@code FAILING}, records the failure cause and cancels all vertices. When
     * the cancellation future completes the job is finalized as {@code FAILED}, or as
     * {@code CANCELED} if it is in {@code CANCELLING} by then.
     *
     * <p>The call is a no-op when the job is already {@code FAILING} or in a terminal state.
     *
     * @param cause the failure that brings the job down
     * @param timestamp the timestamp recorded together with the failure cause
     */
    @Override
    public void failJob(Throwable cause, long timestamp) {
        final boolean alreadyGoingDown = this.state == JobStatus.FAILING || this.state.isTerminalState();
        if (alreadyGoingDown) {
            return;
        }
        this.transitionState(JobStatus.FAILING, cause);
        this.initFailureCause(cause, timestamp);
        FutureUtils.assertNoException((CompletableFuture)this.cancelVerticesAsync().whenComplete((ignored, cancellationError) -> {
            if (this.transitionState(JobStatus.FAILING, JobStatus.FAILED, cause)) {
                this.onTerminalState(JobStatus.FAILED);
                return;
            }
            if (this.state == JobStatus.CANCELLING) {
                this.transitionState(JobStatus.CANCELLING, JobStatus.CANCELED);
                this.onTerminalState(JobStatus.CANCELED);
                return;
            }
            if (!this.state.isTerminalState()) {
                throw new IllegalStateException("Cannot complete job failing from an unexpected state: " + this.state);
            }
        }));
    }

    /**
     * Cleanup hook run when the job reaches a terminal state: detaches and shuts down the
     * checkpoint coordinator, stops the coordinator timer, and finally completes the termination
     * future with {@code status}.
     *
     * <p>Exceptions raised during cleanup are logged and do not prevent the termination future
     * from being completed.
     *
     * @param status the terminal status to complete the termination future with
     */
    private void onTerminalState(JobStatus status) {
        LOG.debug("ExecutionGraph {} reached terminal state {}.", this.getJobID(), status);
        try {
            // Clear the field before shutting down so the coordinator is no longer reachable
            // through this graph.
            final CheckpointCoordinator detachedCoordinator = this.checkpointCoordinator;
            this.checkpointCoordinator = null;
            if (detachedCoordinator != null) {
                detachedCoordinator.shutdown();
            }
            if (this.checkpointCoordinatorTimer != null) {
                this.checkpointCoordinatorTimer.shutdownNow();
                this.checkpointCoordinatorTimer = null;
            }
        }
        catch (Exception cleanupFailure) {
            LOG.error("Error while cleaning up after execution", cleanupFailure);
        }
        finally {
            this.terminationFuture.complete(status);
        }
    }

    /**
     * Applies a task state transition to the execution attempt it refers to and afterwards gives
     * the partition-group release logic a chance to run for that attempt.
     *
     * @param state the reported state transition
     * @return the result of the internal update; {@code false} if the attempt is not registered or
     *     if processing failed (in which case a global failure is reported)
     */
    @Override
    public boolean updateState(TaskExecutionStateTransition state) {
        this.assertRunningInJobMasterMainThread();
        final Execution target = this.currentExecutions.get(state.getID());
        if (target == null) {
            return false;
        }
        try {
            final boolean updated = this.updateStateInternal(state, target);
            this.maybeReleasePartitionGroupsFor(target);
            return updated;
        }
        catch (Throwable processingFailure) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(processingFailure);
            this.failGlobal(processingFailure);
            return false;
        }
    }

    /**
     * Dispatches a reported execution state to the matching operation on the attempt.
     *
     * <p>{@code INITIALIZING} and {@code RUNNING} return the outcome of the respective switch;
     * {@code FINISHED}, {@code CANCELED} and {@code FAILED} forward the deserialized accumulators
     * and IO metrics and return {@code true}. Any other state fails the attempt and returns
     * {@code false}.
     *
     * @param state the reported state transition
     * @param attempt the execution attempt the transition applies to
     * @return whether the update was applied as described above
     */
    private boolean updateStateInternal(TaskExecutionStateTransition state, Execution attempt) {
        final ExecutionState reported = state.getExecutionState();
        switch (reported) {
            case INITIALIZING:
                return attempt.switchToRecovering();
            case RUNNING:
                return attempt.switchToRunning();
            case FINISHED: {
                final Map<String, Accumulator<?, ?>> finalAccumulators = this.deserializeAccumulators(state);
                attempt.markFinished(finalAccumulators, state.getIOMetrics());
                return true;
            }
            case CANCELED: {
                final Map<String, Accumulator<?, ?>> finalAccumulators = this.deserializeAccumulators(state);
                attempt.completeCancelling(finalAccumulators, state.getIOMetrics(), false);
                return true;
            }
            case FAILED: {
                // Accumulators are deserialized before the error is read from the transition.
                final Map<String, Accumulator<?, ?>> finalAccumulators = this.deserializeAccumulators(state);
                attempt.markFailed(state.getError(this.userClassLoader), state.getCancelTask(), finalAccumulators, state.getIOMetrics(), state.getReleasePartitions(), true);
                return true;
            }
            default:
                attempt.fail(new Exception("TaskManager sent illegal state update: " + reported));
                return false;
        }
    }

    /**
     * Informs the partition-group release strategy about the attempt's vertex: if the attempt is
     * {@code FINISHED}, the groups the strategy returns are released; otherwise the vertex is
     * reported as unfinished.
     *
     * @param attempt the execution attempt whose state was just updated
     */
    private void maybeReleasePartitionGroupsFor(Execution attempt) {
        final ExecutionVertexID vertexId = attempt.getVertex().getID();
        if (attempt.getState() != ExecutionState.FINISHED) {
            this.partitionGroupReleaseStrategy.vertexUnfinished(vertexId);
            return;
        }
        this.releasePartitionGroups(this.partitionGroupReleaseStrategy.vertexFinished(vertexId));
    }

    /**
     * Releases the given consumed partition groups: first clears the cached information each
     * owning intermediate result keeps for the group, then asks the partition tracker to stop
     * tracking and release all partitions contained in the groups.
     *
     * <p>Does nothing for an empty list.
     *
     * @param releasablePartitionGroups the groups to release
     */
    private void releasePartitionGroups(List<ConsumedPartitionGroup> releasablePartitionGroups) {
        if (releasablePartitionGroups.isEmpty()) {
            return;
        }
        for (ConsumedPartitionGroup group : releasablePartitionGroups) {
            final IntermediateResult owner = (IntermediateResult)Preconditions.checkNotNull(this.intermediateResults.get(group.getIntermediateDataSetID()));
            owner.clearCachedInformationForPartitionGroup(group);
        }
        final List<ResultPartitionID> partitionsToRelease = releasablePartitionGroups.stream()
                .flatMap(IterableUtils::toStream)
                .map(this::createResultPartitionId)
                .collect(Collectors.toList());
        this.partitionTracker.stopTrackingAndReleasePartitions(partitionsToRelease);
    }

    /**
     * Builds the {@link ResultPartitionID} for an intermediate result partition by pairing it with
     * the attempt id of the current execution of its producer vertex.
     *
     * <p>The producer is looked up through the scheduling topology; its job vertex and subtask
     * index are then used to find the matching {@link ExecutionVertex}.
     *
     * @param resultPartitionId the intermediate result partition to resolve
     * @return the partition id combined with the producer's current attempt id
     * @throws NullPointerException if the producer's job vertex is unknown to this graph
     * @throws IllegalStateException if the producer's subtask index is not below the number of
     *     task vertices of its job vertex
     */
    ResultPartitionID createResultPartitionId(IntermediateResultPartitionID resultPartitionId) {
        SchedulingResultPartition schedulingResultPartition = this.getSchedulingTopology().getResultPartition(resultPartitionId);
        SchedulingExecutionVertex producer = (SchedulingExecutionVertex)schedulingResultPartition.getProducer();
        ExecutionVertexID producerId = (ExecutionVertexID)producer.getId();
        JobVertexID jobVertexId = producerId.getJobVertexId();
        ExecutionJobVertex jobVertex = this.getJobVertex(jobVertexId);
        Preconditions.checkNotNull(jobVertex, "Unknown job vertex %s", jobVertexId);
        ExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
        int subtaskIndex = producerId.getSubtaskIndex();
        // The Preconditions message template only substitutes "%s" placeholders; a "%d" would be
        // left verbatim in the message and shift the arguments, so "%s" is used for the index too.
        Preconditions.checkState(subtaskIndex < taskVertices.length, "Invalid subtask index %s for job vertex %s", subtaskIndex, jobVertexId);
        ExecutionVertex taskVertex = taskVertices[subtaskIndex];
        Execution execution = taskVertex.getCurrentExecutionAttempt();
        return new ResultPartitionID(resultPartitionId, execution.getAttemptId());
    }

    /**
     * Deserializes the user accumulators carried by a state transition using the user class
     * loader.
     *
     * @param state the state transition that may carry an accumulator snapshot
     * @return the deserialized accumulators, or {@code null} if the transition carries no snapshot
     *     or deserialization fails (the failure is logged, not rethrown)
     */
    private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionStateTransition state) {
        final AccumulatorSnapshot snapshot = state.getAccumulators();
        if (snapshot == null) {
            return null;
        }
        try {
            return snapshot.deserializeUserAccumulators(this.userClassLoader);
        }
        catch (Throwable deserializationFailure) {
            LOG.error("Failed to deserialize final accumulator results.", deserializationFailure);
            return null;
        }
    }

    /**
     * Forwards a "partition data available" notification to the vertex of the execution that
     * produces the partition.
     *
     * @param partitionId the partition whose data became available; its producer id is used to
     *     look up the registered execution
     * @throws IllegalStateException if no execution is registered under the partition's producer id
     */
    @Override
    public void notifyPartitionDataAvailable(ResultPartitionID partitionId) {
        this.assertRunningInJobMasterMainThread();
        Execution execution = this.currentExecutions.get(partitionId.getProducerId());
        // Report the id that was actually looked up (the producer's attempt id); the partition id
        // is a different identifier and would be misleading under the label "execution Id".
        Preconditions.checkState(execution != null, "Cannot find execution for execution Id %s.", partitionId.getProducerId());
        execution.getVertex().notifyPartitionDataAvailable(partitionId);
    }

    /**
     * Exposes the currently registered executions, keyed by attempt id.
     *
     * @return an unmodifiable view of the registered executions
     */
    @Override
    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        final Map<ExecutionAttemptID, Execution> readOnlyView = Collections.unmodifiableMap(this.currentExecutions);
        return readOnlyView;
    }

    /**
     * Registers an execution under its attempt id. If that id is already taken, the existing
     * entry is kept and a global failure is reported.
     *
     * @param exec the execution to register
     */
    @Override
    public void registerExecution(Execution exec) {
        this.assertRunningInJobMasterMainThread();
        final Execution alreadyRegistered = this.currentExecutions.putIfAbsent(exec.getAttemptId(), exec);
        if (alreadyRegistered == null) {
            return;
        }
        this.failGlobal(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
    }

    /**
     * Removes the entry registered under the execution's attempt id. If the removed entry is a
     * different execution instance than the one passed in, a global failure is reported.
     *
     * @param exec the execution to deregister
     */
    @Override
    public void deregisterExecution(Execution exec) {
        this.assertRunningInJobMasterMainThread();
        final Execution removed = this.currentExecutions.remove(exec.getAttemptId());
        final boolean removedSomethingElse = removed != null && removed != exec;
        if (removedSomethingElse) {
            this.failGlobal(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + removed));
        }
    }

    /**
     * Indexes every task vertex of the given job vertices by its id and adds all partitions they
     * produce to the result-partition index.
     *
     * @param executionJobVertices the job vertices whose task vertices and partitions are indexed
     */
    private void registerExecutionVerticesAndResultPartitions(List<ExecutionJobVertex> executionJobVertices) {
        for (ExecutionJobVertex jobVertex : executionJobVertices) {
            final ExecutionVertex[] subtasks = jobVertex.getTaskVertices();
            for (ExecutionVertex subtask : subtasks) {
                this.executionVerticesById.put(subtask.getID(), subtask);
                this.resultPartitionsById.putAll(subtask.getProducedPartitions());
            }
        }
    }

    /**
     * Deserializes the user accumulators of a snapshot and stores them on the execution the
     * snapshot belongs to. Snapshots for unknown executions are only logged at debug level; any
     * exception is logged as an error and not rethrown.
     *
     * @param accumulatorSnapshot the snapshot carrying serialized accumulators and the attempt id
     */
    @Override
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        try {
            final Map<String, Accumulator<?, ?>> deserialized = accumulatorSnapshot.deserializeUserAccumulators(this.userClassLoader);
            final ExecutionAttemptID attemptId = accumulatorSnapshot.getExecutionAttemptID();
            final Execution owner = this.currentExecutions.get(attemptId);
            if (owner == null) {
                LOG.debug("Received accumulator result for unknown execution {}.", attemptId);
            } else {
                owner.setAccumulators(deserialized);
            }
        }
        catch (Exception updateFailure) {
            LOG.error("Cannot update accumulators for job {}.", this.getJobID(), updateFailure);
        }
    }

    /**
     * Adds a listener that is notified on job status changes. A {@code null} listener is ignored.
     *
     * @param listener the listener to add, may be {@code null}
     */
    @Override
    public void registerJobStatusListener(JobStatusListener listener) {
        if (listener == null) {
            return;
        }
        this.jobStatusListeners.add(listener);
    }

    /**
     * Tells every registered job status listener about the new state, using one shared timestamp
     * for all of them. A listener that throws is logged with a warning and does not stop the
     * remaining listeners from being notified.
     *
     * @param newState the state the job just switched to
     */
    private void notifyJobStatusChange(JobStatus newState) {
        if (this.jobStatusListeners.isEmpty()) {
            return;
        }
        final long notificationTime = System.currentTimeMillis();
        for (JobStatusListener subscriber : this.jobStatusListeners) {
            try {
                subscriber.jobStatusChanges(this.getJobID(), newState, notificationTime);
            }
            catch (Throwable listenerFailure) {
                LOG.warn("Error while notifying JobStatusListener", listenerFailure);
            }
        }
    }

    /**
     * Passes an execution's state change on to the execution state update listener.
     *
     * @param execution the execution whose state changed; only its attempt id is forwarded
     * @param newExecutionState the state the execution switched to
     */
    @Override
    public void notifyExecutionChange(Execution execution, ExecutionState newExecutionState) {
        final ExecutionAttemptID attemptId = execution.getAttemptId();
        this.executionStateUpdateListener.onStateUpdate(attemptId, newExecutionState);
    }

    /**
     * Asserts that the caller runs in the job master's main thread. The check is skipped while the
     * configured executor is the dummy main-thread executor.
     */
    private void assertRunningInJobMasterMainThread() {
        final boolean dummyExecutor = this.jobMasterMainThreadExecutor instanceof ComponentMainThreadExecutor.DummyComponentMainThreadExecutor;
        if (dummyExecutor) {
            return;
        }
        this.jobMasterMainThreadExecutor.assertRunningInMainThread();
    }

    /**
     * Reports a task failure to the registered internal task-failures listener.
     *
     * <p>The listener must have been set beforehand; otherwise the state precondition fails.
     *
     * @param attemptId id of the failed execution attempt
     * @param t the failure cause
     * @param cancelTask forwarded unchanged to the listener
     * @param releasePartitions forwarded unchanged to the listener
     */
    @Override
    public void notifySchedulerNgAboutInternalTaskFailure(ExecutionAttemptID attemptId, Throwable t, boolean cancelTask, boolean releasePartitions) {
        final boolean listenerPresent = this.internalTaskFailuresListener != null;
        Preconditions.checkState(listenerPresent);
        this.internalTaskFailuresListener.notifyTaskFailure(attemptId, t, cancelTask, releasePartitions);
    }

    /**
     * Schedules the deletion of the given permanent blobs of this job on the IO executor. The
     * returned future is not retained, so the caller gets no completion or failure signal.
     *
     * @param blobKeys keys of the blobs to delete
     */
    @Override
    public void deleteBlobs(List<PermanentBlobKey> blobKeys) {
        final Runnable deletion = () -> {
            for (PermanentBlobKey key : blobKeys) {
                this.blobWriter.deletePermanent(this.getJobID(), key);
            }
        };
        CompletableFuture.runAsync(deletion, this.ioExecutor);
    }

    /**
     * Returns the shuffle master held by this graph.
     *
     * @return the shuffle master
     */
    @Override
    public ShuffleMaster<?> getShuffleMaster() {
        return shuffleMaster;
    }

    /**
     * Returns the partition tracker held by this graph.
     *
     * @return the job master partition tracker
     */
    @Override
    public JobMasterPartitionTracker getPartitionTracker() {
        return partitionTracker;
    }

    /**
     * Returns the result partition availability checker held by this graph.
     *
     * @return the availability checker
     */
    @Override
    public ResultPartitionAvailabilityChecker getResultPartitionAvailabilityChecker() {
        return resultPartitionAvailabilityChecker;
    }

    /**
     * Returns the partition-group release strategy held by this graph.
     *
     * @return the release strategy
     */
    @Override
    public PartitionGroupReleaseStrategy getPartitionGroupReleaseStrategy() {
        return partitionGroupReleaseStrategy;
    }

    /**
     * Returns the execution deployment listener held by this graph.
     *
     * @return the deployment listener
     */
    @Override
    public ExecutionDeploymentListener getExecutionDeploymentListener() {
        return executionDeploymentListener;
    }

    /**
     * Compiler-generated completion callback (its name suggests it originates from a lambda in
     * {@code cancel}). Without an error it evaluates the final job state; with an error it
     * attempts the {@code CANCELLING -> FAILED} transition, wrapping the error in a
     * {@link FlinkException}.
     *
     * @param value unused completion value
     * @param throwable the error the future completed with, or {@code null} on success
     */
    private /* synthetic */ void lambda$cancel$2(Void value, Throwable throwable) {
        if (throwable == null) {
            this.allVerticesInTerminalState();
            return;
        }
        final FlinkException cancellationFailure = new FlinkException("Could not cancel job " + this.getJobName() + " because not all execution job vertices could be cancelled.", throwable);
        this.transitionState(JobStatus.CANCELLING, JobStatus.FAILED, cancellationFailure);
    }
}

