package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.MockNetworkEnvironment;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest.class */
public class TaskTest {

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$TestInvokableCorrect.class */
    public static final class TestInvokableCorrect extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$TestInvokableWithException.class */
    public static final class TestInvokableWithException extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() throws Exception {
            throw new Exception("test exception");
        }
    }

    @Test
    public void testTaskStates() {
        try {
            JobID jobID = new JobID();
            JobVertexID jobVertexID = new JobVertexID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
            RuntimeEnvironment runtimeEnvironment = (RuntimeEnvironment) Mockito.mock(RuntimeEnvironment.class);
            Task task = new Task(jobID, jobVertexID, 2, 7, executionAttemptID, "TestTask", taskManager);
            task.setEnvironment(runtimeEnvironment);
            Assert.assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
            task.cancelExecution();
            Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
            Assert.assertFalse(task.startExecution());
            Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
            Assert.assertFalse(task.markAsFinished());
            Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
            task.markFailed(new Exception("test"));
            Assert.assertTrue(ExecutionState.CANCELED == task.getExecutionState());
            ((TaskManager) Mockito.verify(taskManager, Mockito.times(1))).notifyExecutionStateChange(jobID, executionAttemptID, ExecutionState.CANCELED, (Throwable) null);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTaskStartFinish() {
        try {
            JobID jobID = new JobID();
            JobVertexID jobVertexID = new JobVertexID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
            final Task task = new Task(jobID, jobVertexID, 2, 7, executionAttemptID, "TestTask", taskManager);
            final AtomicReference atomicReference = new AtomicReference();
            Thread thread = new Thread() { // from class: org.apache.flink.runtime.taskmanager.TaskTest.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        Assert.assertTrue(task.markAsFinished());
                    } catch (Throwable th) {
                        atomicReference.set(th);
                    }
                }
            };
            RuntimeEnvironment runtimeEnvironment = (RuntimeEnvironment) Mockito.mock(RuntimeEnvironment.class);
            Mockito.when(runtimeEnvironment.getExecutingThread()).thenReturn(thread);
            Assert.assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
            task.setEnvironment(runtimeEnvironment);
            task.startExecution();
            thread.join();
            if (atomicReference.get() != null) {
                ExceptionUtils.rethrow((Throwable) atomicReference.get());
            }
            Assert.assertEquals(ExecutionState.FINISHED, task.getExecutionState());
            ((TaskManager) Mockito.verify(taskManager)).notifyExecutionStateChange(jobID, executionAttemptID, ExecutionState.FINISHED, (Throwable) null);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTaskFailesInRunning() {
        try {
            JobID jobID = new JobID();
            JobVertexID jobVertexID = new JobVertexID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
            final Task task = new Task(jobID, jobVertexID, 2, 7, executionAttemptID, "TestTask", taskManager);
            final AtomicReference atomicReference = new AtomicReference();
            Thread thread = new Thread() { // from class: org.apache.flink.runtime.taskmanager.TaskTest.2
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        task.markFailed(new Exception("test exception message"));
                    } catch (Throwable th) {
                        atomicReference.set(th);
                    }
                }
            };
            RuntimeEnvironment runtimeEnvironment = (RuntimeEnvironment) Mockito.mock(RuntimeEnvironment.class);
            Mockito.when(runtimeEnvironment.getExecutingThread()).thenReturn(thread);
            Assert.assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
            task.setEnvironment(runtimeEnvironment);
            task.startExecution();
            thread.join();
            if (atomicReference.get() != null) {
                ExceptionUtils.rethrow((Throwable) atomicReference.get());
            }
            Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
            ((TaskManager) Mockito.verify(taskManager)).notifyExecutionStateChange((JobID) Matchers.eq(jobID), (ExecutionAttemptID) Matchers.eq(executionAttemptID), (ExecutionState) Matchers.eq(ExecutionState.FAILED), (Throwable) Matchers.any(Throwable.class));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTaskCanceledInRunning() {
        try {
            JobID jobID = new JobID();
            JobVertexID jobVertexID = new JobVertexID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
            final Task task = new Task(jobID, jobVertexID, 2, 7, executionAttemptID, "TestTask", taskManager);
            final AtomicReference atomicReference = new AtomicReference();
            final OneShotLatch oneShotLatch = new OneShotLatch();
            final OneShotLatch oneShotLatch2 = new OneShotLatch();
            Thread thread = new Thread() { // from class: org.apache.flink.runtime.taskmanager.TaskTest.3
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        oneShotLatch.trigger();
                        oneShotLatch2.await();
                        Assert.assertFalse(task.markAsFinished());
                        task.cancelingDone();
                    } catch (Throwable th) {
                        atomicReference.set(th);
                    }
                }
            };
            RuntimeEnvironment runtimeEnvironment = (RuntimeEnvironment) Mockito.mock(RuntimeEnvironment.class);
            Mockito.when(runtimeEnvironment.getExecutingThread()).thenReturn(thread);
            Assert.assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
            task.setEnvironment(runtimeEnvironment);
            task.startExecution();
            oneShotLatch.await();
            task.cancelExecution();
            oneShotLatch2.trigger();
            thread.join();
            if (atomicReference.get() != null) {
                ExceptionUtils.rethrow((Throwable) atomicReference.get());
            }
            Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
            ((TaskManager) Mockito.verify(taskManager)).notifyExecutionStateChange(jobID, executionAttemptID, ExecutionState.CANCELED, (Throwable) null);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTaskWithEnvironment() {
        try {
            JobID jobID = new JobID();
            JobVertexID jobVertexID = new JobVertexID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
            TaskDeploymentDescriptor taskDeploymentDescriptor = new TaskDeploymentDescriptor(jobID, jobVertexID, executionAttemptID, "TestTask", 2, 7, new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);
            Task task = new Task(jobID, jobVertexID, 2, 7, executionAttemptID, "TestTask", taskManager);
            task.setEnvironment(new RuntimeEnvironment((ActorRef) Mockito.mock(ActorRef.class), task, taskDeploymentDescriptor, getClass().getClassLoader(), (MemoryManager) Mockito.mock(MemoryManager.class), (IOManager) Mockito.mock(IOManager.class), (InputSplitProvider) Mockito.mock(InputSplitProvider.class), new BroadcastVariableManager(), MockNetworkEnvironment.getMock()));
            Assert.assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
            task.startExecution();
            task.getEnvironment().getExecutingThread().join();
            Assert.assertEquals(ExecutionState.FINISHED, task.getExecutionState());
            ((TaskManager) Mockito.verify(taskManager)).notifyExecutionStateChange(jobID, executionAttemptID, ExecutionState.FINISHED, (Throwable) null);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTaskWithEnvironmentAndException() {
        try {
            JobID jobID = new JobID();
            JobVertexID jobVertexID = new JobVertexID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            TaskManager taskManager = (TaskManager) Mockito.mock(TaskManager.class);
            TaskDeploymentDescriptor taskDeploymentDescriptor = new TaskDeploymentDescriptor(jobID, jobVertexID, executionAttemptID, "TestTask", 2, 7, new Configuration(), new Configuration(), TestInvokableWithException.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);
            Task task = new Task(jobID, jobVertexID, 2, 7, executionAttemptID, "TestTask", taskManager);
            task.setEnvironment(new RuntimeEnvironment((ActorRef) Mockito.mock(ActorRef.class), task, taskDeploymentDescriptor, getClass().getClassLoader(), (MemoryManager) Mockito.mock(MemoryManager.class), (IOManager) Mockito.mock(IOManager.class), (InputSplitProvider) Mockito.mock(InputSplitProvider.class), new BroadcastVariableManager(), MockNetworkEnvironment.getMock()));
            Assert.assertEquals(ExecutionState.DEPLOYING, task.getExecutionState());
            task.startExecution();
            task.getEnvironment().getExecutingThread().join();
            Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
            ((TaskManager) Mockito.verify(taskManager)).notifyExecutionStateChange((JobID) Matchers.eq(jobID), (ExecutionAttemptID) Matchers.eq(executionAttemptID), (ExecutionState) Matchers.eq(ExecutionState.FAILED), (Throwable) Matchers.any(Throwable.class));
            ((TaskManager) Mockito.verify(taskManager, Mockito.times(0))).notifyExecutionStateChange(jobID, executionAttemptID, ExecutionState.CANCELING, (Throwable) null);
            ((TaskManager) Mockito.verify(taskManager, Mockito.times(0))).notifyExecutionStateChange(jobID, executionAttemptID, ExecutionState.CANCELED, (Throwable) null);
            ((TaskManager) Mockito.verify(taskManager, Mockito.times(0))).notifyExecutionStateChange(jobID, executionAttemptID, ExecutionState.FINISHED, (Throwable) null);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
