package org.apache.flink.runtime.executiongraph;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import java.io.IOException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.class */
public class ExecutionVertexCancelTest {
    private static ActorSystem system;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest$CancelSequenceTaskManager.class */
    public static class CancelSequenceTaskManager extends UntypedActor {
        private final TaskManagerMessages.TaskOperationResult[] results;
        private int index = -1;

        public CancelSequenceTaskManager(TaskManagerMessages.TaskOperationResult[] taskOperationResultArr) {
            this.results = taskOperationResultArr;
        }

        public void onReceive(Object obj) throws Exception {
            if (obj instanceof TaskManagerMessages.SubmitTask) {
                getSender().tell(new TaskManagerMessages.TaskOperationResult(((TaskManagerMessages.SubmitTask) obj).tasks().getExecutionId(), true), getSelf());
            } else if (obj instanceof TaskManagerMessages.CancelTask) {
                this.index++;
                if (this.index >= this.results.length) {
                    getSender().tell(new Status.Failure(new IOException("RPC call failed.")), getSelf());
                } else {
                    getSender().tell(this.results[this.index], getSelf());
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest$CancelSequenceTaskManagerCreator.class */
    public static class CancelSequenceTaskManagerCreator implements Creator<CancelSequenceTaskManager> {
        private final TaskManagerMessages.TaskOperationResult[] results;

        public CancelSequenceTaskManagerCreator(TaskManagerMessages.TaskOperationResult... taskOperationResultArr) {
            this.results = taskOperationResultArr;
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public CancelSequenceTaskManager m8create() throws Exception {
            return new CancelSequenceTaskManager(this.results);
        }
    }

    /** Creates the actor system shared by all tests in this class. */
    @BeforeClass
    public static void setup() {
        ExecutionVertexCancelTest.system =
                ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
    }

    /**
     * Shuts down the shared actor system once all tests have run and clears the reference.
     *
     * <p>JUnit still invokes this method when {@link #setup()} has thrown, in which case the
     * system was never created. The null check avoids a follow-up
     * {@link NullPointerException} that would only obscure the original setup failure.
     */
    @AfterClass
    public static void teardown() {
        if (system != null) {
            JavaTestKit.shutdownActorSystem(system);
            system = null;
        }
    }

    /**
     * Cancels a freshly constructed vertex (state CREATED) and checks that it ends up CANCELED
     * with no failure cause and with timestamps recorded for CREATED, CANCELING and CANCELED.
     */
    @Test
    public void testCancelFromCreated() {
        try {
            JobVertexID vertexId = new JobVertexID();
            ExecutionVertex vertex = new ExecutionVertex(
                    ExecutionGraphTestUtils.getExecutionVertex(vertexId), 0, new IntermediateResult[0]);

            Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());

            vertex.cancel();

            Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
            Assert.assertNull(vertex.getFailureCause());

            // Every state on the way to CANCELED must carry a timestamp.
            ExecutionState[] visited = {ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED};
            for (ExecutionState state : visited) {
                Assert.assertTrue(vertex.getStateTimestamp(state) > 0);
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Forces a vertex into SCHEDULED, cancels it, and checks that it ends up CANCELED with no
     * failure cause and with timestamps recorded for CREATED, CANCELING and CANCELED.
     */
    @Test
    public void testCancelFromScheduled() {
        try {
            JobVertexID vertexId = new JobVertexID();
            ExecutionVertex vertex = new ExecutionVertex(
                    ExecutionGraphTestUtils.getExecutionVertex(vertexId), 0, new IntermediateResult[0]);

            // Put the vertex directly into the state under test.
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
            Assert.assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());

            vertex.cancel();

            Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
            Assert.assertNull(vertex.getFailureCause());

            ExecutionState[] visited = {ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED};
            for (ExecutionState state : visited) {
                Assert.assertTrue(vertex.getStateTimestamp(state) > 0);
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Cancels a vertex while it is DEPLOYING and then runs the queued actions in the order they
     * were enqueued.
     *
     * <p>The test installs a {@code QueuedActionExecutionContext} backed by an
     * {@code ActionQueue} and triggers the queued actions one at a time; the global execution
     * context is restored in the {@code finally} block. The task manager actor is scripted to
     * answer the first cancel request with a result flagged {@code true} and the second with one
     * flagged {@code false}.
     */
    @Test
    public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() {
        new JavaTestKit(system) {
            {
                try {
                    JobVertexID vertexId = new JobVertexID();
                    ExecutionGraphTestUtils.ActionQueue actions = new ExecutionGraphTestUtils.ActionQueue();
                    TestingUtils.setExecutionContext(new TestingUtils.QueuedActionExecutionContext(actions));

                    ExecutionVertex vertex = new ExecutionVertex(
                            ExecutionGraphTestUtils.getExecutionVertex(vertexId), 0, new IntermediateResult[0]);
                    ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();

                    ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
                    Assert.assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());

                    // Scripted replies for the first and second cancel request.
                    TaskManagerMessages.TaskOperationResult firstCancelReply =
                            new TaskManagerMessages.TaskOperationResult(attempt, true);
                    TaskManagerMessages.TaskOperationResult secondCancelReply =
                            new TaskManagerMessages.TaskOperationResult(attempt, false);
                    Props taskManagerProps =
                            Props.create(new CancelSequenceTaskManagerCreator(firstCancelReply, secondCancelReply));
                    AllocatedSlot slot = ExecutionGraphTestUtils
                            .getInstance(TestActorRef.create(ExecutionVertexCancelTest.system, taskManagerProps))
                            .allocateSlot(new JobID());

                    vertex.deployToSlot(slot);
                    Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

                    vertex.cancel();
                    Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

                    // First queued action (presumably the deploy call) runs; state must not change.
                    actions.triggerNextAction();
                    Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

                    // Checked again before the next action is triggered; nothing ran in between.
                    Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

                    actions.triggerNextAction();

                    vertex.getCurrentExecutionAttempt().cancelingComplete();
                    Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

                    // A further queued action must not move the vertex out of CANCELED.
                    actions.triggerNextAction();
                    Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

                    Assert.assertTrue(slot.isReleased());
                    Assert.assertNull(vertex.getFailureCause());

                    ExecutionState[] visited = {ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED};
                    for (ExecutionState state : visited) {
                        Assert.assertTrue(vertex.getStateTimestamp(state) > 0);
                    }
                } catch (Exception failure) {
                    failure.printStackTrace();
                    Assert.fail(failure.getMessage());
                } finally {
                    TestingUtils.setGlobalExecutionContext();
                }
            }
        };
    }

    /**
     * Cancels a vertex while it is DEPLOYING and then runs the queued actions out of order: the
     * second queued action is executed before the first one.
     *
     * <p>The test installs a {@code QueuedActionExecutionContext} backed by an
     * {@code ActionQueue}; the global execution context is restored in the {@code finally}
     * block. The task manager actor is scripted to answer the first cancel request with a result
     * flagged {@code false} and the second with one flagged {@code true}.
     */
    @Test
    public void testCancelConcurrentlyToDeploying_CallsOvertaking() {
        new JavaTestKit(system) {
            {
                try {
                    JobVertexID vertexId = new JobVertexID();
                    ExecutionGraphTestUtils.ActionQueue actions = new ExecutionGraphTestUtils.ActionQueue();
                    TestingUtils.setExecutionContext(new TestingUtils.QueuedActionExecutionContext(actions));

                    ExecutionVertex vertex = new ExecutionVertex(
                            ExecutionGraphTestUtils.getExecutionVertex(vertexId), 0, new IntermediateResult[0]);
                    ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();

                    ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
                    Assert.assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());

                    // Scripted replies for the first and second cancel request.
                    TaskManagerMessages.TaskOperationResult firstCancelReply =
                            new TaskManagerMessages.TaskOperationResult(attempt, false);
                    TaskManagerMessages.TaskOperationResult secondCancelReply =
                            new TaskManagerMessages.TaskOperationResult(attempt, true);
                    Props taskManagerProps =
                            Props.create(new CancelSequenceTaskManagerCreator(firstCancelReply, secondCancelReply));
                    AllocatedSlot slot = ExecutionGraphTestUtils
                            .getInstance(TestActorRef.create(ExecutionVertexCancelTest.system, taskManagerProps))
                            .allocateSlot(new JobID());

                    vertex.deployToSlot(slot);
                    Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

                    vertex.cancel();
                    Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

                    // Take the first queued action (presumably the deploy call) off the queue and
                    // hold it back, so that the action enqueued after it runs first.
                    Runnable firstQueued = actions.popNextAction();
                    Runnable secondQueued = actions.popNextAction();
                    secondQueued.run();

                    actions.triggerNextAction();
                    Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

                    // Now let the held-back action run; the vertex must stay in CANCELING.
                    firstQueued.run();
                    Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

                    actions.triggerNextAction();
                    actions.triggerNextAction();

                    vertex.getCurrentExecutionAttempt().cancelingComplete();
                    Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

                    Assert.assertTrue(slot.isReleased());
                    Assert.assertNull(vertex.getFailureCause());

                    ExecutionState[] visited = {ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED};
                    for (ExecutionState state : visited) {
                        Assert.assertTrue(vertex.getStateTimestamp(state) > 0);
                    }
                } catch (Exception failure) {
                    failure.printStackTrace();
                    Assert.fail(failure.getMessage());
                } finally {
                    TestingUtils.setGlobalExecutionContext();
                }
            }
        };
    }

    /**
     * Cancels a RUNNING vertex whose task manager answers the cancel request with a result
     * flagged {@code true}, then completes the cancellation and checks that the vertex is
     * CANCELED, its slot is released, and no failure cause is recorded.
     *
     * <p>The global execution context is restored in a single {@code finally} block, matching the
     * other tests in this class.
     */
    @Test
    public void testCancelFromRunning() {
        new JavaTestKit(system) {
            {
                try {
                    // NOTE(review): presumably makes actor calls run on the calling thread so the
                    // assertions below see their effects immediately - confirm in TestingUtils.
                    TestingUtils.setCallingThreadDispatcher(ExecutionVertexCancelTest.system);

                    ExecutionVertex vertex = new ExecutionVertex(
                            ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0]);

                    TaskManagerMessages.TaskOperationResult cancelReply = new TaskManagerMessages.TaskOperationResult(
                            vertex.getCurrentExecutionAttempt().getAttemptId(), true);
                    Props taskManagerProps = Props.create(new CancelSequenceTaskManagerCreator(cancelReply));
                    AllocatedSlot slot = ExecutionGraphTestUtils
                            .getInstance(TestActorRef.create(ExecutionVertexCancelTest.system, taskManagerProps))
                            .allocateSlot(new JobID());

                    // Put the vertex directly into RUNNING on the allocated slot.
                    ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
                    ExecutionGraphTestUtils.setVertexResource(vertex, slot);
                    Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

                    vertex.cancel();
                    vertex.getCurrentExecutionAttempt().cancelingComplete();

                    Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
                    Assert.assertTrue(slot.isReleased());
                    Assert.assertNull(vertex.getFailureCause());
                    Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                    Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
                    Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    TestingUtils.setGlobalExecutionContext();
                }
            }
        };
    }

    /**
     * Cancels a RUNNING vertex twice in a row and checks that it stays in CANCELING after each
     * call, then completes the cancellation and checks that the vertex is CANCELED, its slot is
     * released, and no failure cause is recorded.
     *
     * <p>The task manager actor is scripted with a single cancel reply. The global execution
     * context is restored in a single {@code finally} block, matching the other tests in this
     * class.
     */
    @Test
    public void testRepeatedCancelFromRunning() {
        new JavaTestKit(system) {
            {
                try {
                    // NOTE(review): presumably makes actor calls run on the calling thread so the
                    // assertions below see their effects immediately - confirm in TestingUtils.
                    TestingUtils.setCallingThreadDispatcher(ExecutionVertexCancelTest.system);

                    ExecutionVertex vertex = new ExecutionVertex(
                            ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0]);

                    TaskManagerMessages.TaskOperationResult cancelReply = new TaskManagerMessages.TaskOperationResult(
                            vertex.getCurrentExecutionAttempt().getAttemptId(), true);
                    Props taskManagerProps = Props.create(new CancelSequenceTaskManagerCreator(cancelReply));
                    AllocatedSlot slot = ExecutionGraphTestUtils
                            .getInstance(TestActorRef.create(ExecutionVertexCancelTest.system, taskManagerProps))
                            .allocateSlot(new JobID());

                    // Put the vertex directly into RUNNING on the allocated slot.
                    ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
                    ExecutionGraphTestUtils.setVertexResource(vertex, slot);
                    Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

                    vertex.cancel();
                    Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

                    // The second cancel must leave the state unchanged.
                    vertex.cancel();
                    Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

                    vertex.getCurrentExecutionAttempt().cancelingComplete();

                    Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
                    Assert.assertTrue(slot.isReleased());
                    Assert.assertNull(vertex.getFailureCause());
                    Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                    Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
                    Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    TestingUtils.setGlobalExecutionContext();
                }
            }
        };
    }

    /**
     * Cancels a RUNNING vertex whose task manager answers the cancel request with a result
     * flagged {@code false}, and checks that the vertex remains in CANCELING with no failure
     * cause recorded.
     *
     * <p>The global execution context is restored in a single {@code finally} block, matching the
     * other tests in this class.
     */
    @Test
    public void testCancelFromRunningDidNotFindTask() {
        new JavaTestKit(system) {
            {
                try {
                    // NOTE(review): presumably makes actor calls run on the calling thread so the
                    // assertions below see their effects immediately - confirm in TestingUtils.
                    TestingUtils.setCallingThreadDispatcher(ExecutionVertexCancelTest.system);

                    ExecutionVertex vertex = new ExecutionVertex(
                            ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0]);

                    TaskManagerMessages.TaskOperationResult cancelReply = new TaskManagerMessages.TaskOperationResult(
                            vertex.getCurrentExecutionAttempt().getAttemptId(), false);
                    Props taskManagerProps = Props.create(new CancelSequenceTaskManagerCreator(cancelReply));
                    AllocatedSlot slot = ExecutionGraphTestUtils
                            .getInstance(TestActorRef.create(ExecutionVertexCancelTest.system, taskManagerProps))
                            .allocateSlot(new JobID());

                    // Put the vertex directly into RUNNING on the allocated slot.
                    ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
                    ExecutionGraphTestUtils.setVertexResource(vertex, slot);
                    Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

                    vertex.cancel();

                    Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
                    Assert.assertNull(vertex.getFailureCause());
                    Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
                    Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                } finally {
                    TestingUtils.setGlobalExecutionContext();
                }
            }
        };
    }

    /**
     * Cancels a RUNNING vertex whose task manager has no scripted cancel replies, so the cancel
     * request is answered with a {@code Status.Failure}. The vertex is expected to end up FAILED
     * with a failure cause set and its slot released.
     */
    @Test
    public void testCancelCallFails() {
        new JavaTestKit(system) {
            {
                try {
                    TestingUtils.setCallingThreadDispatcher(ExecutionVertexCancelTest.system);

                    ExecutionVertex vertex = new ExecutionVertex(
                            ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0]);

                    // An empty reply script makes the very first cancel request fail.
                    TaskManagerMessages.TaskOperationResult[] noCancelReplies = new TaskManagerMessages.TaskOperationResult[0];
                    Props taskManagerProps = Props.create(new CancelSequenceTaskManagerCreator(noCancelReplies));
                    AllocatedSlot slot = ExecutionGraphTestUtils
                            .getInstance(TestActorRef.create(ExecutionVertexCancelTest.system, taskManagerProps))
                            .allocateSlot(new JobID());

                    ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
                    ExecutionGraphTestUtils.setVertexResource(vertex, slot);
                    Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

                    vertex.cancel();

                    Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
                    Assert.assertTrue(slot.isReleased());
                    Assert.assertNotNull(vertex.getFailureCause());

                    ExecutionState[] visited = {ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.FAILED};
                    for (ExecutionState state : visited) {
                        Assert.assertTrue(vertex.getStateTimestamp(state) > 0);
                    }
                } catch (Exception failure) {
                    failure.printStackTrace();
                    Assert.fail(failure.getMessage());
                } finally {
                    TestingUtils.setGlobalExecutionContext();
                }
            }
        };
    }

    /**
     * Cancels a RUNNING vertex and then marks the current attempt as failed.
     *
     * <p>The task manager here is a regular actor created through {@code actorOf}, so the
     * assertions accept either of two states at each step. At the end the slot must be released
     * and the execution graph must have no registered executions left.
     */
    @Test
    public void testSendCancelAndReceiveFail() {
        new JavaTestKit(system) {
            {
                try {
                    ExecutionVertex vertex = new ExecutionVertex(
                            ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0]);

                    TaskManagerMessages.TaskOperationResult cancelReply = new TaskManagerMessages.TaskOperationResult(
                            vertex.getCurrentExecutionAttempt().getAttemptId(), true);
                    Props taskManagerProps = Props.create(new CancelSequenceTaskManagerCreator(cancelReply));
                    AllocatedSlot slot = ExecutionGraphTestUtils
                            .getInstance(ExecutionVertexCancelTest.system.actorOf(taskManagerProps))
                            .allocateSlot(new JobID());

                    ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
                    ExecutionGraphTestUtils.setVertexResource(vertex, slot);
                    Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

                    vertex.cancel();
                    // The state is read afresh for each comparison rather than cached.
                    Assert.assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING
                            || vertex.getExecutionState() == ExecutionState.FAILED);

                    vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
                    Assert.assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED
                            || vertex.getExecutionState() == ExecutionState.FAILED);

                    Assert.assertTrue(slot.isReleased());
                    Assert.assertEquals(0L, vertex.getExecutionGraph().getRegisteredExecutions().size());
                } catch (Exception failure) {
                    failure.printStackTrace();
                    Assert.fail(failure.getMessage());
                }
            }
        };
    }

