package org.apache.flink.runtime.executiongraph;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.class */
/**
 * Tests for {@code ExecutionVertex.scheduleForExecution} against a mocked {@link Scheduler}.
 *
 * <p>The tests check the execution state the vertex reports after being scheduled, and whether
 * the slot handed out by the scheduler is released again when the vertex cannot use it.
 *
 * <p>Unexpected exceptions are deliberately allowed to propagate out of the test methods so that
 * JUnit reports them together with their full stack trace.
 */
public class ExecutionVertexSchedulingTest {

    /** Actor system shared by all tests in this class; created in setup, shut down in teardown. */
    private static ActorSystem system;

    /** Creates the shared actor system using the configuration from {@link TestingUtils}. */
    @BeforeClass
    public static void setup() {
        system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
    }

    /** Shuts the shared actor system down and clears the static reference. */
    @AfterClass
    public static void teardown() {
        // JUnit still runs @AfterClass when @BeforeClass threw; without this guard a failed
        // setup would be masked by a secondary NullPointerException here.
        if (system != null) {
            JavaTestKit.shutdownActorSystem(system);
            system = null;
        }
    }

    /**
     * Schedules a vertex immediately onto a slot that was cancelled beforehand and expects the
     * vertex to end up {@code FAILED} with the slot released.
     *
     * @throws Exception on any unexpected error, which fails the test
     */
    @Test
    public void testSlotReleasedWhenScheduledImmediately() throws Exception {
        AllocatedSlot slot = ExecutionGraphTestUtils.getInstance(ActorRef.noSender())
                .allocateSlot(new JobID());
        slot.cancel();
        // Cancelling alone must not yet mark the slot as released.
        Assert.assertFalse(slot.isReleased());

        ExecutionVertex vertex = createVertex();

        Scheduler scheduler = Mockito.mock(Scheduler.class);
        Mockito.when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class)))
                .thenReturn(slot);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());

        // queued = false: the vertex takes the slot straight from scheduleImmediately(...)
        vertex.scheduleForExecution(scheduler, false);

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertTrue(slot.isReleased());
    }

    /**
     * Schedules a vertex in queued mode, then completes the pending {@link SlotAllocationFuture}
     * with a slot that was cancelled beforehand. Expects the vertex to be {@code SCHEDULED} while
     * the future is pending, and {@code FAILED} with the slot released once the slot is set.
     *
     * @throws Exception on any unexpected error, which fails the test
     */
    @Test
    public void testSlotReleasedWhenScheduledQueued() throws Exception {
        AllocatedSlot slot = ExecutionGraphTestUtils.getInstance(ActorRef.noSender())
                .allocateSlot(new JobID());
        slot.cancel();
        // Cancelling alone must not yet mark the slot as released.
        Assert.assertFalse(slot.isReleased());

        SlotAllocationFuture future = new SlotAllocationFuture();
        ExecutionVertex vertex = createVertex();

        Scheduler scheduler = Mockito.mock(Scheduler.class);
        Mockito.when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class)))
                .thenReturn(future);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());

        // queued = true: the vertex receives a future instead of a slot
        vertex.scheduleForExecution(scheduler, true);
        Assert.assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());

        // Completing the future hands the cancelled slot to the vertex.
        future.setSlot(slot);

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertTrue(slot.isReleased());
    }

    /**
     * Schedules a vertex immediately onto a slot whose instance is backed by a
     * {@code SimpleAcknowledgingTaskManager} test actor and expects the vertex to report
     * {@code RUNNING} right after {@code scheduleForExecution} returns.
     *
     * @throws Exception on any unexpected error, which fails the test
     */
    @Test
    public void testScheduleToRunning() throws Exception {
        try {
            // NOTE(review): presumably makes actor messages get processed synchronously on the
            // calling thread, so the state can be asserted without waiting - confirm against
            // TestingUtils.
            TestingUtils.setCallingThreadDispatcher(system);

            ActorRef taskManager = TestActorRef.create(
                    system,
                    Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class));
            AllocatedSlot slot = ExecutionGraphTestUtils.getInstance(taskManager)
                    .allocateSlot(new JobID());

            ExecutionVertex vertex = createVertex();

            Scheduler scheduler = Mockito.mock(Scheduler.class);
            Mockito.when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class)))
                    .thenReturn(slot);

            Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());

            vertex.scheduleForExecution(scheduler, false);

            Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
        } finally {
            // Always undo the dispatcher change so later tests are not affected.
            TestingUtils.setGlobalExecutionContext();
        }
    }

    /**
     * Builds the vertex under test: subtask index 0 of a fresh job vertex, with no intermediate
     * results attached.
     */
    private static ExecutionVertex createVertex() throws Exception {
        return new ExecutionVertex(
                ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()),
                0,
                new IntermediateResult[0]);
    }
}
