/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Task
implements Runnable,
TaskActions {
    // ------------------------------------------------------------------------
    //  Static members
    // ------------------------------------------------------------------------

    /** Logger shared by all tasks. */
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    /** Thread group used for the executing thread of every task (see the constructor). */
    private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
    /** Performs the compare-and-set updates of the volatile {@code executionState} field. */
    private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");

    // ------------------------------------------------------------------------
    //  Immutable description of the task, all assigned in the constructor
    // ------------------------------------------------------------------------

    private final JobID jobId;
    private final JobVertexID vertexId;
    private final ExecutionAttemptID executionId;
    private final TaskInfo taskInfo;
    /** Value of {@code taskInfo.getTaskNameWithSubtasks()}; used in log messages and as the thread name. */
    private final String taskNameWithSubtask;
    private final Configuration jobConfiguration;
    private final Configuration taskConfiguration;
    /** Passed to the library cache together with {@code requiredClasspaths} when registering the task. */
    private final Collection<BlobKey> requiredJarFiles;
    private final Collection<URL> requiredClasspaths;
    /** Class name loaded and instantiated by {@code loadAndInstantiateInvokable}. */
    private final String nameOfInvokableClass;

    // ------------------------------------------------------------------------
    //  Services and components handed in by the caller of the constructor
    // ------------------------------------------------------------------------

    private final TaskManagerRuntimeInfo taskManagerConfig;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final BroadcastVariableManager broadcastVariableManager;
    /** Deserialized with the user code class loader at the start of {@code run()}. */
    private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
    /** One entry per result partition deployment descriptor. */
    private final ResultPartition[] producedPartitions;
    /** {@code writers[i]} wraps {@code producedPartitions[i]}. */
    private final ResultPartitionWriter[] writers;
    /** One entry per input gate deployment descriptor. */
    private final SingleInputGate[] inputGates;
    /** The same gates as {@code inputGates}, keyed by {@code getConsumedResultId()}. */
    private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
    private final TaskManagerConnection taskManagerConnection;
    private final InputSplitProvider inputSplitProvider;
    private final CheckpointResponder checkpointResponder;
    /** Created as a {@code CopyOnWriteArrayList} in the constructor. */
    private final List<TaskExecutionStateListener> taskExecutionStateListeners;
    private final LibraryCacheManager libraryCache;
    private final FileCache fileCache;
    private final NetworkEnvironment network;
    private final AccumulatorRegistry accumulatorRegistry;
    /** Thread that runs this task's {@code run()} method; started by {@code startTaskThread()}. */
    private final Thread executingThread;
    /** Not null-checked in the constructor; {@code run()} guards some, but not all, uses with a null check. */
    private final TaskMetricGroup metrics;
    private final PartitionProducerStateChecker partitionProducerStateChecker;
    private final Executor executor;
    /** Set from false to true by compare-and-set before a canceler thread is started. */
    private final AtomicBoolean invokableHasBeenCanceled;

    // ------------------------------------------------------------------------
    //  Mutable state
    // ------------------------------------------------------------------------

    /** The task's invokable; assigned in {@code run()} after its environment and initial state have been set. */
    private volatile AbstractInvokable invokable;
    /** Current execution state; transitions go through {@code STATE_UPDATER} in {@code transitionState}. */
    private volatile ExecutionState executionState = ExecutionState.CREATED;
    /** Cause recorded when the task fails or is canceled/failed externally; may be null. */
    private volatile Throwable failureCause;
    /** Shut down with {@code shutdownNow()} during resource cleanup in {@code run()} if non-null. */
    private volatile ExecutorService asyncCallDispatcher;
    /** Initial state for the invokable; reset to null in {@code run()} once handed to a StatefulTask. */
    private volatile TaskStateHandles taskStateHandles;
    /** Read from the task manager configuration; overridden in {@code run()} when the ExecutionConfig value is non-negative. */
    private long taskCancellationInterval;
    /** Read from the task manager configuration; overridden in {@code run()} when the ExecutionConfig value is non-negative. */
    private long taskCancellationTimeout;

    public Task(JobInformation jobInformation, TaskInformation taskInformation, ExecutionAttemptID executionAttemptID, int subtaskIndex, int attemptNumber, Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors, Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors, int targetSlotNumber, TaskStateHandles taskStateHandles, MemoryManager memManager, IOManager ioManager, NetworkEnvironment networkEnvironment, BroadcastVariableManager bcVarManager, TaskManagerConnection taskManagerConnection, InputSplitProvider inputSplitProvider, CheckpointResponder checkpointResponder, LibraryCacheManager libraryCache, FileCache fileCache, TaskManagerRuntimeInfo taskManagerConfig, TaskMetricGroup metricGroup, ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, PartitionProducerStateChecker partitionProducerStateChecker, Executor executor) {
        Preconditions.checkNotNull((Object)jobInformation);
        Preconditions.checkNotNull((Object)taskInformation);
        Preconditions.checkArgument((0 <= subtaskIndex ? 1 : 0) != 0, (Object)"The subtask index must be positive.");
        Preconditions.checkArgument((0 <= attemptNumber ? 1 : 0) != 0, (Object)"The attempt number must be positive.");
        Preconditions.checkArgument((0 <= targetSlotNumber ? 1 : 0) != 0, (Object)"The target slot number must be positive.");
        this.taskInfo = new TaskInfo(taskInformation.getTaskName(), taskInformation.getNumberOfKeyGroups(), subtaskIndex, taskInformation.getNumberOfSubtasks(), attemptNumber);
        this.jobId = jobInformation.getJobId();
        this.vertexId = taskInformation.getJobVertexId();
        this.executionId = (ExecutionAttemptID)((Object)Preconditions.checkNotNull((Object)((Object)executionAttemptID)));
        this.taskNameWithSubtask = this.taskInfo.getTaskNameWithSubtasks();
        this.jobConfiguration = jobInformation.getJobConfiguration();
        this.taskConfiguration = taskInformation.getTaskConfiguration();
        this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
        this.requiredClasspaths = jobInformation.getRequiredClasspathURLs();
        this.nameOfInvokableClass = taskInformation.getInvokableClassName();
        this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();
        this.taskStateHandles = taskStateHandles;
        Configuration tmConfig = taskManagerConfig.getConfiguration();
        this.taskCancellationInterval = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
        this.taskCancellationTimeout = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
        this.memoryManager = (MemoryManager)Preconditions.checkNotNull((Object)memManager);
        this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
        this.broadcastVariableManager = (BroadcastVariableManager)Preconditions.checkNotNull((Object)bcVarManager);
        this.accumulatorRegistry = new AccumulatorRegistry(this.jobId, this.executionId);
        this.inputSplitProvider = (InputSplitProvider)Preconditions.checkNotNull((Object)inputSplitProvider);
        this.checkpointResponder = (CheckpointResponder)Preconditions.checkNotNull((Object)checkpointResponder);
        this.taskManagerConnection = (TaskManagerConnection)Preconditions.checkNotNull((Object)taskManagerConnection);
        this.libraryCache = (LibraryCacheManager)Preconditions.checkNotNull((Object)libraryCache);
        this.fileCache = (FileCache)Preconditions.checkNotNull((Object)fileCache);
        this.network = (NetworkEnvironment)Preconditions.checkNotNull((Object)networkEnvironment);
        this.taskManagerConfig = (TaskManagerRuntimeInfo)Preconditions.checkNotNull((Object)taskManagerConfig);
        this.taskExecutionStateListeners = new CopyOnWriteArrayList<TaskExecutionStateListener>();
        this.metrics = metricGroup;
        this.partitionProducerStateChecker = (PartitionProducerStateChecker)Preconditions.checkNotNull((Object)partitionProducerStateChecker);
        this.executor = (Executor)Preconditions.checkNotNull((Object)executor);
        String taskNameWithSubtaskAndId = this.taskNameWithSubtask + " (" + (Object)((Object)this.executionId) + ')';
        this.producedPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
        this.writers = new ResultPartitionWriter[resultPartitionDeploymentDescriptors.size()];
        int counter = 0;
        for (ResultPartitionDeploymentDescriptor desc : resultPartitionDeploymentDescriptors) {
            ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), this.executionId);
            this.producedPartitions[counter] = new ResultPartition(taskNameWithSubtaskAndId, this, this.jobId, partitionId, desc.getPartitionType(), desc.getNumberOfSubpartitions(), desc.getMaxParallelism(), networkEnvironment.getResultPartitionManager(), resultPartitionConsumableNotifier, ioManager, desc.sendScheduleOrUpdateConsumersMessage());
            this.writers[counter] = new ResultPartitionWriter(this.producedPartitions[counter]);
            ++counter;
        }
        this.inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];
        this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
        counter = 0;
        for (InputGateDeploymentDescriptor inputGateDeploymentDescriptor : inputGateDeploymentDescriptors) {
            SingleInputGate gate;
            this.inputGates[counter] = gate = SingleInputGate.create(taskNameWithSubtaskAndId, this.jobId, this.executionId, inputGateDeploymentDescriptor, networkEnvironment, this, metricGroup.getIOMetricGroup());
            this.inputGatesById.put(gate.getConsumedResultId(), gate);
            ++counter;
        }
        this.invokableHasBeenCanceled = new AtomicBoolean(false);
        this.executingThread = new Thread(TASK_THREADS_GROUP, this, this.taskNameWithSubtask);
    }

    /**
     * Returns the job id obtained from the {@code JobInformation} given at construction.
     */
    public JobID getJobID() {
        return jobId;
    }

    /**
     * Returns the job vertex id obtained from the {@code TaskInformation} given at construction.
     */
    public JobVertexID getJobVertexId() {
        return vertexId;
    }

    /**
     * Returns the execution attempt id of this task; never null, as it is checked at construction.
     */
    public ExecutionAttemptID getExecutionId() {
        return executionId;
    }

    /**
     * Returns the {@code TaskInfo} that was assembled in the constructor.
     */
    public TaskInfo getTaskInfo() {
        return taskInfo;
    }

    /**
     * Returns the job-level configuration obtained from the {@code JobInformation}.
     */
    public Configuration getJobConfiguration() {
        return jobConfiguration;
    }

    /**
     * Returns the task-level configuration obtained from the {@code TaskInformation}.
     */
    public Configuration getTaskConfiguration() {
        return taskConfiguration;
    }

    /**
     * Returns the internal array of result partition writers (no copy is made).
     */
    public ResultPartitionWriter[] getAllWriters() {
        return writers;
    }

    /**
     * Returns the internal array of input gates (no copy is made).
     */
    public SingleInputGate[] getAllInputGates() {
        return inputGates;
    }

    /**
     * Returns the internal array of result partitions produced by this task (no copy is made).
     */
    public ResultPartition[] getProducedPartitions() {
        return producedPartitions;
    }

    /**
     * Looks up the input gate that was registered under the given consumed result id.
     *
     * @param id the intermediate data set id to look up
     * @return the matching gate, or {@code null} if the map holds no entry for {@code id}
     */
    public SingleInputGate getInputGateById(IntermediateDataSetID id) {
        return inputGatesById.get(id);
    }

    /**
     * Returns the accumulator registry created for this task in the constructor.
     */
    public AccumulatorRegistry getAccumulatorRegistry() {
        return accumulatorRegistry;
    }

    /**
     * Returns the metric group passed to the constructor. The constructor does not null-check
     * it, so the result may be {@code null}.
     */
    public TaskMetricGroup getMetricGroup() {
        return metrics;
    }

    /**
     * Returns the thread that was created in the constructor to run this task.
     */
    public Thread getExecutingThread() {
        return executingThread;
    }

    /**
     * Returns the current task cancellation interval. It is initialised from the task manager
     * configuration and may be overridden in {@code run()} by the execution config.
     */
    @VisibleForTesting
    long getTaskCancellationInterval() {
        return taskCancellationInterval;
    }

    /**
     * Returns the current task cancellation timeout. It is initialised from the task manager
     * configuration and may be overridden in {@code run()} by the execution config.
     */
    @VisibleForTesting
    long getTaskCancellationTimeout() {
        return taskCancellationTimeout;
    }

    /**
     * Returns the value of the volatile execution state field at the time of the call.
     */
    public ExecutionState getExecutionState() {
        return executionState;
    }

    /**
     * Tells whether the task is being canceled, has been canceled, or has failed.
     *
     * @return {@code true} if the execution state is {@code CANCELING}, {@code CANCELED} or
     *         {@code FAILED}; {@code false} otherwise
     */
    public boolean isCanceledOrFailed() {
        // Each comparison re-reads the volatile field, exactly like a chained || expression.
        if (executionState == ExecutionState.CANCELING) {
            return true;
        }
        if (executionState == ExecutionState.CANCELED) {
            return true;
        }
        return executionState == ExecutionState.FAILED;
    }

    /**
     * Returns the recorded failure cause, or {@code null} if none has been recorded.
     */
    public Throwable getFailureCause() {
        return failureCause;
    }

    /**
     * Starts the executing thread, which then runs this task's {@code run()} method.
     */
    public void startTaskThread() {
        executingThread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /*
     * Body of the task's executing thread (this Task is the Runnable given to executingThread
     * in the constructor).
     *
     * NOTE(review): the decompiler could not structure this method. The `** try`, `** GOTO`
     * and `lbl...:` lines are decompiler pseudo-code, locals are used without declarations, and
     * the resource-cleanup sequence is emitted three times. Read the text as a control-flow
     * sketch; it is not compilable Java.
     *
     * Flow as far as it is visible here:
     *  1. Initial-state loop. CREATED is moved to DEPLOYING by compare-and-set and execution
     *     continues at lbl-1000. FAILED, and CANCELING after a successful switch to CANCELED,
     *     report the final state, close the metric group if it is non-null and return. Any
     *     other state throws IllegalStateException.
     *  2. lbl-1000: set-up (class loader, execution config, invokable, network registration,
     *     distributed cache files, runtime environment, initial state), switch to RUNNING,
     *     invoke(), finish the produced partitions, switch to FINISHED. A CancelTaskException is
     *     thrown whenever the task is found canceled/failed or a state switch is lost.
     *  3. Resource cleanup followed by closing the metric group.
     *  4. catch (Throwable t): moves the task to CANCELED or FAILED depending on the current
     *     state and the exception type, then performs the same cleanup.
     */
    @Override
    public void run() {
        // Initial-state loop: retried while a compare-and-set loses against a concurrent transition.
        while (true) {
            block46: {
                if ((current = this.executionState) != ExecutionState.CREATED) break block46;
                if (!this.transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) continue;
                // Both locals are read again by the cleanup code further down.
                distributedCacheEntries = new HashMap<String, Future<Path>>();
                invokable = null;
                ** try [egrp 0[TRYBLOCK] [3, 7 : 153->791)] { 
lbl7:
                // 1 sources

                ** GOTO lbl-1000
            }
            // Task was failed before this thread got to run: only report the final state.
            if (current == ExecutionState.FAILED) {
                this.notifyFinalState();
                if (this.metrics == null) return;
                this.metrics.close();
                return;
            }
            // Neither CREATED, FAILED nor CANCELING: the metric group is closed (if present) and the call fails.
            if (current != ExecutionState.CANCELING) {
                if (this.metrics == null) throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
                this.metrics.close();
                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
            }
            if (this.transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) break;
        }
        // Reached only via the break above, i.e. after CANCELING -> CANCELED succeeded.
        this.notifyFinalState();
        if (this.metrics == null) return;
        this.metrics.close();
        return;
lbl-1000:
        // 1 sources

        {
            Task.LOG.info("Creating FileSystem stream leak safety net for task {}", (Object)this);
            FileSystem.createAndSetFileSystemCloseableRegistryForThread();
            Task.LOG.info("Loading JAR files for task {}.", (Object)this);
            userCodeClassLoader = this.createUserCodeClassloader(this.libraryCache);
            executionConfig = (ExecutionConfig)this.serializedExecutionConfig.deserializeValue(userCodeClassLoader);
            // Non-negative values in the execution config replace the values read from the task manager configuration.
            if (executionConfig.getTaskCancellationInterval() >= 0L) {
                this.taskCancellationInterval = executionConfig.getTaskCancellationInterval();
            }
            if (executionConfig.getTaskCancellationTimeout() >= 0L) {
                this.taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
            }
            invokable = this.loadAndInstantiateInvokable(userCodeClassLoader, this.nameOfInvokableClass);
            // Abort set-up if the task was canceled or failed in the meantime.
            if (this.isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            Task.LOG.info("Registering task at network: {}.", (Object)this);
            this.network.registerTask(this);
            if (this.metrics != null && this.metrics.getIOMetricGroup() != null) {
                this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
            }
            // Request a local temp file for every distributed cache entry in the job configuration.
            try {
                for (Map.Entry entry : DistributedCache.readFileInfoFromConfig((Configuration)this.jobConfiguration)) {
                    Task.LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
                    cp = this.fileCache.createTmpFile((String)entry.getKey(), (DistributedCache.DistributedCacheEntry)entry.getValue(), this.jobId);
                    distributedCacheEntries.put((String)entry.getKey(), cp);
                }
            }
            catch (Exception e) {
                throw new Exception(String.format("Exception while adding files to distributed cache of task %s (%s).", new Object[]{this.taskNameWithSubtask, this.executionId}), e);
            }
            if (this.isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            kvStateRegistry = this.network.createKvStateTaskRegistry(this.jobId, this.getJobVertexId());
            env = new RuntimeEnvironment(this.jobId, this.vertexId, this.executionId, executionConfig, this.taskInfo, this.jobConfiguration, this.taskConfiguration, userCodeClassLoader, this.memoryManager, this.ioManager, this.broadcastVariableManager, this.accumulatorRegistry, kvStateRegistry, this.inputSplitProvider, distributedCacheEntries, this.writers, this.inputGates, this.checkpointResponder, this.taskManagerConfig, this.metrics, this);
            invokable.setEnvironment(env);
            // Initial state is only accepted by StatefulTask invokables; the field is cleared once handed over.
            if (null != this.taskStateHandles) {
                if (invokable instanceof StatefulTask == false) throw new IllegalStateException("Found operator state for a non-stateful task invokable");
                op = (StatefulTask)invokable;
                op.setInitialState(this.taskStateHandles);
                this.taskStateHandles = null;
            }
            // Publish the invokable through the volatile field before switching to RUNNING.
            this.invokable = invokable;
            if (!this.transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }
            this.notifyObservers(ExecutionState.RUNNING, null);
            this.taskManagerConnection.updateTaskExecutionState(new TaskExecutionState(this.jobId, this.executionId, ExecutionState.RUNNING));
            this.executingThread.setContextClassLoader(userCodeClassLoader);
            invokable.invoke();
            // A cancel/fail that happened while invoke() ran takes precedence over finishing.
            if (this.isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            for (ResultPartition partition : this.producedPartitions) {
                if (partition == null) continue;
                partition.finish();
            }
            if (this.transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED) == false) throw new CancelTaskException();
            this.notifyObservers(ExecutionState.FINISHED, null);
        }
        // Resource cleanup, copy 1 of 3 (normal completion). Any failure here is reported as a fatal error.
        try {
            Task.LOG.info("Freeing task resources for {} ({}).", (Object)this.taskNameWithSubtask, (Object)this.executionId);
            dispatcher = this.asyncCallDispatcher;
            if (dispatcher != null && !dispatcher.isShutdown()) {
                dispatcher.shutdownNow();
            }
            this.network.unregisterTask(this);
            if (invokable != null) {
                this.memoryManager.releaseAll(invokable);
            }
            this.libraryCache.unregisterTask(this.jobId, this.executionId);
            this.removeCachedFiles(distributedCacheEntries, this.fileCache);
            Task.LOG.info("Ensuring all FileSystem streams are closed for task {}", (Object)this);
            FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
            this.notifyFinalState();
        }
        catch (Throwable t) {
            message = String.format("FATAL - exception in resource cleanup of task %s (%s).", new Object[]{this.taskNameWithSubtask, this.executionId});
            Task.LOG.error(message, t);
            this.notifyFatalError(message, t);
        }
        // No null check on metrics here; a failure (including a NullPointerException) is only logged.
        try {
            this.metrics.close();
            return;
        }
        catch (Throwable t) {
            Task.LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, t});
            return;
        }
lbl100:
        // 1 sources

        // Handler for anything thrown by the lbl-1000 body above.
        catch (Throwable t) {
            block45: {
                try {
                    try {}
                    catch (Throwable tt) {
                        message = String.format("FATAL - exception in exception handler of task %s (%s).", new Object[]{this.taskNameWithSubtask, this.executionId});
                        Task.LOG.error(message, tt);
                        this.notifyFatalError(message, tt);
                        break block45;
                    }
                }
lbl109:
                // 2 sources

                // NOTE(review): this looks like the exceptional copy of a finally block - cleanup
                // (copy 2 of 3) followed by a rethrow of the pending throwable. Confirm against the original source.
                catch (Throwable var11_22) {
                    try {
                        Task.LOG.info("Freeing task resources for {} ({}).", (Object)this.taskNameWithSubtask, (Object)this.executionId);
                        dispatcher = this.asyncCallDispatcher;
                        if (dispatcher != null && !dispatcher.isShutdown()) {
                            dispatcher.shutdownNow();
                        }
                        this.network.unregisterTask(this);
                        if (invokable != null) {
                            this.memoryManager.releaseAll(invokable);
                        }
                        this.libraryCache.unregisterTask(this.jobId, this.executionId);
                        this.removeCachedFiles(distributedCacheEntries, this.fileCache);
                        Task.LOG.info("Ensuring all FileSystem streams are closed for task {}", (Object)this);
                        FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
                        this.notifyFinalState();
                    }
                    catch (Throwable t) {
                        message = String.format("FATAL - exception in resource cleanup of task %s (%s).", new Object[]{this.taskNameWithSubtask, this.executionId});
                        Task.LOG.error(message, t);
                        this.notifyFatalError(message, t);
                    }
                    try {
                        this.metrics.close();
                        throw var11_22;
                    }
                    catch (Throwable t) {
                        Task.LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, t});
                    }
                    throw var11_22;
                }
                // State handling for the caught throwable t; retried while a compare-and-set is lost.
                // NOTE(review): the decompiler placed this loop after the inner catch; its position is unreliable.
                while (true) {
                    if ((current = this.executionState) == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                        // CancelTaskException ends in CANCELED; every other throwable ends in FAILED with t as the cause.
                        if (t instanceof CancelTaskException) {
                            if (!this.transitionState(current, ExecutionState.CANCELED)) continue;
                            this.cancelInvokable();
                            this.notifyObservers(ExecutionState.CANCELED, null);
                            break block45;
                        } else {
                            if (!this.transitionState(current, ExecutionState.FAILED, t)) continue;
                            errorMessage = String.format("Execution of %s (%s) failed.", new Object[]{this.taskNameWithSubtask, this.executionId});
                            this.failureCause = t;
                            this.cancelInvokable();
                            this.notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
                        }
                        break block45;
                    }
                    // Cancellation was already requested: complete it, no call to cancelInvokable() on this path.
                    if (current == ExecutionState.CANCELING) {
                        if (!this.transitionState(current, ExecutionState.CANCELED)) continue;
                        this.notifyObservers(ExecutionState.CANCELED, null);
                        break block45;
                    }
                    // Already FAILED: nothing to change.
                    if (current == ExecutionState.FAILED) break block45;
                    // Any other state is unexpected: force FAILED and log it below.
                    if (this.transitionState(current, ExecutionState.FAILED, t)) break;
                }
                Task.LOG.error("Unexpected state in task {} ({}) during an exception: {}.", new Object[]{this.taskNameWithSubtask, this.executionId, current});
            }
            // Resource cleanup, copy 3 of 3 (after the exception has been handled).
            try {
                Task.LOG.info("Freeing task resources for {} ({}).", (Object)this.taskNameWithSubtask, (Object)this.executionId);
                dispatcher = this.asyncCallDispatcher;
                if (dispatcher != null && !dispatcher.isShutdown()) {
                    dispatcher.shutdownNow();
                }
                this.network.unregisterTask(this);
                if (invokable != null) {
                    this.memoryManager.releaseAll(invokable);
                }
                this.libraryCache.unregisterTask(this.jobId, this.executionId);
                this.removeCachedFiles(distributedCacheEntries, this.fileCache);
                Task.LOG.info("Ensuring all FileSystem streams are closed for task {}", (Object)this);
                FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
                this.notifyFinalState();
            }
            catch (Throwable t) {
                message = String.format("FATAL - exception in resource cleanup of task %s (%s).", new Object[]{this.taskNameWithSubtask, this.executionId});
                Task.LOG.error(message, t);
                this.notifyFatalError(message, t);
            }
            try {
                this.metrics.close();
                return;
            }
            catch (Throwable t) {
                Task.LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, t});
                return;
            }
        }
    }

    /**
     * Registers this task with the given library cache and fetches the class loader the cache
     * holds for this task's job.
     *
     * @param libraryCache the library cache manager to register with and query
     * @return the user code class loader; never {@code null}
     * @throws Exception if registration fails or the cache has no class loader for the job
     */
    private ClassLoader createUserCodeClassloader(LibraryCacheManager libraryCache) throws Exception {
        final long registrationStart = System.currentTimeMillis();
        libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
        final long elapsedMillis = System.currentTimeMillis() - registrationStart;
        LOG.debug("Register task {} at library cache manager took {} milliseconds", executionId, elapsedMillis);

        final ClassLoader loader = libraryCache.getClassLoader(jobId);
        if (loader != null) {
            return loader;
        }
        throw new Exception("No user code classloader available.");
    }

    /**
     * Loads the named class through the given class loader (initialising it) and creates an
     * instance via its no-argument constructor.
     *
     * @param classLoader the class loader used to resolve {@code className}
     * @param className name of a class that must be a subclass of {@code AbstractInvokable}
     * @return a new instance of the class
     * @throws Exception wrapping any throwable raised while loading or while instantiating;
     *         the two phases use different messages
     */
    private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className) throws Exception {
        final Class<? extends AbstractInvokable> taskClass;
        try {
            Class<?> loaded = Class.forName(className, true, classLoader);
            taskClass = loaded.asSubclass(AbstractInvokable.class);
        } catch (Throwable loadFailure) {
            throw new Exception("Could not load the task's invokable class.", loadFailure);
        }

        try {
            return taskClass.newInstance();
        } catch (Throwable instantiationFailure) {
            throw new Exception("Could not instantiate the task's invokable class.", instantiationFailure);
        }
    }

    /**
     * Best-effort removal of the temp files that were requested from the file cache for this
     * task. Never throws: a failure for one file is logged and the remaining files are still
     * processed; any other failure is logged as well.
     *
     * @param entries the distributed cache entries of this task; only the keys (registered
     *        names) are used
     * @param fileCache the file cache to delete the temp files from
     */
    private void removeCachedFiles(Map<String, Future<Path>> entries, FileCache fileCache) {
        try {
            // Only the registered name is needed for deletion, so iterate the keys.
            for (String name : entries.keySet()) {
                try {
                    fileCache.deleteTmpFile(name, this.jobId);
                }
                catch (Exception e) {
                    LOG.error("Distributed Cache could not remove cached file registered under '" + name + "'.", (Throwable)e);
                }
            }
        }
        catch (Throwable t) {
            // Pass the throwable to the logger so the stack trace is not lost.
            LOG.error("Error while removing cached local files from distributed cache.", t);
        }
    }

    /**
     * Tells the task manager connection that this execution attempt has reached its final state.
     */
    private void notifyFinalState() {
        taskManagerConnection.notifyFinalState(executionId);
    }

    /**
     * Forwards a fatal error to the task manager connection.
     *
     * @param message description of the error
     * @param cause the throwable that triggered the error
     */
    private void notifyFatalError(String message, Throwable cause) {
        taskManagerConnection.notifyFatalError(message, cause);
    }

    /**
     * Attempts a state transition without an associated cause.
     *
     * @param currentState the state the task is expected to be in
     * @param newState the state to switch to
     * @return {@code true} if the switch happened, {@code false} if the task was in another state
     */
    private boolean transitionState(ExecutionState currentState, ExecutionState newState) {
        final Throwable noCause = null;
        return transitionState(currentState, newState, noCause);
    }

    /**
     * Atomically switches the execution state from {@code currentState} to {@code newState}
     * and logs the switch at INFO level.
     *
     * @param currentState the state the task is expected to be in
     * @param newState the state to switch to
     * @param cause optional throwable that is appended to the log call; may be {@code null}
     * @return {@code true} if the compare-and-set succeeded, {@code false} otherwise (nothing
     *         is logged in that case)
     */
    private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
        if (!STATE_UPDATER.compareAndSet(this, currentState, newState)) {
            return false;
        }

        if (cause != null) {
            LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause);
        } else {
            LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState);
        }
        return true;
    }

    /**
     * Asks the invokable to stop by scheduling a call to {@code StoppableTask.stop()} through
     * {@code executeAsyncCallRunnable}. A {@code RuntimeException} thrown by {@code stop()} is
     * logged and reported to the task manager connection via {@code failTask}.
     *
     * @throws UnsupportedOperationException if the current invokable is not a
     *         {@code StoppableTask} (which includes the case that no invokable is set yet)
     */
    public void stopExecution() throws UnsupportedOperationException {
        LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId);

        if (invokable instanceof StoppableTask) {
            Runnable stopCall = new Runnable() {

                @Override
                public void run() {
                    try {
                        StoppableTask stoppable = (StoppableTask) Task.this.invokable;
                        stoppable.stop();
                    } catch (RuntimeException stopFailure) {
                        LOG.error("Stopping task {} ({}) failed.", Task.this.taskNameWithSubtask, Task.this.executionId, stopFailure);
                        Task.this.taskManagerConnection.failTask(Task.this.executionId, stopFailure);
                    }
                }
            };
            String callName = String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId);
            executeAsyncCallRunnable(stopCall, callName);
            return;
        }

        throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId));
    }

    /**
     * Requests cancellation of this task: logs the attempt and delegates to
     * {@code cancelOrFailAndCancelInvokable} with target state {@code CANCELING} and no cause.
     */
    public void cancelExecution() {
        LOG.info("Attempting to cancel task {} ({}).", taskNameWithSubtask, executionId);
        final Throwable noCause = null;
        cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, noCause);
    }

    /**
     * Fails this task on behalf of an external caller: logs the attempt and delegates to
     * {@code cancelOrFailAndCancelInvokable} with target state {@code FAILED}.
     *
     * @param cause the reason for the failure, passed on unchanged
     */
    @Override
    public void failExternally(Throwable cause) {
        LOG.info("Attempting to fail task externally {} ({}).", taskNameWithSubtask, executionId);
        cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
    }

    /**
     * Moves the task into {@code targetState} and, if the task was running, starts a
     * canceler thread for the invokable.
     *
     * <p>The method retries until one of the following happens:
     * <ul>
     *   <li>the task is terminal or already CANCELING: only a log line is written;</li>
     *   <li>the task is CREATED or DEPLOYING and the transition succeeds: the failure cause
     *       is recorded and observers are notified;</li>
     *   <li>the task is RUNNING and the transition succeeds: if an invokable exists and has
     *       not been canceled before, the failure cause is recorded, observers are notified
     *       and a daemon canceler thread is started.</li>
     * </ul>
     *
     * @param targetState the state to transition to
     * @param cause the failure cause handed to the transition and to the observers; may be null
     * @throws IllegalStateException if the task is in a state not covered above
     */
    private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
        while (true) {
            final ExecutionState observed = this.executionState;

            // Nothing left to do if the task is done or cancellation is already under way.
            if (observed.isTerminal() || observed == ExecutionState.CANCELING) {
                LOG.info("Task {} is already in state {}", this.taskNameWithSubtask, observed);
                return;
            }

            final boolean notYetRunning = observed == ExecutionState.DEPLOYING || observed == ExecutionState.CREATED;
            if (notYetRunning) {
                if (this.transitionState(observed, targetState, cause)) {
                    this.failureCause = cause;
                    this.notifyObservers(targetState, new Exception(String.format("Cancel or fail execution of %s (%s).", this.taskNameWithSubtask, this.executionId), cause));
                    return;
                }
                // The state changed concurrently; look at it again.
                continue;
            }

            if (observed != ExecutionState.RUNNING) {
                throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).", observed, this.taskNameWithSubtask, this.executionId));
            }

            if (!this.transitionState(ExecutionState.RUNNING, targetState, cause)) {
                // Lost the race against another transition; retry.
                continue;
            }

            // The flag guarantees the invokable is canceled at most once.
            if (this.invokable != null && this.invokableHasBeenCanceled.compareAndSet(false, true)) {
                this.failureCause = cause;
                this.notifyObservers(targetState, new Exception(String.format("Cancel or fail execution of %s (%s).", this.taskNameWithSubtask, this.executionId), cause));
                LOG.info("Triggering cancellation of task code {} ({}).", this.taskNameWithSubtask, this.executionId);

                final TaskCanceler cancelerLogic = new TaskCanceler(LOG, this.invokable, this.executingThread, this.taskNameWithSubtask, this.taskCancellationInterval, this.taskCancellationTimeout, this.taskManagerConnection, this.producedPartitions, this.inputGates);
                final String cancelerName = String.format("Canceler for %s (%s).", this.taskNameWithSubtask, this.executionId);
                final Thread cancelerThread = new Thread(this.executingThread.getThreadGroup(), cancelerLogic, cancelerName);
                cancelerThread.setDaemon(true);
                cancelerThread.start();
            }
            return;
        }
    }

    /**
     * Adds a listener to the collection that {@code notifyObservers} iterates over.
     *
     * @param listener the listener to add
     */
    public void registerExecutionListener(TaskExecutionStateListener listener) {
        final TaskExecutionStateListener toRegister = listener;
        this.taskExecutionStateListeners.add(toRegister);
    }

    /**
     * Builds one {@link TaskExecutionState} for this task's job and execution attempt and
     * hands that same instance to every registered listener.
     *
     * @param newState the state to report
     * @param error the error to attach to the state update; may be null
     */
    private void notifyObservers(ExecutionState newState, Throwable error) {
        final TaskExecutionState update = new TaskExecutionState(this.jobId, this.executionId, newState, error);
        for (final TaskExecutionStateListener observer : this.taskExecutionStateListeners) {
            observer.notifyTaskExecutionStateChanged(update);
        }
    }

    /**
     * Requests the state of a partition's producer and reacts to the answer asynchronously
     * on this task's executor.
     *
     * <p>Reaction to the completed request:
     * <ul>
     *   <li>a state was returned: it is forwarded to {@code onPartitionStateUpdate};</li>
     *   <li>the request failed with a {@link TimeoutException}: the producer is treated as
     *       {@link ExecutionState#RUNNING};</li>
     *   <li>the request failed with a {@code PartitionProducerDisposedException}: this task
     *       is canceled;</li>
     *   <li>any other outcome: this task is failed with the received throwable.</li>
     * </ul>
     * An {@link IOException} or {@link InterruptedException} raised while handling the update
     * also fails the task.
     */
    @Override
    public void triggerPartitionProducerStateCheck(JobID jobId, final IntermediateDataSetID intermediateDataSetId, final ResultPartitionID resultPartitionId) {
        final org.apache.flink.runtime.concurrent.Future<ExecutionState> producerStateFuture =
                this.partitionProducerStateChecker.requestPartitionProducerState(jobId, intermediateDataSetId, resultPartitionId);

        final BiFunction<ExecutionState, Throwable, Void> onAnswer = new BiFunction<ExecutionState, Throwable, Void>() {
            @Override
            public Void apply(ExecutionState producerState, Throwable failure) {
                try {
                    if (producerState != null) {
                        Task.this.onPartitionStateUpdate(intermediateDataSetId, resultPartitionId, producerState);
                        return null;
                    }
                    if (failure instanceof TimeoutException) {
                        // No answer in time: proceed as if the producer were running.
                        Task.this.onPartitionStateUpdate(intermediateDataSetId, resultPartitionId, ExecutionState.RUNNING);
                        return null;
                    }
                    if (failure instanceof PartitionProducerDisposedException) {
                        final String msg = String.format("Producer %s of partition %s disposed. Cancelling execution.", resultPartitionId.getProducerId(), resultPartitionId.getPartitionId());
                        LOG.info(msg, failure);
                        Task.this.cancelExecution();
                        return null;
                    }
                    Task.this.failExternally(failure);
                } catch (IOException | InterruptedException e) {
                    Task.this.failExternally(e);
                }
                return null;
            }
        };

        producerStateFuture.handleAsync(onAnswer, this.executor);
    }

    /**
     * Triggers a checkpoint on the invokable through the asynchronous call dispatcher.
     *
     * <p>The checkpoint is declined immediately when the task is not RUNNING, has no
     * invokable, or the invokable is not a {@code StatefulTask}. Otherwise the trigger runs
     * asynchronously; if the invokable reports that it could not trigger the checkpoint,
     * the checkpoint is declined as "task not ready".
     *
     * @param checkpointID the ID of the checkpoint to trigger
     * @param checkpointTimestamp the timestamp stored in the checkpoint meta data
     */
    public void triggerCheckpointBarrier(final long checkpointID, long checkpointTimestamp) {
        final AbstractInvokable currentInvokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (this.executionState != ExecutionState.RUNNING || currentInvokable == null) {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", this.taskNameWithSubtask, this.executionId);
            this.checkpointResponder.declineCheckpoint(this.jobId, this.executionId, checkpointID, new CheckpointDeclineTaskNotReadyException(this.taskNameWithSubtask));
            return;
        }

        if (!(currentInvokable instanceof StatefulTask)) {
            this.checkpointResponder.declineCheckpoint(this.jobId, this.executionId, checkpointID, new CheckpointDeclineTaskNotCheckpointingException(this.taskNameWithSubtask));
            LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).", this.taskNameWithSubtask, this.executionId);
            return;
        }

        final StatefulTask statefulTask = (StatefulTask) ((Object) currentInvokable);
        final String taskName = this.taskNameWithSubtask;

        final Runnable triggerCall = new Runnable() {
            // NOTE: the decompiler (CFR) flagged this method with
            // "Removed try catching itself - possible behaviour change".
            @Override
            public void run() {
                // The per-thread closeable registry is set up before and disposed after the trigger.
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystem.createAndSetFileSystemCloseableRegistryForThread();
                try {
                    if (!statefulTask.triggerCheckpoint(checkpointMetaData)) {
                        Task.this.checkpointResponder.declineCheckpoint(Task.this.getJobID(), Task.this.getExecutionId(), checkpointID, new CheckpointDeclineTaskNotReadyException(taskName));
                    }
                } catch (Throwable t) {
                    final boolean stillRunning = Task.this.getExecutionState() == ExecutionState.RUNNING;
                    if (stillRunning) {
                        Task.this.failExternally(new Exception("Error while triggering checkpoint " + checkpointID + " for " + Task.this.taskNameWithSubtask, t));
                    } else {
                        LOG.debug("Encountered error while triggering checkpoint {} for {} ({}) while being not in state running.", checkpointID, Task.this.taskNameWithSubtask, Task.this.executionId, t);
                    }
                } finally {
                    LOG.debug("Ensuring all FileSystem streams are closed for {}", Thread.currentThread().getName());
                    FileSystem.closeAndDisposeFileSystemCloseableRegistryForThread();
                }
            }
        };

        final String callName = String.format("Checkpoint Trigger for %s (%s).", this.taskNameWithSubtask, this.executionId);
        this.executeAsyncCallRunnable(triggerCall, callName);
    }

    /**
     * Notifies the invokable, through the asynchronous call dispatcher, that a checkpoint
     * has completed.
     *
     * <p>The notification is dropped (with a log line) when the task is not RUNNING, has no
     * invokable, or the invokable is not a {@code StatefulTask}.
     *
     * <p>If the invokable throws while handling the notification, the task is failed when it
     * is still RUNNING. Otherwise the error is logged at debug level rather than being
     * dropped silently, matching how errors from checkpoint triggering are handled.
     *
     * @param checkpointID the ID of the completed checkpoint
     */
    public void notifyCheckpointComplete(final long checkpointID) {
        final AbstractInvokable invokable = this.invokable;

        if (this.executionState != ExecutionState.RUNNING || invokable == null) {
            LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", (Object)this.taskNameWithSubtask);
            return;
        }

        if (!(invokable instanceof StatefulTask)) {
            LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.", (Object)this.taskNameWithSubtask);
            return;
        }

        final StatefulTask statefulTask = (StatefulTask)((Object)invokable);
        final String taskName = this.taskNameWithSubtask;

        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    statefulTask.notifyCheckpointComplete(checkpointID);
                }
                catch (Throwable t) {
                    if (Task.this.getExecutionState() == ExecutionState.RUNNING) {
                        // A failed confirmation fails the task only while it is still running.
                        Task.this.failExternally(new RuntimeException("Error while confirming checkpoint", t));
                    } else {
                        // The task is already leaving; keep the error visible for diagnosis.
                        LOG.debug("Encountered error while confirming checkpoint {} for {} ({}) while being not in state running.", new Object[]{checkpointID, Task.this.taskNameWithSubtask, Task.this.executionId, t});
                    }
                }
            }
        };
        this.executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName);
    }

    /**
     * Reacts to the reported state of the producer of one of this task's input partitions.
     *
     * <p>Nothing happens (apart from a debug log) unless this task is RUNNING. For a running
     * task:
     * <ul>
     *   <li>unknown input gate: the task is failed;</li>
     *   <li>producer SCHEDULED, DEPLOYING, RUNNING or FINISHED: the partition request is
     *       re-triggered on the input gate;</li>
     *   <li>producer CANCELING, CANCELED or FAILED: this task is canceled;</li>
     *   <li>any other producer state: the task is failed.</li>
     * </ul>
     *
     * @param intermediateDataSetId key used to look up the input gate
     * @param resultPartitionId identifies the partition and its producer
     * @param producerState the reported state of the producer
     * @throws IOException declared for the partition re-trigger call
     * @throws InterruptedException declared for the partition re-trigger call
     */
    @VisibleForTesting
    void onPartitionStateUpdate(IntermediateDataSetID intermediateDataSetId, ResultPartitionID resultPartitionId, ExecutionState producerState) throws IOException, InterruptedException {
        if (this.executionState != ExecutionState.RUNNING) {
            LOG.debug("Task {} ignored a partition producer state notification, because it's not running.", this.taskNameWithSubtask);
            return;
        }

        final SingleInputGate gate = this.inputGatesById.get(intermediateDataSetId);
        if (gate == null) {
            this.failExternally(new IllegalStateException("Received partition producer state for unknown input gate " + intermediateDataSetId + "."));
            return;
        }

        final boolean producerCanServe = producerState == ExecutionState.SCHEDULED
                || producerState == ExecutionState.DEPLOYING
                || producerState == ExecutionState.RUNNING
                || producerState == ExecutionState.FINISHED;
        final boolean producerIsGone = producerState == ExecutionState.CANCELING
                || producerState == ExecutionState.CANCELED
                || producerState == ExecutionState.FAILED;

        if (producerCanServe) {
            gate.retriggerPartitionRequest(resultPartitionId.getPartitionId());
        } else if (producerIsGone) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Cancelling task {} after the producer of partition {} with attempt ID {} has entered state {}.", this.taskNameWithSubtask, resultPartitionId.getPartitionId(), resultPartitionId.getProducerId(), producerState);
            }
            this.cancelExecution();
        } else {
            final String description = String.format("Producer with attempt ID %s of partition %s in unexpected state %s.", resultPartitionId.getProducerId(), resultPartitionId.getPartitionId(), producerState);
            this.failExternally(new IllegalStateException(description));
        }
    }

    /**
     * Submits a call to the task's asynchronous call dispatcher, creating the dispatcher
     * (a single-thread executor) on first use. The whole method runs while holding the
     * monitor of this task.
     *
     * <p>The call is silently dropped when the task is not RUNNING. A rejected submission is
     * ignored as well, unless the task is still RUNNING, in which case it is rethrown.
     *
     * <p>NOTE: the decompiler (CFR) flagged this method with
     * "Removed try catching itself - possible behaviour change".
     *
     * @param runnable the call to execute
     * @param callName a description of the call, used for logging only
     * @throws RuntimeException if the submission was rejected although the task is RUNNING
     */
    private void executeAsyncCallRunnable(Runnable runnable, String callName) {
        synchronized (this) {
            if (this.executionState != ExecutionState.RUNNING) {
                return;
            }

            ExecutorService dispatcher = this.asyncCallDispatcher;
            if (dispatcher == null) {
                // First asynchronous call: create the dispatcher lazily.
                dispatcher = Executors.newSingleThreadExecutor(new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + this.taskNameWithSubtask));
                this.asyncCallDispatcher = dispatcher;

                // Re-check the state after publishing the dispatcher and undo the
                // creation if the task stopped running in the meantime.
                if (this.executionState != ExecutionState.RUNNING) {
                    dispatcher.shutdown();
                    this.asyncCallDispatcher = null;
                    return;
                }
            }

            LOG.debug("Invoking async call {} on task {}", callName, this.taskNameWithSubtask);
            try {
                dispatcher.submit(runnable);
            } catch (RejectedExecutionException rejected) {
                // A rejection is only an error while the task is still running.
                if (this.executionState == ExecutionState.RUNNING) {
                    throw new RuntimeException("Async call was rejected, even though the task is running.", rejected);
                }
            }
        }
    }

    /**
     * Calls {@code cancel()} on the invokable, at most once per task.
     *
     * <p>The call is skipped when there is no invokable or when the "has been canceled" flag
     * was already set. Any {@link Throwable} thrown by {@code cancel()} is logged and not
     * propagated.
     */
    private void cancelInvokable() {
        // Read the field once so the null check and the call operate on the same reference.
        final AbstractInvokable target = this.invokable;
        if (target == null || !this.invokableHasBeenCanceled.compareAndSet(false, true)) {
            return;
        }
        try {
            target.cancel();
        }
        catch (Throwable t) {
            LOG.error("Error while canceling task {}.", (Object)this.taskNameWithSubtask, (Object)t);
        }
    }

    /**
     * Returns a description of the form {@code "<task name with subtask> (<execution id>) [<state>]"}.
     *
     * @return the formatted description of this task
     */
    @Override
    public String toString() {
        return String.format("%s (%s) [%s]", new Object[]{this.taskNameWithSubtask, this.executionId, this.executionState});
    }

    /**
     * Runnable that cancels a task's invokable and releases its network resources.
     *
     * <p>In order, {@link #run()} cancels the invokable, destroys the buffer pools of the
     * produced partitions, releases all input gates, interrupts the executing thread and
     * waits up to the cancellation interval for it to terminate. Failures in any single step
     * are logged and do not prevent the following steps.
     *
     * <p>When a positive cancellation timeout is configured, a daemon watchdog thread keeps
     * interrupting the executing thread and reports a fatal error to the task manager once
     * the timeout has elapsed while the thread is still alive.
     */
    private static class TaskCanceler
    implements Runnable {
        private final Logger logger;
        private final AbstractInvokable invokable;
        private final Thread executer;
        private final String taskName;
        private final ResultPartition[] producedPartitions;
        private final SingleInputGate[] inputGates;
        /** Milliseconds between two interrupts of the executing thread. */
        private final long interruptInterval;
        /** Milliseconds after which the watchdog reports a fatal error; the watchdog is disabled if not positive. */
        private final long interruptTimeout;
        private final TaskManagerConnection taskManager;
        /** Watchdog thread, or null when no positive cancellation timeout was given. */
        @Nullable
        private final Thread watchDogThread;

        /**
         * Creates the canceler and, for a positive {@code cancellationTimeout}, prepares (but
         * does not start) the daemon watchdog thread in the executing thread's thread group.
         *
         * @param logger logger used for all messages of the canceler and its watchdog
         * @param invokable the invokable whose {@code cancel()} is called
         * @param executer the thread executing the task
         * @param taskName task name used in log messages and thread names
         * @param cancellationInterval milliseconds between interrupts of the executing thread
         * @param cancellationTimeout milliseconds until the watchdog reports a fatal error
         * @param taskManager connection used to report the fatal error
         * @param producedPartitions partitions whose buffer pools are destroyed
         * @param inputGates input gates whose resources are released
         */
        public TaskCanceler(Logger logger, AbstractInvokable invokable, Thread executer, String taskName, long cancellationInterval, long cancellationTimeout, TaskManagerConnection taskManager, ResultPartition[] producedPartitions, SingleInputGate[] inputGates) {
            this.logger = logger;
            this.invokable = invokable;
            this.executer = executer;
            this.taskName = taskName;
            this.interruptInterval = cancellationInterval;
            this.interruptTimeout = cancellationTimeout;
            this.taskManager = taskManager;
            this.producedPartitions = producedPartitions;
            this.inputGates = inputGates;
            if (cancellationTimeout > 0L) {
                this.watchDogThread = new Thread(executer.getThreadGroup(), new TaskCancelerWatchDog(), "WatchDog for " + taskName + " cancellation");
                this.watchDogThread.setDaemon(true);
            } else {
                this.watchDogThread = null;
            }
        }

        /**
         * Performs the cancellation sequence described on the class. Any throwable that
         * escapes the sequence is logged and not propagated.
         */
        @Override
        public void run() {
            try {
                if (this.watchDogThread != null) {
                    this.watchDogThread.start();
                }
                try {
                    this.invokable.cancel();
                }
                catch (Throwable t) {
                    this.logger.error("Error while canceling the task {}.", (Object)this.taskName, (Object)t);
                }
                for (ResultPartition partition : this.producedPartitions) {
                    try {
                        partition.destroyBufferPool();
                    }
                    catch (Throwable t) {
                        this.logger.error("Failed to release result partition buffer pool for task {}.", (Object)this.taskName, (Object)t);
                    }
                }
                for (SingleInputGate inputGate : this.inputGates) {
                    try {
                        inputGate.releaseAllResources();
                    }
                    catch (Throwable t) {
                        this.logger.error("Failed to release input gate for task {}.", (Object)this.taskName, (Object)t);
                    }
                }
                this.executer.interrupt();
                try {
                    this.executer.join(this.interruptInterval);
                }
                catch (InterruptedException ignored) {
                    // Deliberately ignored: the watchdog still has to be stopped below.
                }
                if (this.watchDogThread != null) {
                    // Wake the watchdog so it re-checks the executing thread, then wait for it to exit.
                    this.watchDogThread.interrupt();
                    this.watchDogThread.join();
                }
            }
            catch (Throwable t) {
                this.logger.error("Error in the task canceler for task {}.", (Object)this.taskName, (Object)t);
            }
        }

        /**
         * Watchdog that keeps interrupting the executing thread while it is alive and reports
         * a fatal error to the task manager once the cancellation timeout has elapsed.
         */
        private class TaskCancelerWatchDog
        implements Runnable {
            private TaskCancelerWatchDog() {
            }

            @Override
            public void run() {
                long intervalNanos = TimeUnit.NANOSECONDS.convert(TaskCanceler.this.interruptInterval, TimeUnit.MILLISECONDS);
                long timeoutNanos = TimeUnit.NANOSECONDS.convert(TaskCanceler.this.interruptTimeout, TimeUnit.MILLISECONDS);
                long deadline = System.nanoTime() + timeoutNanos;
                try {
                    // Give the initial interrupt issued by the canceler one interval to take effect.
                    Thread.sleep(TaskCanceler.this.interruptInterval);
                }
                catch (InterruptedException ignored) {
                    // Deliberately ignored: the loop below re-checks whether the thread is still alive.
                }
                while (TaskCanceler.this.executer.isAlive()) {
                    long now = System.nanoTime();

                    // Capture where the executing thread is currently stuck.
                    StringBuilder bld = new StringBuilder();
                    for (StackTraceElement e : TaskCanceler.this.executer.getStackTrace()) {
                        bld.append(e).append('\n');
                    }

                    if (now >= deadline) {
                        // The deadline is derived from the cancellation timeout, so that is the
                        // period the task failed to react in (not the interrupt interval).
                        long duration = TimeUnit.SECONDS.convert(TaskCanceler.this.interruptTimeout, TimeUnit.MILLISECONDS);
                        String msg = String.format("Task '%s' did not react to cancelling signal in the last %d seconds, but is stuck in method:\n %s", TaskCanceler.this.taskName, duration, bld.toString());
                        TaskCanceler.this.logger.info("Notifying TaskManager about fatal error. {}.", (Object)msg);
                        TaskCanceler.this.taskManager.notifyFatalError(msg, null);
                        return;
                    }

                    TaskCanceler.this.logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}", (Object)TaskCanceler.this.taskName, (Object)bld.toString());
                    TaskCanceler.this.executer.interrupt();
                    try {
                        // Wait one interval, but never past the deadline.
                        long timeLeftNanos = Math.min(intervalNanos, deadline - now);
                        long timeLeftMillis = TimeUnit.MILLISECONDS.convert(timeLeftNanos, TimeUnit.NANOSECONDS);
                        if (timeLeftMillis <= 0L) continue;
                        TaskCanceler.this.executer.join(timeLeftMillis);
                    }
                    catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop condition decides whether to continue.
                    }
                }
            }
        }
    }
}

