package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/FinalizeOnMasterTest.class */
/**
 * Checks when the {@code finalizeOnMaster} hook of a {@link JobVertex} is invoked: once per vertex
 * when the job ends in {@link JobStatus#FINISHED}, and not at all when it ends in
 * {@link JobStatus#FAILED}.
 */
public class FinalizeOnMasterTest extends TestLogger {

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorResource();

    /**
     * Finishes every vertex of a two-vertex job and expects exactly one {@code finalizeOnMaster}
     * call on each vertex, with no executions left registered afterwards.
     */
    @Test
    public void testFinalizeIsCalledUponSuccess() throws Exception {
        final JobVertex firstVertex = spyVertex("test vertex 1", 3);
        final JobVertex secondVertex = spyVertex("test vertex 2", 2);

        final ExecutionGraph graph = startRunningGraph(firstVertex, secondVertex);

        ExecutionGraphTestUtils.finishAllVertices(graph);
        Assert.assertEquals(JobStatus.FINISHED, graph.waitUntilTerminal());

        Mockito.verify(firstVertex, Mockito.times(1))
                .finalizeOnMaster(Mockito.any(JobVertex.InitializeOnMasterContext.class));
        Mockito.verify(secondVertex, Mockito.times(1))
                .finalizeOnMaster(Mockito.any(JobVertex.InitializeOnMasterContext.class));

        Assert.assertEquals(0, graph.getRegisteredExecutions().size());
    }

    /**
     * Fails the only execution attempt of a single-vertex job and expects that
     * {@code finalizeOnMaster} is never called, with no executions left registered afterwards.
     */
    @Test
    public void testFinalizeIsNotCalledUponFailure() throws Exception {
        final JobVertex vertex = spyVertex("test vertex 1", 1);

        final ExecutionGraph graph = startRunningGraph(vertex);

        // Fail the current attempt of the first (and only) subtask of the vertex.
        final ExecutionJobVertex executionJobVertex = graph.getJobVertex(vertex.getID());
        executionJobVertex.getTaskVertices()[0]
                .getCurrentExecutionAttempt()
                .fail(new Exception("test"));
        Assert.assertEquals(JobStatus.FAILED, graph.waitUntilTerminal());

        Mockito.verify(vertex, Mockito.never())
                .finalizeOnMaster(Mockito.any(JobVertex.InitializeOnMasterContext.class));

        Assert.assertEquals(0, graph.getRegisteredExecutions().size());
    }

    /**
     * Creates a Mockito spy around a new {@link JobVertex} that runs {@link NoOpInvokable}.
     *
     * @param name name given to the vertex
     * @param parallelism parallelism set on the vertex
     * @return the spied vertex, so that calls on it can be verified later
     */
    private static JobVertex spyVertex(String name, int parallelism) {
        final JobVertex spied = Mockito.spy(new JobVertex(name));
        spied.setInvokableClass(NoOpInvokable.class);
        spied.setParallelism(parallelism);
        return spied;
    }

    /**
     * Builds a scheduler for a streaming job graph made of the given vertices, starts scheduling,
     * asserts that the job is {@link JobStatus#RUNNING} and switches all vertices to running.
     *
     * @param vertices vertices that make up the job graph
     * @return the execution graph of the started scheduler
     */
    private static ExecutionGraph startRunningGraph(JobVertex... vertices) throws Exception {
        final DefaultScheduler scheduler =
                SchedulerTestingUtils.createScheduler(
                        JobGraphTestUtils.streamingJobGraph(vertices),
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        EXECUTOR_RESOURCE.getExecutor());
        scheduler.startScheduling();

        final ExecutionGraph graph = scheduler.getExecutionGraph();
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
        return graph;
    }
}