    /**
     * Checks that a vertex already in CANCELED stays there: scheduling it leaves the state
     * unchanged, and deploying it to a slot throws {@link IllegalStateException} while the state
     * remains CANCELED.
     */
    @Test
    public void testScheduleOrDeployAfterCancel() {
        try {
            ExecutionVertex vertex = new ExecutionVertex(
                    ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0]);
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.CANCELED);
            Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

            // Scheduling must be a no-op as far as the state is concerned.
            Scheduler scheduler = Mockito.mock(Scheduler.class);
            vertex.scheduleForExecution(scheduler, false);
            Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

            // Deploying must be rejected with an IllegalStateException.
            boolean rejected = false;
            try {
                vertex.deployToSlot(ExecutionGraphTestUtils.getInstance(ActorRef.noSender()).allocateSlot(new JobID()));
            } catch (IllegalStateException expected) {
                rejected = true;
                Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
            }
            if (!rejected) {
                Assert.fail("Method should throw an exception");
            }
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Exercises three operations on vertices that are in CANCELING:
     * <ol>
     *   <li>scheduling must not throw;</li>
     *   <li>deploying to a slot must throw {@link IllegalStateException};</li>
     *   <li>failing the vertex must move it to FAILED, record the given cause, and release its
     *       slot.</li>
     * </ol>
     * Each step uses its own {@link ExecutionVertex} built on the same job vertex.
     */
    @Test
    public void testActionsWhileCancelling() {
        try {
            ExecutionJobVertex executionVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());

            // 1) Scheduling while canceling is tolerated.
            try {
                ExecutionVertex scheduled = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0]);
                ExecutionGraphTestUtils.setVertexState(scheduled, ExecutionState.CANCELING);
                scheduled.scheduleForExecution(Mockito.mock(Scheduler.class), false);
            } catch (Exception e) {
                // Surface the unexpected exception instead of discarding it, so the failure
                // report shows what actually went wrong.
                e.printStackTrace();
                Assert.fail("should not throw an exception: " + e);
            }

            // 2) Deploying while canceling is rejected.
            try {
                ExecutionVertex deployed = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0]);
                ExecutionGraphTestUtils.setVertexState(deployed, ExecutionState.CANCELING);
                deployed.deployToSlot(ExecutionGraphTestUtils.getInstance(ActorRef.noSender()).allocateSlot(new JobID()));
                Assert.fail("Method should throw an exception");
            } catch (IllegalStateException expected) {
                // Expected: this exception is the outcome the step is checking for.
            }

            // 3) Failing while canceling ends in FAILED and frees the slot.
            ExecutionVertex failed = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0]);
            AllocatedSlot allocateSlot = ExecutionGraphTestUtils.getInstance(ActorRef.noSender()).allocateSlot(new JobID());
            ExecutionGraphTestUtils.setVertexResource(failed, allocateSlot);
            ExecutionGraphTestUtils.setVertexState(failed, ExecutionState.CANCELING);

            Exception exc = new Exception("test exception");
            failed.fail(exc);

            Assert.assertEquals(ExecutionState.FAILED, failed.getExecutionState());
            Assert.assertEquals(exc, failed.getFailureCause());
            Assert.assertTrue(allocateSlot.isReleased());
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }
}
