/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskmanager.ActorGatewayCheckpointResponder;
import org.apache.flink.runtime.taskmanager.ActorGatewayTaskExecutionStateListener;
import org.apache.flink.runtime.taskmanager.ActorGatewayTaskManagerActions;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.ForwardingActorGateway;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
import org.apache.flink.runtime.taskmanager.TaskInputSplitProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.WrappingRuntimeException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.concurrent.duration.FiniteDuration;

public class TaskTest
extends TestLogger {
    private static OneShotLatch awaitLatch;
    private static OneShotLatch triggerLatch;
    private static OneShotLatch cancelLatch;
    private ActorGateway taskManagerGateway;
    private ActorGateway jobManagerGateway;
    private ActorGateway listenerGateway;
    private ActorGatewayTaskExecutionStateListener listener;
    private ActorGatewayTaskManagerActions taskManagerConnection;
    private BlockingQueue<Object> taskManagerMessages;
    private BlockingQueue<Object> jobManagerMessages;
    private BlockingQueue<Object> listenerMessages;

    @Before
    public void createQueuesAndActors() {
        this.taskManagerMessages = new LinkedBlockingQueue<Object>();
        this.jobManagerMessages = new LinkedBlockingQueue<Object>();
        this.listenerMessages = new LinkedBlockingQueue<Object>();
        this.taskManagerGateway = new ForwardingActorGateway(this.taskManagerMessages);
        this.jobManagerGateway = new ForwardingActorGateway(this.jobManagerMessages);
        this.listenerGateway = new ForwardingActorGateway(this.listenerMessages);
        this.listener = new ActorGatewayTaskExecutionStateListener(this.listenerGateway);
        this.taskManagerConnection = new ActorGatewayTaskManagerActions(this.taskManagerGateway);
        awaitLatch = new OneShotLatch();
        triggerLatch = new OneShotLatch();
        cancelLatch = new OneShotLatch();
    }

    @After
    public void clearActorsAndMessages() {
        this.jobManagerMessages = null;
        this.taskManagerMessages = null;
        this.listenerMessages = null;
        this.taskManagerGateway = null;
        this.jobManagerGateway = null;
        this.listenerGateway = null;
    }

    @Test
    public void testRegularExecution() {
        try {
            Task task = this.createTask(TestInvokableCorrect.class);
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)task.getExecutionState());
            Assert.assertFalse((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)task.getExecutionState());
            Assert.assertFalse((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FINISHED, task, false);
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelRightAway() {
        try {
            Task task = this.createTask(TestInvokableCorrect.class);
            task.cancelExecution();
            Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)task.getExecutionState());
            task.run();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
            this.validateUnregisterTask(task.getExecutionId());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testFailExternallyRightAway() {
        try {
            Task task = this.createTask(TestInvokableCorrect.class);
            task.failExternally((Throwable)new Exception("fail externally"));
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            this.validateUnregisterTask(task.getExecutionId());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testLibraryCacheRegistrationFailed() {
        try {
            Task task = this.createTask(TestInvokableCorrect.class, (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class));
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)task.getExecutionState());
            Assert.assertFalse((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertNotNull((Object)task.getFailureCause());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("classloader"));
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
            this.validateUnregisterTask(task.getExecutionId());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsInNetworkRegistration() {
        try {
            LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
            Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)((Object)((Object)this)).getClass().getClassLoader());
            ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
            ResultPartitionConsumableNotifier consumableNotifier = (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class);
            PartitionProducerStateChecker partitionProducerStateChecker = (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class);
            Executor executor = (Executor)Mockito.mock(Executor.class);
            NetworkEnvironment network = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
            Mockito.when((Object)network.getResultPartitionManager()).thenReturn((Object)partitionManager);
            Mockito.when((Object)network.getDefaultIOMode()).thenReturn((Object)IOManager.IOMode.SYNC);
            ((NetworkEnvironment)Mockito.doThrow((Throwable)new RuntimeException("buffers")).when((Object)network)).registerTask((Task)Matchers.any(Task.class));
            Task task = this.createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionProducerStateChecker, executor);
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("buffers"));
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testInvokableInstantiationFailed() {
        try {
            Task task = this.createTask(InvokableNonInstantiable.class);
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("instantiate"));
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsInInvoke() {
        try {
            Task task = this.createTask(InvokableWithExceptionInInvoke.class);
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testFailWithWrappedException() {
        try {
            Task task = this.createTask(FailingInvokableWithChainedException.class);
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Throwable cause = task.getFailureCause();
            Assert.assertTrue((boolean)(cause instanceof IOException));
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelDuringInvoke() {
        try {
            Task task = this.createTask(InvokableBlockingInInvoke.class);
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.startTaskThread();
            awaitLatch.await();
            task.cancelExecution();
            Assert.assertTrue((task.getExecutionState() == ExecutionState.CANCELING || task.getExecutionState() == ExecutionState.CANCELED ? 1 : 0) != 0);
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateCancelingAndCanceledListenerMessage(task);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testFailExternallyDuringInvoke() {
        try {
            Task task = this.createTask(InvokableBlockingInInvoke.class);
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.startTaskThread();
            awaitLatch.await();
            task.failExternally((Throwable)new Exception("test"));
            Assert.assertTrue((task.getExecutionState() == ExecutionState.FAILED ? 1 : 0) != 0);
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCanceledAfterExecutionFailedInInvoke() {
        try {
            Task task = this.createTask(InvokableWithExceptionInInvoke.class);
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.run();
            task.cancelExecution();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testExecutionFailesAfterCanceling() {
        try {
            Task task = this.createTask(InvokableWithExceptionOnTrigger.class);
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.startTaskThread();
            awaitLatch.await();
            task.cancelExecution();
            Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)task.getExecutionState());
            triggerLatch.trigger();
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateCancelingAndCanceledListenerMessage(task);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsAfterTaskMarkedFailed() {
        try {
            Task task = this.createTask(InvokableWithExceptionOnTrigger.class);
            task.registerExecutionListener((TaskExecutionStateListener)this.listener);
            task.startTaskThread();
            awaitLatch.await();
            task.failExternally((Throwable)new Exception("external"));
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            triggerLatch.trigger();
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("external"));
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelTaskException() throws Exception {
        Task task = this.createTask(InvokableWithCancelTaskExceptionInInvoke.class);
        triggerLatch.trigger();
        task.run();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
        Task task = this.createTask(InvokableWithCancelTaskExceptionInInvoke.class);
        task.startTaskThread();
        awaitLatch.await();
        task.failExternally((Throwable)new Exception("external"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        triggerLatch.trigger();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("external"));
    }

    @Test
    public void testOnPartitionStateUpdate() throws Exception {
        IntermediateDataSetID resultId = new IntermediateDataSetID();
        ResultPartitionID partitionId = new ResultPartitionID();
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        Mockito.when((Object)inputGate.getConsumedResultId()).thenReturn((Object)resultId);
        Task task = this.createTask(InvokableBlockingInInvoke.class);
        this.setInputGate(task, inputGate);
        HashMap<ExecutionState, ExecutionState> expected = new HashMap<ExecutionState, ExecutionState>(ExecutionState.values().length);
        for (ExecutionState state : ExecutionState.values()) {
            expected.put(state, ExecutionState.FAILED);
        }
        expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
        expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
        expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
        expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
        expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
        expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
        expected.put(ExecutionState.FAILED, ExecutionState.CANCELING);
        for (ExecutionState state : ExecutionState.values()) {
            this.setState(task, ExecutionState.RUNNING);
            task.onPartitionStateUpdate(resultId, partitionId, state);
            ExecutionState newTaskState = task.getExecutionState();
            Assert.assertEquals(expected.get(state), (Object)newTaskState);
        }
        ((SingleInputGate)Mockito.verify((Object)inputGate, (VerificationMode)Mockito.times((int)4))).retriggerPartitionRequest((IntermediateResultPartitionID)Matchers.eq((Object)partitionId.getPartitionId()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerPartitionStateUpdate() throws Exception {
        FlinkCompletableFuture promise;
        IntermediateDataSetID resultId = new IntermediateDataSetID();
        ResultPartitionID partitionId = new ResultPartitionID();
        LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)((Object)((Object)this)).getClass().getClassLoader());
        PartitionProducerStateChecker partitionChecker = (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class);
        ResultPartitionConsumableNotifier consumableNotifier = (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class);
        NetworkEnvironment network = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)network.getResultPartitionManager()).thenReturn(Mockito.mock(ResultPartitionManager.class));
        Mockito.when((Object)network.getDefaultIOMode()).thenReturn((Object)IOManager.IOMode.SYNC);
        Mockito.when((Object)network.createKvStateTaskRegistry((JobID)Matchers.any(JobID.class), (JobVertexID)Matchers.any(JobVertexID.class))).thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        this.createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
        this.createQueuesAndActors();
        Task task = this.createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
        FlinkCompletableFuture promise2 = new FlinkCompletableFuture();
        Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)Matchers.eq((Object)task.getJobID()), (IntermediateDataSetID)Matchers.eq((Object)resultId), (ResultPartitionID)Matchers.eq((Object)partitionId))).thenReturn((Object)promise2);
        task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
        promise2.completeExceptionally((Throwable)new PartitionProducerDisposedException(partitionId));
        Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)task.getExecutionState());
        this.createQueuesAndActors();
        task = this.createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
        promise2 = new FlinkCompletableFuture();
        Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)Matchers.eq((Object)task.getJobID()), (IntermediateDataSetID)Matchers.eq((Object)resultId), (ResultPartitionID)Matchers.eq((Object)partitionId))).thenReturn((Object)promise2);
        task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
        promise2.completeExceptionally((Throwable)new RuntimeException("Any other exception"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        this.createQueuesAndActors();
        task = this.createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        Mockito.when((Object)inputGate.getConsumedResultId()).thenReturn((Object)resultId);
        try {
            task.startTaskThread();
            awaitLatch.await();
            this.setInputGate(task, inputGate);
            promise = new FlinkCompletableFuture();
            Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)Matchers.eq((Object)task.getJobID()), (IntermediateDataSetID)Matchers.eq((Object)resultId), (ResultPartitionID)Matchers.eq((Object)partitionId))).thenReturn((Object)promise);
            task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
            promise.completeExceptionally((Throwable)new TimeoutException());
            Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)task.getExecutionState());
            ((SingleInputGate)Mockito.verify((Object)inputGate, (VerificationMode)Mockito.times((int)1))).retriggerPartitionRequest((IntermediateResultPartitionID)Matchers.eq((Object)partitionId.getPartitionId()));
        }
        finally {
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
        this.createQueuesAndActors();
        task = this.createTask(InvokableBlockingInInvoke.class, libCache, network, consumableNotifier, partitionChecker, Executors.directExecutor());
        inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        Mockito.when((Object)inputGate.getConsumedResultId()).thenReturn((Object)resultId);
        try {
            task.startTaskThread();
            awaitLatch.await();
            this.setInputGate(task, inputGate);
            promise = new FlinkCompletableFuture();
            Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)Matchers.eq((Object)task.getJobID()), (IntermediateDataSetID)Matchers.eq((Object)resultId), (ResultPartitionID)Matchers.eq((Object)partitionId))).thenReturn((Object)promise);
            task.triggerPartitionProducerStateCheck(task.getJobID(), resultId, partitionId);
            promise.complete((Object)ExecutionState.RUNNING);
            Assert.assertEquals((Object)ExecutionState.RUNNING, (Object)task.getExecutionState());
            ((SingleInputGate)Mockito.verify((Object)inputGate, (VerificationMode)Mockito.times((int)1))).retriggerPartitionRequest((IntermediateResultPartitionID)Matchers.eq((Object)partitionId.getPartitionId()));
        }
        finally {
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    @Test
    public void testWatchDogInterruptsTask() throws Exception {
        Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5L);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 60000L);
        Task task = this.createTask(InvokableBlockingInCancel.class, config);
        task.startTaskThread();
        awaitLatch.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        for (Object e : this.taskManagerMessages) {
            Assert.assertFalse((String)"Unexpected FatalError message", (boolean)(e instanceof TaskManagerMessages.FatalError));
        }
    }

    @Test
    public void testInterruptableSharedLockInInvokeAndCancel() throws Exception {
        Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5L);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50L);
        Task task = this.createTask(InvokableInterruptableSharedLockInInvokeAndCancel.class, config);
        task.startTaskThread();
        awaitLatch.await();
        task.cancelExecution();
        task.getExecutingThread().join();
        for (Object e : this.taskManagerMessages) {
            Assert.assertFalse((String)"Unexpected FatalError message", (boolean)(e instanceof TaskManagerMessages.FatalError));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFatalErrorAfterUninterruptibleInvoke() throws Exception {
        Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5L);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50L);
        Task task = this.createTask(InvokableUninterruptibleBlockingInvoke.class, config);
        try {
            task.startTaskThread();
            awaitLatch.await();
            task.cancelExecution();
            for (int i = 0; i < 10; ++i) {
                Object msg = this.taskManagerMessages.poll(1L, TimeUnit.SECONDS);
                if (!(msg instanceof TaskManagerMessages.FatalError)) continue;
                return;
            }
            Assert.fail((String)"Did not receive expected task manager message");
        }
        finally {
            cancelLatch.trigger();
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    @Test
    public void testTaskConfig() throws Exception {
        long interval = 28218123L;
        long timeout = interval + 19292L;
        Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, interval);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, timeout);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setTaskCancellationInterval(interval + 1337L);
        executionConfig.setTaskCancellationTimeout(timeout - 1337L);
        Task task = this.createTask(InvokableBlockingInInvoke.class, config, executionConfig);
        Assert.assertEquals((long)interval, (long)task.getTaskCancellationInterval());
        Assert.assertEquals((long)timeout, (long)task.getTaskCancellationTimeout());
        task.startTaskThread();
        awaitLatch.await();
        Assert.assertEquals((long)executionConfig.getTaskCancellationInterval(), (long)task.getTaskCancellationInterval());
        Assert.assertEquals((long)executionConfig.getTaskCancellationTimeout(), (long)task.getTaskCancellationTimeout());
        task.getExecutingThread().interrupt();
        task.getExecutingThread().join();
    }

    private void setInputGate(Task task, SingleInputGate inputGate) {
        try {
            Field f = Task.class.getDeclaredField("inputGates");
            f.setAccessible(true);
            f.set(task, new SingleInputGate[]{inputGate});
            HashMap<IntermediateDataSetID, SingleInputGate> byId = new HashMap<IntermediateDataSetID, SingleInputGate>(1);
            byId.put(inputGate.getConsumedResultId(), inputGate);
            f = Task.class.getDeclaredField("inputGatesById");
            f.setAccessible(true);
            f.set(task, byId);
        }
        catch (Exception e) {
            throw new RuntimeException("Modifying the task state failed", e);
        }
    }

    private void setState(Task task, ExecutionState state) {
        try {
            Field f = Task.class.getDeclaredField("executionState");
            f.setAccessible(true);
            f.set(task, state);
        }
        catch (Exception e) {
            throw new RuntimeException("Modifying the task state failed", e);
        }
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable) throws IOException {
        return this.createTask(invokable, new Configuration(), new ExecutionConfig());
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config) throws IOException {
        LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)((Object)((Object)this)).getClass().getClassLoader());
        return this.createTask(invokable, libCache, config, new ExecutionConfig());
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable, Configuration config, ExecutionConfig execConfig) throws IOException {
        LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)((Object)((Object)this)).getClass().getClassLoader());
        return this.createTask(invokable, libCache, config, execConfig);
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable, LibraryCacheManager libCache) throws IOException {
        return this.createTask(invokable, libCache, new Configuration(), new ExecutionConfig());
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable, LibraryCacheManager libCache, Configuration config, ExecutionConfig execConfig) throws IOException {
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        ResultPartitionConsumableNotifier consumableNotifier = (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class);
        PartitionProducerStateChecker partitionProducerStateChecker = (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class);
        Executor executor = (Executor)Mockito.mock(Executor.class);
        NetworkEnvironment network = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)network.getResultPartitionManager()).thenReturn((Object)partitionManager);
        Mockito.when((Object)network.getDefaultIOMode()).thenReturn((Object)IOManager.IOMode.SYNC);
        Mockito.when((Object)network.createKvStateTaskRegistry((JobID)Matchers.any(JobID.class), (JobVertexID)Matchers.any(JobVertexID.class))).thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        return this.createTask(invokable, libCache, network, consumableNotifier, partitionProducerStateChecker, executor, config, execConfig);
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable, LibraryCacheManager libCache, NetworkEnvironment networkEnvironment, ResultPartitionConsumableNotifier consumableNotifier, PartitionProducerStateChecker partitionProducerStateChecker, Executor executor) throws IOException {
        return this.createTask(invokable, libCache, networkEnvironment, consumableNotifier, partitionProducerStateChecker, executor, new Configuration(), new ExecutionConfig());
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable, LibraryCacheManager libCache, NetworkEnvironment networkEnvironment, ResultPartitionConsumableNotifier consumableNotifier, PartitionProducerStateChecker partitionProducerStateChecker, Executor executor, Configuration taskManagerConfig, ExecutionConfig execConfig) throws IOException {
        JobID jobId = new JobID();
        JobVertexID jobVertexId = new JobVertexID();
        ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();
        TaskInputSplitProvider inputSplitProvider = new TaskInputSplitProvider(this.jobManagerGateway, jobId, jobVertexId, executionAttemptId, new FiniteDuration(60L, TimeUnit.SECONDS));
        ActorGatewayCheckpointResponder checkpointResponder = new ActorGatewayCheckpointResponder(this.jobManagerGateway);
        SerializedValue serializedExecutionConfig = new SerializedValue((Object)execConfig);
        JobInformation jobInformation = new JobInformation(jobId, "Test Job", serializedExecutionConfig, new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(jobVertexId, "Test Task", 1, 1, invokable.getName(), new Configuration());
        TaskMetricGroup taskMetricGroup = (TaskMetricGroup)Mockito.mock(TaskMetricGroup.class);
        Mockito.when((Object)taskMetricGroup.getIOMetricGroup()).thenReturn(Mockito.mock(TaskIOMetricGroup.class));
        return new Task(jobInformation, taskInformation, executionAttemptId, new AllocationID(), 0, 0, Collections.emptyList(), Collections.emptyList(), 0, null, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), networkEnvironment, (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), (TaskManagerActions)this.taskManagerConnection, (InputSplitProvider)inputSplitProvider, (CheckpointResponder)checkpointResponder, libCache, (FileCache)Mockito.mock(FileCache.class), (TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(taskManagerConfig), taskMetricGroup, consumableNotifier, partitionProducerStateChecker, executor);
    }

    private void validateUnregisterTask(ExecutionAttemptID id) {
        try {
            Object rawMessage = this.taskManagerMessages.take();
            Assert.assertNotNull((String)"There is no additional TaskManager message", (Object)rawMessage);
            if (!(rawMessage instanceof TaskMessages.TaskInFinalState)) {
                Assert.fail((String)("TaskManager message is not 'UnregisterTask', but " + rawMessage.getClass()));
            }
            TaskMessages.TaskInFinalState message = (TaskMessages.TaskInFinalState)rawMessage;
            Assert.assertEquals((Object)id, (Object)message.executionID());
        }
        catch (InterruptedException e) {
            Assert.fail((String)"interrupted");
        }
    }

    private void validateTaskManagerStateChange(ExecutionState state, Task task, boolean hasError) {
        try {
            Object rawMessage = this.taskManagerMessages.take();
            Assert.assertNotNull((String)"There is no additional TaskManager message", (Object)rawMessage);
            if (!(rawMessage instanceof TaskMessages.UpdateTaskExecutionState)) {
                Assert.fail((String)("TaskManager message is not 'UpdateTaskExecutionState', but " + rawMessage.getClass()));
            }
            TaskMessages.UpdateTaskExecutionState message = (TaskMessages.UpdateTaskExecutionState)rawMessage;
            TaskExecutionState taskState = message.taskExecutionState();
            Assert.assertEquals((Object)task.getJobID(), (Object)taskState.getJobID());
            Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState.getID());
            Assert.assertEquals((Object)state, (Object)taskState.getExecutionState());
            if (hasError) {
                Assert.assertNotNull((Object)taskState.getError(((Object)((Object)this)).getClass().getClassLoader()));
            } else {
                Assert.assertNull((Object)taskState.getError(((Object)((Object)this)).getClass().getClassLoader()));
            }
        }
        catch (InterruptedException e) {
            Assert.fail((String)"interrupted");
        }
    }

    private void validateListenerMessage(ExecutionState state, Task task, boolean hasError) {
        try {
            TaskMessages.UpdateTaskExecutionState message = (TaskMessages.UpdateTaskExecutionState)this.listenerMessages.take();
            Assert.assertNotNull((String)"There is no additional listener message", (Object)message);
            TaskExecutionState taskState = message.taskExecutionState();
            Assert.assertEquals((Object)task.getJobID(), (Object)taskState.getJobID());
            Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState.getID());
            Assert.assertEquals((Object)state, (Object)taskState.getExecutionState());
            if (hasError) {
                Assert.assertNotNull((Object)taskState.getError(((Object)((Object)this)).getClass().getClassLoader()));
            } else {
                Assert.assertNull((Object)taskState.getError(((Object)((Object)this)).getClass().getClassLoader()));
            }
        }
        catch (InterruptedException e) {
            Assert.fail((String)"interrupted");
        }
    }

    private void validateCancelingAndCanceledListenerMessage(Task task) {
        try {
            TaskMessages.UpdateTaskExecutionState message1 = (TaskMessages.UpdateTaskExecutionState)this.listenerMessages.take();
            TaskMessages.UpdateTaskExecutionState message2 = (TaskMessages.UpdateTaskExecutionState)this.listenerMessages.take();
            Assert.assertNotNull((String)"There is no additional listener message", (Object)message1);
            Assert.assertNotNull((String)"There is no additional listener message", (Object)message2);
            TaskExecutionState taskState1 = message1.taskExecutionState();
            TaskExecutionState taskState2 = message2.taskExecutionState();
            Assert.assertEquals((Object)task.getJobID(), (Object)taskState1.getJobID());
            Assert.assertEquals((Object)task.getJobID(), (Object)taskState2.getJobID());
            Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState1.getID());
            Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState2.getID());
            ExecutionState state1 = taskState1.getExecutionState();
            ExecutionState state2 = taskState2.getExecutionState();
            Assert.assertTrue((state1 == ExecutionState.CANCELING && state2 == ExecutionState.CANCELED || state2 == ExecutionState.CANCELING && state1 == ExecutionState.CANCELED ? 1 : 0) != 0);
        }
        catch (InterruptedException e) {
            Assert.fail((String)"interrupted");
        }
    }

    private static class TestWrappedException
    extends WrappingRuntimeException {
        private static final long serialVersionUID = 1L;

        public TestWrappedException(@Nonnull Throwable cause) {
            super(cause);
        }
    }

    public static final class FailingInvokableWithChainedException
    extends AbstractInvokable {
        public void invoke() throws Exception {
            throw new TestWrappedException(new IOException("test"));
        }

        public void cancel() {
        }
    }

    public static final class InvokableUninterruptibleBlockingInvoke
    extends AbstractInvokable {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            while (!cancelLatch.isTriggered()) {
                try {
                    InvokableUninterruptibleBlockingInvoke invokableUninterruptibleBlockingInvoke = this;
                    synchronized (invokableUninterruptibleBlockingInvoke) {
                        awaitLatch.trigger();
                        ((Object)((Object)this)).wait();
                    }
                }
                catch (InterruptedException interruptedException) {
                }
            }
        }

        public void cancel() throws Exception {
        }
    }

    public static final class InvokableBlockingInCancel
    extends AbstractInvokable {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            awaitLatch.trigger();
            try {
                cancelLatch.await();
                InvokableBlockingInCancel invokableBlockingInCancel = this;
                synchronized (invokableBlockingInCancel) {
                    ((Object)((Object)this)).wait();
                }
            }
            catch (InterruptedException ignored) {
                InvokableBlockingInCancel invokableBlockingInCancel = this;
                synchronized (invokableBlockingInCancel) {
                    ((Object)((Object)this)).notifyAll();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() throws Exception {
            InvokableBlockingInCancel invokableBlockingInCancel = this;
            synchronized (invokableBlockingInCancel) {
                cancelLatch.trigger();
                ((Object)((Object)this)).wait();
            }
        }
    }

    public static final class InvokableInterruptableSharedLockInInvokeAndCancel
    extends AbstractInvokable {
        private final Object lock = new Object();

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            Object object = this.lock;
            synchronized (object) {
                awaitLatch.trigger();
                ((Object)((Object)this)).wait();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() throws Exception {
            Object object = this.lock;
            synchronized (object) {
                cancelLatch.trigger();
            }
        }
    }

    public static final class InvokableWithCancelTaskExceptionInInvoke
    extends AbstractInvokable {
        public void invoke() throws Exception {
            awaitLatch.trigger();
            try {
                triggerLatch.await();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            throw new CancelTaskException();
        }
    }

    public static final class InvokableBlockingInInvoke
    extends AbstractInvokable {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            awaitLatch.trigger();
            InvokableBlockingInInvoke invokableBlockingInInvoke = this;
            synchronized (invokableBlockingInInvoke) {
                ((Object)((Object)this)).wait();
            }
        }
    }

    public static abstract class InvokableNonInstantiable
    extends AbstractInvokable {
    }

    public static final class InvokableWithExceptionOnTrigger
    extends AbstractInvokable {
        public void invoke() {
            awaitLatch.trigger();
            while (true) {
                try {
                    triggerLatch.await();
                }
                catch (InterruptedException interruptedException) {
                    continue;
                }
                break;
            }
            throw new RuntimeException("test");
        }
    }

    public static final class InvokableWithExceptionInInvoke
    extends AbstractInvokable {
        public void invoke() throws Exception {
            throw new Exception("test");
        }
    }

    public static final class TestInvokableCorrect
    extends AbstractInvokable {
        public void invoke() {
        }

        public void cancel() throws Exception {
            Assert.fail((String)"This should not be called");
        }
    }
}

