/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AllVerticesIterator;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionStatusListener;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionGraph
implements AccessExecutionGraph,
Archiveable<ArchivedExecutionGraph> {
    /** Atomic updater used for the compare-and-set transitions of the {@code state} field. */
    private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
    /** Logger of this class. */
    static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
    /** Name under which the restarting-time gauge is registered on the metric group in the constructor. */
    static final String RESTARTING_TIME_METRIC_NAME = "restartingTime";
    /** Monitor used by the synchronized sections of this class and for wait/notify in {@code waitUntilFinished()}. */
    private final SerializableObject progressLock = new SerializableObject();
    /** Bundles job id, job name, serialized execution config, job configuration, jar blob keys and classpaths. */
    private final JobInformation jobInformation;
    /** {@link #jobInformation} wrapped in a {@link SerializedValue} at construction time. */
    private final SerializedValue<JobInformation> serializedJobInformation;
    /** Executor exposed through {@code getFutureExecutor()}. */
    private final Executor futureExecutor;
    /** Executor handed to the {@link CheckpointCoordinator} when checkpointing is enabled. */
    private final Executor ioExecutor;
    /** Starts as {@code true}; set to {@code false} when an attached input vertex is not stoppable. */
    private boolean isStoppable = true;
    /** All attached job vertices, keyed by their {@link JobVertexID}. */
    private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
    /** All attached job vertices in the order in which {@code attachJobGraph} added them. */
    private final List<ExecutionJobVertex> verticesInCreationOrder;
    /** Data sets produced by the attached vertices, keyed by their id. */
    private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;
    /** Executions looked up by attempt id in {@code updateState}; cleared on restart. */
    private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
    /** Listeners informed about job status changes. */
    private final List<JobStatusListener> jobStatusListeners;
    /** Registered execution status listeners. */
    private final List<ExecutionStatusListener> executionListeners;
    /** One timestamp slot per {@link JobStatus}, indexed by the status ordinal. */
    private final long[] stateTimestamps;
    /** Timeout passed to every {@link ExecutionJobVertex} created by {@code attachJobGraph}. */
    private final Time timeout;
    /** Strategy asked whether a restart is possible and used to trigger the restart. */
    private final RestartStrategy restartStrategy;
    /** Slot provider passed to the vertices when they are scheduled. */
    private final SlotProvider slotProvider;
    /** Class loader used to deserialize the execution config. */
    private final ClassLoader userClassLoader;
    /** Registry created in the constructor from the job id and the view returned by {@code getAllVertices()}. */
    private final KvStateLocationRegistry kvStateLocationRegistry;
    /** Flag forwarded to {@code scheduleAll} when vertices are scheduled. */
    private boolean allowQueuedScheduling = false;
    /** Mode that decides which vertices {@code scheduleForExecution} schedules. */
    private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
    /** Current job status; only changed through {@link #STATE_UPDATER}. */
    private volatile JobStatus state = JobStatus.CREATED;
    /** Throwable recorded by {@code fail} and {@code suspend}; may be {@code null}. */
    private volatile Throwable failureCause;
    /** Number of job vertices that have reported their final state since the last (re)start. */
    private volatile int numFinishedJobVertices;
    /** Checkpoint coordinator; {@code null} while checkpointing is disabled and after cleanup. */
    private CheckpointCoordinator checkpointCoordinator;
    /** Checkpoint statistics tracker; {@code null} while checkpointing is disabled. */
    private CheckpointStatsTracker checkpointStatsTracker;
    /** JSON plan set through {@code setJsonPlan}; may be {@code null}. */
    private String jsonPlan;

    /**
     * Convenience constructor that delegates to the full constructor with empty lists of
     * required jar files and classpaths, the class loader of this class as user class loader,
     * and an {@link UnregisteredMetricsGroup} as metric group.
     *
     * @throws IOException if the full constructor fails to serialize the job information
     */
    ExecutionGraph(
            Executor futureExecutor,
            Executor ioExecutor,
            JobID jobId,
            String jobName,
            Configuration jobConfig,
            SerializedValue<ExecutionConfig> serializedConfig,
            Time timeout,
            RestartStrategy restartStrategy,
            SlotProvider slotProvider) throws IOException {
        this(
            futureExecutor,
            ioExecutor,
            jobId,
            jobName,
            jobConfig,
            serializedConfig,
            timeout,
            restartStrategy,
            new ArrayList<BlobKey>(),
            new ArrayList<URL>(),
            slotProvider,
            ExecutionGraph.class.getClassLoader(),
            new UnregisteredMetricsGroup());
    }

    public ExecutionGraph(Executor futureExecutor, Executor ioExecutor, JobID jobId, String jobName, Configuration jobConfig, SerializedValue<ExecutionConfig> serializedConfig, Time timeout, RestartStrategy restartStrategy, List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths, SlotProvider slotProvider, ClassLoader userClassLoader, MetricGroup metricGroup) throws IOException {
        Preconditions.checkNotNull((Object)futureExecutor);
        Preconditions.checkNotNull((Object)jobId);
        Preconditions.checkNotNull((Object)jobName);
        Preconditions.checkNotNull((Object)jobConfig);
        this.jobInformation = new JobInformation(jobId, jobName, serializedConfig, jobConfig, requiredJarFiles, requiredClasspaths);
        this.serializedJobInformation = new SerializedValue((Object)this.jobInformation);
        this.futureExecutor = (Executor)Preconditions.checkNotNull((Object)futureExecutor);
        this.ioExecutor = (Executor)Preconditions.checkNotNull((Object)ioExecutor);
        this.slotProvider = (SlotProvider)Preconditions.checkNotNull((Object)slotProvider, (String)"scheduler");
        this.userClassLoader = (ClassLoader)Preconditions.checkNotNull((Object)userClassLoader, (String)"userClassLoader");
        this.tasks = new ConcurrentHashMap(16);
        this.intermediateResults = new ConcurrentHashMap(16);
        this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>(16);
        this.currentExecutions = new ConcurrentHashMap(16);
        this.jobStatusListeners = new CopyOnWriteArrayList<JobStatusListener>();
        this.executionListeners = new CopyOnWriteArrayList<ExecutionStatusListener>();
        this.stateTimestamps = new long[JobStatus.values().length];
        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
        this.timeout = timeout;
        this.restartStrategy = restartStrategy;
        metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, (Gauge)new RestartTimeGauge());
        this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, this.getAllVertices());
    }

    /**
     * Returns how many job vertices have been attached to this graph so far.
     *
     * @return size of the creation-order vertex list
     */
    public int getNumberOfExecutionJobVertices() {
        return verticesInCreationOrder.size();
    }

    /**
     * Returns the queued-scheduling flag that is forwarded to the vertices when they are scheduled.
     *
     * @return current value of the flag ({@code false} unless changed by the setter)
     */
    public boolean isQueuedSchedulingAllowed() {
        return allowQueuedScheduling;
    }

    /**
     * Sets the queued-scheduling flag that is forwarded to the vertices when they are scheduled.
     *
     * @param allowed new value of the flag
     */
    public void setQueuedSchedulingAllowed(boolean allowed) {
        allowQueuedScheduling = allowed;
    }

    /**
     * Sets the mode that {@code scheduleForExecution()} switches on to decide which vertices
     * are scheduled.
     *
     * @param scheduleMode the new schedule mode
     */
    public void setScheduleMode(ScheduleMode scheduleMode) {
        this.scheduleMode = scheduleMode;
    }

    /**
     * Returns the currently configured schedule mode.
     *
     * @return the schedule mode ({@link ScheduleMode#LAZY_FROM_SOURCES} unless changed)
     */
    public ScheduleMode getScheduleMode() {
        return scheduleMode;
    }

    /**
     * Always returns {@code false} for this class.
     *
     * @return {@code false}
     */
    @Override
    public boolean isArchived() {
        return false;
    }

    /**
     * Enables checkpointing for this graph by creating a new {@link CheckpointCoordinator}.
     *
     * <p>Any previously configured coordinator is shut down first (best effort). Unless
     * {@code interval} is {@code Long.MAX_VALUE}, the coordinator's activator/deactivator is
     * registered as a job status listener.
     *
     * @param interval checkpoint interval; must be at least 10
     * @param checkpointTimeout checkpoint timeout; must be at least 10
     * @param minPauseBetweenCheckpoints passed through to the coordinator
     * @param maxConcurrentCheckpoints passed through to the coordinator
     * @param externalizeSettings passed through to the coordinator
     * @param verticesToTrigger job vertices whose execution vertices are triggered
     * @param verticesToWaitFor job vertices whose execution vertices are waited for
     * @param verticesToCommitTo job vertices whose execution vertices are committed to
     * @param checkpointIDCounter passed through to the coordinator
     * @param checkpointStore passed through to the coordinator
     * @param checkpointDir passed through to the coordinator
     * @param statsTracker statistics tracker; must not be null
     * @throws IllegalArgumentException if {@code interval} or {@code checkpointTimeout} is below 10,
     *         or if a given job vertex does not belong to this graph
     * @throws IllegalStateException if the job is not in state {@link JobStatus#CREATED}
     */
    public void enableSnapshotCheckpointing(long interval, long checkpointTimeout, long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints, ExternalizedCheckpointSettings externalizeSettings, List<ExecutionJobVertex> verticesToTrigger, List<ExecutionJobVertex> verticesToWaitFor, List<ExecutionJobVertex> verticesToCommitTo, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, String checkpointDir, CheckpointStatsTracker statsTracker) {
        if (interval < 10L || checkpointTimeout < 10L) {
            throw new IllegalArgumentException(
                "Checkpoint interval and timeout must each be at least 10, but were interval="
                    + interval + ", checkpointTimeout=" + checkpointTimeout);
        }
        if (this.state != JobStatus.CREATED) {
            throw new IllegalStateException("Job must be in CREATED state");
        }

        ExecutionVertex[] tasksToTrigger = this.collectExecutionVertices(verticesToTrigger);
        ExecutionVertex[] tasksToWaitFor = this.collectExecutionVertices(verticesToWaitFor);
        ExecutionVertex[] tasksToCommitTo = this.collectExecutionVertices(verticesToCommitTo);

        // Make sure an already existing coordinator is cleared. This stays best effort, but the
        // cause is now logged instead of being dropped silently.
        try {
            this.disableSnaphotCheckpointing();
        }
        catch (Throwable t) {
            LOG.error("Error while shutting down checkpointer.", t);
        }

        this.checkpointStatsTracker = Preconditions.checkNotNull(statsTracker, "CheckpointStatsTracker");
        this.checkpointCoordinator = new CheckpointCoordinator(this.jobInformation.getJobId(), interval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, externalizeSettings, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, checkpointIDCounter, checkpointStore, checkpointDir, this.ioExecutor);
        this.checkpointCoordinator.setCheckpointStatsTracker(this.checkpointStatsTracker);

        // An interval of Long.MAX_VALUE skips registration of the activator/deactivator.
        if (interval != Long.MAX_VALUE) {
            this.registerJobStatusListener(this.checkpointCoordinator.createActivatorDeactivator());
        }
    }

    /**
     * Shuts down and forgets the checkpoint coordinator (and the statistics tracker), if one exists.
     *
     * @throws IllegalStateException if the job is not in state {@link JobStatus#CREATED}
     * @throws Exception declared for failures while shutting down the coordinator
     */
    public void disableSnaphotCheckpointing() throws Exception {
        if (state != JobStatus.CREATED) {
            throw new IllegalStateException("Job must be in CREATED state");
        }

        // Nothing to do when checkpointing was never enabled.
        if (checkpointCoordinator == null) {
            return;
        }

        checkpointCoordinator.shutdown(state);
        checkpointCoordinator = null;
        checkpointStatsTracker = null;
    }

    /**
     * Returns the checkpoint coordinator of this graph.
     *
     * @return the coordinator, or {@code null} if none is currently set
     */
    @Override
    public CheckpointCoordinator getCheckpointCoordinator() {
        return checkpointCoordinator;
    }

    /**
     * Returns the registry that was created for this job in the constructor.
     *
     * @return the KvState location registry
     */
    public KvStateLocationRegistry getKvStateLocationRegistry() {
        return kvStateLocationRegistry;
    }

    /**
     * Returns the restart strategy passed to the constructor.
     *
     * @return the restart strategy
     */
    public RestartStrategy getRestartStrategy() {
        return restartStrategy;
    }

    /**
     * Returns the snapshotting settings reported by the checkpoint statistics tracker.
     *
     * @return the settings, or {@code null} if no statistics tracker is set
     */
    @Override
    public JobSnapshottingSettings getJobSnapshottingSettings() {
        return checkpointStatsTracker == null
            ? null
            : checkpointStatsTracker.getSnapshottingSettings();
    }

    /**
     * Asks the checkpoint statistics tracker for a snapshot.
     *
     * @return the snapshot, or {@code null} if no statistics tracker is set
     */
    @Override
    public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
        return checkpointStatsTracker == null
            ? null
            : checkpointStatsTracker.createSnapshot();
    }

    /**
     * Flattens the task vertices of the given job vertices into one array.
     *
     * <p>For a single job vertex the array returned by {@code getTaskVertices()} is handed out
     * directly, without copying.
     *
     * @param jobVertices job vertices, all of which must belong to this graph
     * @return the execution vertices of all given job vertices, in list order
     * @throws IllegalArgumentException if a job vertex belongs to a different graph
     */
    private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> jobVertices) {
        // Fast path: exactly one job vertex, no intermediate list needed.
        if (jobVertices.size() == 1) {
            final ExecutionJobVertex only = jobVertices.get(0);
            if (only.getGraph() != this) {
                throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
            }
            return only.getTaskVertices();
        }

        final List<ExecutionVertex> collected = new ArrayList<ExecutionVertex>();
        for (ExecutionJobVertex jobVertex : jobVertices) {
            if (jobVertex.getGraph() != this) {
                throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
            }
            Collections.addAll(collected, jobVertex.getTaskVertices());
        }
        return collected.toArray(new ExecutionVertex[collected.size()]);
    }

    /**
     * Returns the blob keys of the required jar files stored in the job information.
     *
     * @return the required jar file blob keys
     */
    public Collection<BlobKey> getRequiredJarFiles() {
        return jobInformation.getRequiredJarFileBlobKeys();
    }

    /**
     * Returns the classpath URLs stored in the job information.
     *
     * @return the required classpath URLs
     */
    public Collection<URL> getRequiredClasspaths() {
        return jobInformation.getRequiredClasspathURLs();
    }

    /**
     * Stores the JSON plan that is later returned by {@code getJsonPlan()}.
     *
     * @param jsonPlan the JSON plan to store
     */
    public void setJsonPlan(String jsonPlan) {
        this.jsonPlan = jsonPlan;
    }

    /**
     * Returns the JSON plan previously stored via {@code setJsonPlan}.
     *
     * @return the JSON plan, or {@code null} if none has been set
     */
    @Override
    public String getJsonPlan() {
        return jsonPlan;
    }

    /**
     * Returns the slot provider passed to the constructor.
     *
     * @return the slot provider
     */
    public SlotProvider getSlotProvider() {
        return slotProvider;
    }

    /**
     * Returns the job information in the serialized form created by the constructor.
     *
     * @return the serialized job information
     */
    public SerializedValue<JobInformation> getSerializedJobInformation() {
        return serializedJobInformation;
    }

    /**
     * Returns the id of the job, as stored in the job information.
     *
     * @return the job id
     */
    @Override
    public JobID getJobID() {
        return jobInformation.getJobId();
    }

    /**
     * Returns the name of the job, as stored in the job information.
     *
     * @return the job name
     */
    @Override
    public String getJobName() {
        return jobInformation.getJobName();
    }

    /**
     * Tells whether the job can be stopped via {@code stop()}.
     *
     * @return {@code true} unless {@code attachJobGraph} has seen an input vertex that is not
     *         stoppable
     */
    @Override
    public boolean isStoppable() {
        return isStoppable;
    }

    /**
     * Returns the job configuration stored in the job information.
     *
     * @return the job configuration
     */
    public Configuration getJobConfiguration() {
        return jobInformation.getJobConfiguration();
    }

    /**
     * Returns the user code class loader passed to the constructor.
     *
     * @return the user class loader
     */
    public ClassLoader getUserClassLoader() {
        return userClassLoader;
    }

    /**
     * Returns the current job status (a read of the volatile {@code state} field).
     *
     * @return the current job status
     */
    @Override
    public JobStatus getState() {
        return state;
    }

    /**
     * Returns the throwable recorded by the last call to {@code fail} or {@code suspend}.
     *
     * @return the recorded cause, or {@code null} if none has been recorded
     */
    public Throwable getFailureCause() {
        return failureCause;
    }

    /**
     * Returns the recorded failure cause rendered by {@code ExceptionUtils.stringifyException}.
     *
     * @return the stringified failure cause
     */
    @Override
    public String getFailureCauseAsString() {
        return ExceptionUtils.stringifyException(failureCause);
    }

    /**
     * Looks up an attached job vertex by its id.
     *
     * @param id id of the job vertex
     * @return the job vertex, or {@code null} if no vertex with that id is attached
     */
    @Override
    public ExecutionJobVertex getJobVertex(JobVertexID id) {
        return tasks.get(id);
    }

    /**
     * Returns an unmodifiable view of all attached job vertices, keyed by vertex id.
     *
     * @return unmodifiable view backed by the internal vertex map
     */
    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        return Collections.unmodifiableMap(tasks);
    }

    /**
     * Returns an iterable over the job vertices in the order in which they were attached.
     *
     * <p>The number of elements is fixed when this method is called; vertices attached afterwards
     * are not visited by iterators obtained from the returned iterable. The iterators do not
     * support removal.
     *
     * @return iterable over the job vertices in creation order
     */
    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        // Snapshot of the size, taken once per call and shared by all iterators created below.
        final int limit = verticesInCreationOrder.size();

        return new Iterable<ExecutionJobVertex>() {

            @Override
            public Iterator<ExecutionJobVertex> iterator() {
                return new Iterator<ExecutionJobVertex>() {

                    private int cursor = 0;

                    @Override
                    public boolean hasNext() {
                        return cursor < limit;
                    }

                    @Override
                    public ExecutionJobVertex next() {
                        if (!hasNext()) {
                            throw new NoSuchElementException();
                        }
                        return verticesInCreationOrder.get(cursor++);
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    /**
     * Returns an unmodifiable view of all intermediate results, keyed by data set id.
     *
     * @return unmodifiable view backed by the internal result map
     */
    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        return Collections.unmodifiableMap(intermediateResults);
    }

    /**
     * Returns an iterable over all execution vertices of this graph.
     *
     * <p>Each iterator wraps a fresh iterator from {@code getVerticesTopologically()} in an
     * {@link AllVerticesIterator}.
     *
     * @return iterable over all execution vertices
     */
    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return new Iterable<ExecutionVertex>() {

            @Override
            public Iterator<ExecutionVertex> iterator() {
                final Iterator<ExecutionJobVertex> jobVertices = getVerticesTopologically().iterator();
                return new AllVerticesIterator(jobVertices);
            }
        };
    }

    /**
     * Returns the timestamp recorded for the given job status.
     *
     * @param status the job status to look up
     * @return the recorded timestamp; {@code 0} if none has been recorded for that status
     */
    @Override
    public long getStatusTimestamp(JobStatus status) {
        return stateTimestamps[status.ordinal()];
    }

    /**
     * Returns the future executor passed to the constructor.
     *
     * @return the future executor
     */
    public Executor getFutureExecutor() {
        return futureExecutor;
    }

    /**
     * Merges the user accumulators of the current execution attempt of every execution vertex.
     *
     * <p>Vertices whose current attempt reports no accumulators ({@code null}) are skipped.
     *
     * @return a new map holding the merged accumulators, keyed by accumulator name
     */
    public Map<String, Accumulator<?, ?>> aggregateUserAccumulators() {
        final Map<String, Accumulator<?, ?>> merged = new HashMap<String, Accumulator<?, ?>>();

        for (ExecutionVertex executionVertex : getAllExecutionVertices()) {
            final Map<String, Accumulator<?, ?>> perAttempt =
                executionVertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (perAttempt != null) {
                AccumulatorHelper.mergeInto(merged, perAttempt);
            }
        }
        return merged;
    }

    /**
     * Aggregates the user accumulators and wraps each accumulator's local value in a
     * {@link SerializedValue}.
     *
     * @return a new map from accumulator name to the serialized local value
     * @throws IOException if a local value cannot be serialized
     */
    @Override
    public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
        final Map<String, Accumulator<?, ?>> aggregated = aggregateUserAccumulators();
        final Map<String, SerializedValue<Object>> serialized = new HashMap<String, SerializedValue<Object>>();

        for (Map.Entry<String, Accumulator<?, ?>> accumulator : aggregated.entrySet()) {
            final Object localValue = accumulator.getValue().getLocalValue();
            serialized.put(accumulator.getKey(), new SerializedValue<Object>(localValue));
        }
        return serialized;
    }

    /**
     * Aggregates the user accumulators and converts them with
     * {@code StringifiedAccumulatorResult.stringifyAccumulatorResults}.
     *
     * @return the stringified accumulator results
     */
    @Override
    public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(aggregateUserAccumulators());
    }

    /**
     * Attaches the given job vertices to this graph, in the order in which they are passed.
     *
     * <p>For every job vertex an {@link ExecutionJobVertex} is created, connected to its
     * predecessors through the already known intermediate results, and registered together with
     * the data sets it produces. If an input vertex is not stoppable, the whole job is marked as
     * not stoppable.
     *
     * @param topologiallySorted the job vertices to attach
     * @throws JobException if a job vertex id or an intermediate data set id is already registered
     */
    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d vertices and %d intermediate results.", topologiallySorted.size(), this.tasks.size(), this.intermediateResults.size()));
        }

        // All vertices attached by one call share the same creation timestamp.
        final long createTimestamp = System.currentTimeMillis();

        for (JobVertex jobVertex : topologiallySorted) {
            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }

            ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, this.timeout, createTimestamp);
            ejv.connectToPredecessors(this.intermediateResults);

            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                // Argument order matches the placeholders: already registered vertex first, then the new one.
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), previousTask, ejv));
            }

            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
                if (previousDataSet != null) {
                    // Same ordering as above: already registered data set first, then the new one.
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", res.getId(), previousDataSet, res));
                }
            }

            this.verticesInCreationOrder.add(ejv);
        }
    }

    /**
     * Moves the job from {@link JobStatus#CREATED} to {@link JobStatus#RUNNING} and schedules
     * vertices according to the configured schedule mode.
     *
     * <p>With {@code LAZY_FROM_SOURCES} only input vertices are scheduled; with {@code EAGER}
     * every vertex is scheduled, in creation order.
     *
     * @throws IllegalStateException if the job is not in state {@code CREATED}
     * @throws JobException if the schedule mode is neither of the two supported modes, or if
     *         scheduling a vertex fails with it
     */
    public void scheduleForExecution() throws JobException {
        if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }

        switch (scheduleMode) {
            case LAZY_FROM_SOURCES:
                // Only the sources are started here.
                for (ExecutionJobVertex vertex : tasks.values()) {
                    if (vertex.getJobVertex().isInputVertex()) {
                        vertex.scheduleAll(slotProvider, allowQueuedScheduling);
                    }
                }
                break;

            case EAGER:
                for (ExecutionJobVertex vertex : getVerticesTopologically()) {
                    vertex.scheduleAll(slotProvider, allowQueuedScheduling);
                }
                break;

            default:
                throw new JobException("Schedule mode is invalid.");
        }
    }

    /**
     * Cancels the job, depending on the state it is currently in.
     *
     * <ul>
     *   <li>RUNNING or CREATED: switch to CANCELLING and cancel every job vertex.</li>
     *   <li>FAILING: switch to CANCELLING only.</li>
     *   <li>RESTARTING: switch directly to CANCELED under the progress lock, clean up and
     *       notify waiters.</li>
     *   <li>Any other state: do nothing.</li>
     * </ul>
     *
     * <p>If a state transition fails, the state is re-read and the decision is repeated.
     */
    public void cancel() {
        for (;;) {
            final JobStatus observed = state;

            if (observed == JobStatus.RUNNING || observed == JobStatus.CREATED) {
                if (transitionState(observed, JobStatus.CANCELLING)) {
                    for (ExecutionJobVertex vertex : verticesInCreationOrder) {
                        vertex.cancel();
                    }
                    return;
                }
            } else if (observed == JobStatus.FAILING) {
                if (transitionState(observed, JobStatus.CANCELLING)) {
                    return;
                }
            } else if (observed == JobStatus.RESTARTING) {
                synchronized (progressLock) {
                    if (transitionState(observed, JobStatus.CANCELED)) {
                        postRunCleanup();
                        progressLock.notifyAll();
                        LOG.info("Canceled during restart.");
                        return;
                    }
                }
            } else {
                // Remaining states need no treatment.
                return;
            }
            // The transition failed; loop and look at the new state.
        }
    }

    /**
     * Stops the job by calling {@code stop()} on every execution vertex that has no inputs.
     *
     * @throws StoppingException if the job is not stoppable, or if stopping a vertex fails with it
     */
    public void stop() throws StoppingException {
        if (!isStoppable) {
            throw new StoppingException("This job is not stoppable.");
        }

        for (ExecutionVertex vertex : getAllExecutionVertices()) {
            // Only vertices without inputs receive the stop signal.
            if (vertex.getNumberOfInputs() == 0) {
                vertex.stop();
            }
        }
    }

    /**
     * Suspends the job unless it already is in a globally terminal state.
     *
     * <p>On a successful switch to {@link JobStatus#SUSPENDED} the cause is recorded, every job
     * vertex is cancelled, and cleanup plus waiter notification happen under the progress lock.
     * If the state transition fails, the state is re-read and the attempt is repeated.
     *
     * @param suspensionCause cause of the suspension; recorded as the failure cause
     */
    public void suspend(Throwable suspensionCause) {
        while (true) {
            final JobStatus observed = state;

            if (observed.isGloballyTerminalState()) {
                // Nothing to suspend any more.
                return;
            }

            if (transitionState(observed, JobStatus.SUSPENDED, suspensionCause)) {
                failureCause = suspensionCause;

                for (ExecutionJobVertex vertex : verticesInCreationOrder) {
                    vertex.cancel();
                }

                synchronized (progressLock) {
                    postRunCleanup();
                    progressLock.notifyAll();
                    LOG.info("Job {} has been suspended.", getJobID());
                }
                return;
            }
            // The transition failed; re-read the state and try again.
        }
    }

    /**
     * Fails the job with the given cause.
     *
     * <ul>
     *   <li>FAILING, SUSPENDED or a globally terminal state: nothing happens.</li>
     *   <li>RESTARTING: the cause is recorded and {@code tryRestartOrFail()} decides; if it
     *       reports no progress, the state is re-read and the decision is repeated.</li>
     *   <li>Otherwise: the job switches to FAILING and the cause is recorded. All job vertices
     *       are cancelled, or, if none are attached, the job switches straight to FAILED.</li>
     * </ul>
     *
     * @param t the cause of the failure
     */
    public void fail(Throwable t) {
        while (true) {
            final JobStatus observed = state;

            if (observed == JobStatus.FAILING
                    || observed == JobStatus.SUSPENDED
                    || observed.isGloballyTerminalState()) {
                // Stay in these states.
                return;
            } else if (observed == JobStatus.RESTARTING) {
                failureCause = t;
                if (tryRestartOrFail()) {
                    return;
                }
                // No progress was made; check the state again.
            } else if (transitionState(observed, JobStatus.FAILING, t)) {
                failureCause = t;

                if (verticesInCreationOrder.isEmpty()) {
                    // No vertices to wait for; go to FAILED directly.
                    transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
                } else {
                    for (ExecutionJobVertex vertex : verticesInCreationOrder) {
                        vertex.cancel();
                    }
                }
                return;
            }
        }
    }

    /**
     * Restarts the job from state {@link JobStatus#RESTARTING}.
     *
     * <p>Under the progress lock, the method aborts (with an info log) if the job has meanwhile
     * been canceled, failed or suspended. Otherwise it resets the executions, co-location
     * constraints, vertices, timestamps and the finished-vertex counter, switches the job back
     * to CREATED and restores the latest checkpointed state if a checkpoint coordinator exists.
     * Scheduling happens after the lock is released.
     *
     * <p>Any throwable raised on the way, including the {@link IllegalStateException}s thrown
     * below, is logged and passed to {@code fail(Throwable)}.
     */
    public void restart() {
        try {
            SerializableObject serializableObject = this.progressLock;
            synchronized (serializableObject) {
                JobStatus current = this.state;
                // The job left RESTARTING concurrently: nothing to restart.
                if (current == JobStatus.CANCELED) {
                    LOG.info("Canceled job during restart. Aborting restart.");
                    return;
                }
                if (current == JobStatus.FAILED) {
                    LOG.info("Failed job during restart. Aborting restart.");
                    return;
                }
                if (current == JobStatus.SUSPENDED) {
                    LOG.info("Suspended job during restart. Aborting restart.");
                    return;
                }
                if (current != JobStatus.RESTARTING) {
                    throw new IllegalStateException("Can only restart job from state restarting.");
                }
                if (this.slotProvider == null) {
                    throw new IllegalStateException("The execution graph has not been scheduled before - slotProvider is null.");
                }
                this.currentExecutions.clear();
                // Reset each co-location group once, even if several vertices share it.
                HashSet<CoLocationGroup> colGroups = new HashSet<CoLocationGroup>();
                for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
                    CoLocationGroup cgroup = jv.getCoLocationGroup();
                    if (cgroup != null && !colGroups.contains(cgroup)) {
                        cgroup.resetConstraints();
                        colGroups.add(cgroup);
                    }
                    jv.resetForNewExecution();
                }
                // Clear every state timestamp except the one of RESTARTING.
                for (int i = 0; i < this.stateTimestamps.length; ++i) {
                    if (i == JobStatus.RESTARTING.ordinal()) continue;
                    this.stateTimestamps[i] = 0L;
                }
                this.numFinishedJobVertices = 0;
                this.transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
                // Reload checkpointed state, if checkpointing is enabled.
                if (this.checkpointCoordinator != null) {
                    this.checkpointCoordinator.restoreLatestCheckpointedState(this.getAllVertices(), false, false);
                }
            }
            // Scheduling happens outside of the progress lock.
            this.scheduleForExecution();
        }
        catch (Throwable t) {
            LOG.warn("Failed to restart the job.", t);
            this.fail(t);
        }
    }

    /**
     * Restores the latest checkpointed state into the attached vertices, if a checkpoint
     * coordinator exists. The call is made while holding the progress lock.
     *
     * @param errorIfNoCheckpoint passed through to the checkpoint coordinator
     * @param allowNonRestoredState passed through to the checkpoint coordinator
     * @throws Exception if the checkpoint coordinator fails to restore the state
     */
    public void restoreLatestCheckpointedState(boolean errorIfNoCheckpoint, boolean allowNonRestoredState) throws Exception {
        synchronized (progressLock) {
            if (checkpointCoordinator == null) {
                return;
            }
            checkpointCoordinator.restoreLatestCheckpointedState(
                getAllVertices(), errorIfNoCheckpoint, allowNonRestoredState);
        }
    }

    /**
     * Deserializes the execution config with the user class loader and archives it.
     *
     * @return the archived execution config, or {@code null} if the deserialized config is
     *         {@code null} or deserialization fails (the failure is logged)
     */
    @Override
    public ArchivedExecutionConfig getArchivedExecutionConfig() {
        ArchivedExecutionConfig archived = null;
        try {
            final ExecutionConfig config =
                jobInformation.getSerializedExecutionConfig().deserializeValue(userClassLoader);
            if (config != null) {
                archived = config.archive();
            }
        } catch (IOException | ClassNotFoundException e) {
            LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", getJobID(), e);
        }
        return archived;
    }

    /**
     * Blocks the calling thread until the job has reached a globally terminal state.
     *
     * <p>The method waits on the progress lock and re-checks the state after every wake-up.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void waitUntilFinished() throws InterruptedException {
        synchronized (progressLock) {
            while (!state.isGloballyTerminalState()) {
                progressLock.wait();
            }
        }
    }

    /**
     * Attempts a state transition without an associated error.
     *
     * @param current the state the job is expected to be in
     * @param newState the state to switch to
     * @return {@code true} if the transition took place, {@code false} otherwise
     */
    private boolean transitionState(JobStatus current, JobStatus newState) {
        return transitionState(current, newState, null);
    }

    /**
     * Atomically switches the job from {@code current} to {@code newState}.
     *
     * <p>On success the switch is logged, the timestamp of the new state is recorded and the
     * job status listeners are notified.
     *
     * @param current the state the job is expected to be in
     * @param newState the state to switch to
     * @param error error associated with the transition; may be {@code null}
     * @return {@code true} if the compare-and-set succeeded, {@code false} if the job was not in
     *         state {@code current}
     */
    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        if (!STATE_UPDATER.compareAndSet(this, current, newState)) {
            return false;
        }

        LOG.info("Job {} ({}) switched from state {} to {}.", new Object[]{getJobName(), getJobID(), current, newState, error});
        stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
        notifyJobStatusChange(newState, error);
        return true;
    }

    /**
     * Records that one more job vertex has reached its final state and, once all vertices have
     * done so, moves the job to its resulting state. Runs entirely under the progress lock.
     *
     * <p>When the last vertex reports in:
     * <ul>
     *   <li>RUNNING switches to FINISHED, followed by cleanup.</li>
     *   <li>CANCELLING switches to CANCELED, followed by cleanup.</li>
     *   <li>FAILING delegates to {@code tryRestartOrFail()}.</li>
     *   <li>SUSPENDED needs no action.</li>
     *   <li>Another globally terminal state only produces a warning.</li>
     *   <li>Any other state fails the job.</li>
     * </ul>
     * A failed transition re-reads the state and repeats the decision. Waiters on the progress
     * lock are notified afterwards.
     *
     * @throws IllegalStateException if all vertices had already been counted as finished
     */
    void jobVertexInFinalState() {
        synchronized (progressLock) {
            if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
                throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
            }

            numFinishedJobVertices++;

            if (numFinishedJobVertices == verticesInCreationOrder.size()) {
                // This was the last vertex: decide on the final state of the job.
                while (true) {
                    final JobStatus observed = state;

                    if (observed == JobStatus.RUNNING) {
                        if (transitionState(observed, JobStatus.FINISHED)) {
                            postRunCleanup();
                            break;
                        }
                    } else if (observed == JobStatus.CANCELLING) {
                        if (transitionState(observed, JobStatus.CANCELED)) {
                            postRunCleanup();
                            break;
                        }
                    } else if (observed == JobStatus.FAILING) {
                        if (tryRestartOrFail()) {
                            break;
                        }
                        // No progress was made; look at the state again.
                    } else if (observed == JobStatus.SUSPENDED) {
                        break;
                    } else if (observed.isGloballyTerminalState()) {
                        LOG.warn("Job has entered globally terminal state without waiting for all job vertices to reach final state.");
                        break;
                    } else {
                        fail(new Exception("ExecutionGraph went into final state from state " + observed));
                        break;
                    }
                }

                // Wake up everyone blocked on the progress lock.
                progressLock.notifyAll();
            }
        }
    }

    /**
     * Tries to move a FAILING or RESTARTING job either to RESTARTING (and trigger the restart
     * strategy) or to FAILED (and clean up).
     *
     * <p>A restart is attempted only if the recorded failure cause is not a
     * {@link SuppressRestartsException} and the restart strategy reports that it can restart.
     *
     * @return {@code true} if one of the two transitions was performed; {@code false} if the job
     *         was in neither FAILING nor RESTARTING, or if the state transition did not succeed
     */
    private boolean tryRestartOrFail() {
        final JobStatus currentState = state;

        // Only FAILING and RESTARTING are handled here.
        if (currentState != JobStatus.FAILING && currentState != JobStatus.RESTARTING) {
            return false;
        }

        synchronized (progressLock) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", new Object[]{getJobName(), getJobID(), failureCause});
            } else {
                LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID());
            }

            final boolean causeAllowsRestart = !(failureCause instanceof SuppressRestartsException);
            final boolean strategyAllowsRestart = restartStrategy.canRestart();

            if (causeAllowsRestart && strategyAllowsRestart) {
                if (!transitionState(currentState, JobStatus.RESTARTING)) {
                    return false;
                }
                LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
                restartStrategy.restart(this);
                return true;
            }

            if (!transitionState(currentState, JobStatus.FAILED, failureCause)) {
                return false;
            }

            final ArrayList<String> reasonsForNoRestart = new ArrayList<String>(2);
            if (!causeAllowsRestart) {
                reasonsForNoRestart.add("a type of SuppressRestartsException was thrown");
            }
            if (!strategyAllowsRestart) {
                reasonsForNoRestart.add("the restart strategy prevented it");
            }
            LOG.info("Could not restart the job {} ({}) because {}.", new Object[]{getJobName(), getJobID(), StringUtils.join(reasonsForNoRestart, " and "), failureCause});
            postRunCleanup();
            return true;
        }
    }

    /**
     * Detaches the checkpoint coordinator from this graph and, if one was set, shuts it down
     * with the graph's current {@code state}.
     *
     * <p>Any {@link Exception} raised while doing so is logged and not rethrown.
     */
    private void postRunCleanup() {
        try {
            final CheckpointCoordinator coordinator = this.checkpointCoordinator;
            // Clear the field first; the shutdown below works on the local reference only.
            this.checkpointCoordinator = null;
            if (coordinator == null) {
                return;
            }
            coordinator.shutdown(this.state);
        } catch (Exception e) {
            LOG.error("Error while cleaning up after execution", e);
        }
    }

    /**
     * Applies a reported task state to the matching registered execution attempt.
     *
     * <ul>
     *   <li>{@code RUNNING}: returns the result of {@code switchToRunning()}.</li>
     *   <li>{@code FINISHED}: deserializes the user accumulators and marks the attempt
     *       finished; if that throws, the error is logged and the attempt is marked failed.</li>
     *   <li>{@code CANCELED}: completes the cancellation.</li>
     *   <li>{@code FAILED}: marks the attempt failed with the reported error.</li>
     *   <li>Any other state: fails the attempt with an "illegal state update" exception.</li>
     * </ul>
     *
     * @param state the state update to apply
     * @return {@code false} if no execution is registered under the update's ID or the reported
     *         state is not one of the four above; the result of {@code switchToRunning()} for
     *         {@code RUNNING}; {@code true} otherwise
     */
    public boolean updateState(TaskExecutionState state) {
        final Execution attempt = this.currentExecutions.get(state.getID());
        if (attempt == null) {
            return false;
        }

        switch (state.getExecutionState()) {
            case RUNNING:
                return attempt.switchToRunning();

            case FINISHED:
                try {
                    final Map<String, Accumulator<?, ?>> finalAccumulators =
                            state.getAccumulators().deserializeUserAccumulators(this.userClassLoader);
                    attempt.markFinished(finalAccumulators, state.getIOMetrics());
                } catch (Exception e) {
                    // The update still counts as handled; the attempt is failed instead.
                    LOG.error("Failed to deserialize final accumulator results.", e);
                    attempt.markFailed(e);
                }
                return true;

            case CANCELED:
                attempt.cancelingComplete();
                return true;

            case FAILED:
                attempt.markFailed(state.getError(this.userClassLoader));
                return true;

            default:
                attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
                return false;
        }
    }

    /**
     * Forwards a schedule-or-update request for the given result partition to the vertex of
     * the execution that produces it.
     *
     * @param partitionId identifies the partition; its producer ID is the key used to look up
     *        the execution among the currently registered executions
     * @throws ExecutionGraphException if no execution is registered under the producer ID, or
     *         if the execution found has no vertex
     */
    public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) throws ExecutionGraphException {
        final Execution execution = this.currentExecutions.get(partitionId.getProducerId());
        if (execution == null) {
            // Report the producer ID that was used as the lookup key, not the partition ID.
            throw new ExecutionGraphException("Cannot find execution for execution Id " + partitionId.getProducerId() + '.');
        }
        if (execution.getVertex() == null) {
            throw new ExecutionGraphException("Execution with execution Id " + partitionId.getProducerId() + " has no vertex assigned.");
        }
        execution.getVertex().scheduleOrUpdateConsumers(partitionId);
    }

    /**
     * Returns the currently registered executions, keyed by attempt ID.
     *
     * @return an unmodifiable wrapper around the internal map of registered executions
     */
    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        final Map<ExecutionAttemptID, Execution> readOnly = Collections.unmodifiableMap(this.currentExecutions);
        return readOnly;
    }

    /**
     * Registers the execution under its attempt ID unless that ID is already taken.
     *
     * <p>If another execution is already registered under the same ID, the existing entry is
     * left in place and the graph is failed.
     *
     * @param exec the execution to register
     */
    void registerExecution(Execution exec) {
        final Execution alreadyRegistered = this.currentExecutions.putIfAbsent(exec.getAttemptId(), exec);
        if (alreadyRegistered == null) {
            return;
        }
        this.fail(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
    }

    /**
     * Removes whatever execution is registered under the given execution's attempt ID.
     *
     * <p>If the removed entry is a different instance than {@code exec}, the graph is failed.
     * Nothing else happens when no entry existed.
     *
     * @param exec the execution to de-register
     */
    void deregisterExecution(Execution exec) {
        final Execution removed = this.currentExecutions.remove(exec.getAttemptId());
        final boolean nothingRemoved = removed == null;
        if (nothingRemoved || removed == exec) {
            return;
        }
        this.fail(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + removed));
    }

    /**
     * Deserializes the user accumulators from the snapshot and stores them on the registered
     * execution the snapshot belongs to.
     *
     * <p>A snapshot for an execution that is not registered is only logged at debug level. Any
     * {@link Exception} raised while processing is logged as an error and not rethrown.
     *
     * @param accumulatorSnapshot the snapshot carrying the serialized accumulators and the
     *        attempt ID of the execution they belong to
     */
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        try {
            final Map<String, Accumulator<?, ?>> deserialized = accumulatorSnapshot.deserializeUserAccumulators(this.userClassLoader);
            final ExecutionAttemptID attemptId = accumulatorSnapshot.getExecutionAttemptID();
            final Execution target = this.currentExecutions.get(attemptId);
            if (target == null) {
                LOG.debug("Received accumulator result for unknown execution {}.", attemptId);
                return;
            }
            target.setAccumulators(deserialized);
        } catch (Exception e) {
            LOG.error("Cannot update accumulators for job {}.", this.getJobID(), e);
        }
    }

    /**
     * Adds a job status listener to this graph.
     *
     * @param listener the listener to add; {@code null} is silently ignored
     */
    public void registerJobStatusListener(JobStatusListener listener) {
        if (listener == null) {
            return;
        }
        this.jobStatusListeners.add(listener);
    }

    /**
     * Adds an execution status listener to this graph.
     *
     * @param listener the listener to add; {@code null} is silently ignored
     */
    public void registerExecutionListener(ExecutionStatusListener listener) {
        if (listener == null) {
            return;
        }
        this.executionListeners.add(listener);
    }

    /**
     * Informs every registered job status listener about a job status change.
     *
     * <p>All listeners receive the same timestamp and the same serialized error. A
     * {@link Throwable} thrown by one listener is logged as a warning and does not stop the
     * remaining listeners from being notified.
     *
     * @param newState the status being reported
     * @param error the error associated with the change, or {@code null}; when non-null it is
     *        wrapped in a {@code SerializedThrowable} before being passed on
     */
    private void notifyJobStatusChange(JobStatus newState, Throwable error) {
        if (this.jobStatusListeners.isEmpty()) {
            return;
        }

        final long timestamp = System.currentTimeMillis();
        final SerializedThrowable wrappedError;
        if (error != null) {
            wrappedError = new SerializedThrowable(error);
        } else {
            wrappedError = null;
        }

        for (JobStatusListener statusListener : this.jobStatusListeners) {
            try {
                statusListener.jobStatusChanges(this.getJobID(), newState, timestamp, wrappedError);
            } catch (Throwable t) {
                LOG.warn("Error while notifying JobStatusListener", t);
            }
        }
    }

    /**
     * Informs every registered execution status listener about an execution state change and
     * fails the graph when the new state is {@code FAILED}.
     *
     * <p>A {@link Throwable} raised while notifying one listener is logged as a warning and
     * does not stop the remaining listeners from being notified.
     *
     * @param vertexId ID of the job vertex the execution belongs to
     * @param subtask subtask value passed through to the listeners
     * @param executionID attempt ID passed through to the listeners
     * @param newExecutionState the state being reported
     * @param error the associated error, or {@code null}; listeners receive its stringified
     *        form, and it is the cause passed to {@code fail} for a {@code FAILED} state
     */
    void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState newExecutionState, Throwable error) {
        final ExecutionJobVertex jobVertex = this.getJobVertex(vertexId);

        if (!this.executionListeners.isEmpty()) {
            final String stringifiedError;
            if (error != null) {
                stringifiedError = ExceptionUtils.stringifyException(error);
            } else {
                stringifiedError = null;
            }
            final long timestamp = System.currentTimeMillis();

            for (ExecutionStatusListener statusListener : this.executionListeners) {
                try {
                    // The vertex is dereferenced inside the try block, so a Throwable raised
                    // there is logged exactly like a failure of the listener itself.
                    statusListener.executionStatusChanged(
                            this.getJobID(),
                            vertexId,
                            jobVertex.getJobVertex().getName(),
                            jobVertex.getParallelism(),
                            subtask,
                            executionID,
                            newExecutionState,
                            timestamp,
                            stringifiedError);
                } catch (Throwable t) {
                    LOG.warn("Error while notifying ExecutionStatusListener", t);
                }
            }
        }

        // Runs regardless of whether any listener is registered.
        if (ExecutionState.FAILED == newExecutionState) {
            this.fail(error);
        }
    }

    /**
     * Builds an {@code ArchivedExecutionGraph} from the current contents of this graph.
     *
     * <p>Each job vertex is archived in creation order and made available both as an ordered
     * list and as a map keyed by job vertex ID. If serializing the user accumulators throws, a
     * warning is logged and an empty map is archived in their place.
     *
     * @return the archived representation of this graph
     */
    public ArchivedExecutionGraph archive() {
        final HashMap<JobVertexID, ArchivedExecutionJobVertex> archivedById = new HashMap<JobVertexID, ArchivedExecutionJobVertex>();
        final ArrayList<ArchivedExecutionJobVertex> archivedInOrder = new ArrayList<ArchivedExecutionJobVertex>();
        for (ExecutionJobVertex jobVertex : this.verticesInCreationOrder) {
            final ArchivedExecutionJobVertex archivedVertex = jobVertex.archive();
            archivedInOrder.add(archivedVertex);
            archivedById.put(jobVertex.getJobVertexId(), archivedVertex);
        }

        Map<String, Object> serializedAccumulators;
        try {
            serializedAccumulators = this.getAccumulatorsSerialized();
        } catch (Exception e) {
            // Archiving proceeds without user accumulators rather than failing.
            LOG.warn("Error occurred while archiving user accumulators.", e);
            serializedAccumulators = Collections.emptyMap();
        }

        return new ArchivedExecutionGraph(
                this.getJobID(),
                this.getJobName(),
                archivedById,
                archivedInOrder,
                this.stateTimestamps,
                this.getState(),
                this.getFailureCauseAsString(),
                this.getJsonPlan(),
                this.getAccumulatorResultsStringified(),
                serializedAccumulators,
                this.getArchivedExecutionConfig(),
                this.isStoppable(),
                this.getJobSnapshottingSettings(),
                this.getCheckpointStatsSnapshot());
    }

    /**
     * Gauge that derives a restart-related duration from the enclosing graph's
     * {@code stateTimestamps} and current {@code state}.
     *
     * <p>NOTE(review): presumably {@code stateTimestamps[s.ordinal()]} holds the time at which
     * the job last entered status {@code s}; confirm against the code that writes the array.
     */
    private class RestartTimeGauge
    implements Gauge<Long> {
        private RestartTimeGauge() {
        }

        /**
         * Computes the gauge value, with all cases measured from the {@code RESTARTING}
         * timestamp.
         *
         * @return {@code 0} if the {@code RESTARTING} timestamp is not positive; otherwise the
         *         difference to the {@code RUNNING} timestamp when that one is not older; else
         *         the difference to the current state's timestamp when the state is terminal;
         *         else the difference to the current wall-clock time
         */
        public Long getValue() {
            final int restartingSlot = JobStatus.RESTARTING.ordinal();
            final int runningSlot = JobStatus.RUNNING.ordinal();
            final long restartingTimestamp = ExecutionGraph.this.stateTimestamps[restartingSlot];

            final long elapsed;
            if (restartingTimestamp <= 0L) {
                elapsed = 0L;
            } else if (ExecutionGraph.this.stateTimestamps[runningSlot] >= restartingTimestamp) {
                elapsed = ExecutionGraph.this.stateTimestamps[runningSlot] - restartingTimestamp;
            } else if (ExecutionGraph.this.state.isTerminalState()) {
                elapsed = ExecutionGraph.this.stateTimestamps[ExecutionGraph.this.state.ordinal()] - restartingTimestamp;
            } else {
                elapsed = System.currentTimeMillis() - restartingTimestamp;
            }
            return elapsed;
        }
    }
}

