package org.apache.flink.runtime.scheduler;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.TestFailoverStrategyFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.operators.sort.ExternalSortLargeRecordsITCase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryMatcher;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
import org.apache.flink.runtime.shuffle.TestingShuffleMaster;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyIterable;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.hamcrest.collection.IsIterableWithSize;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultSchedulerTest.class */
public class DefaultSchedulerTest extends TestLogger {
    private static final int TIMEOUT_MS = 1000;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private final ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutorService;
    private Configuration configuration;
    private TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy;
    private TestExecutionVertexOperationsDecorator testExecutionVertexOperations;
    private ExecutionVertexVersioner executionVertexVersioner;
    private TestExecutionSlotAllocatorFactory executionSlotAllocatorFactory;
    private TestExecutionSlotAllocator testExecutionSlotAllocator;
    private TestingShuffleMaster shuffleMaster;
    private TestingJobMasterPartitionTracker partitionTracker;
    private Time timeout;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultSchedulerTest$ReorganizableManuallyTriggeredScheduledExecutor.class */
    private static class ReorganizableManuallyTriggeredScheduledExecutor extends ManuallyTriggeredScheduledExecutor {
        private final List<ScheduledTask<?>> scheduledTasks;

        private ReorganizableManuallyTriggeredScheduledExecutor() {
            this.scheduledTasks = new ArrayList();
        }

        public ScheduledFuture<?> schedule(Runnable runnable, long j, TimeUnit timeUnit) {
            return schedule(() -> {
                runnable.run();
                return null;
            }, j, timeUnit);
        }

        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long j, TimeUnit timeUnit) {
            ScheduledTask<?> scheduledTask = new ScheduledTask<>(callable, timeUnit.convert(j, TimeUnit.MILLISECONDS));
            this.scheduledTasks.add(scheduledTask);
            return scheduledTask;
        }

        public List<ScheduledTask<?>> getCollectedScheduledTasks() {
            return this.scheduledTasks;
        }

        public void scheduleCollectedScheduledTasks() {
            for (ScheduledTask<?> scheduledTask : this.scheduledTasks) {
                super.schedule(scheduledTask.getCallable(), scheduledTask.getDelay(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            }
            this.scheduledTasks.clear();
        }

        public void triggerNonPeriodicScheduledTask() {
            scheduleCollectedScheduledTasks();
            super.triggerNonPeriodicScheduledTask();
        }

        public void triggerNonPeriodicScheduledTasks() {
            scheduleCollectedScheduledTasks();
            super.triggerNonPeriodicScheduledTasks();
        }
    }

    @Before
    public void setUp() throws Exception {
        this.executor = Executors.newSingleThreadExecutor();
        this.scheduledExecutorService = new DirectScheduledExecutorService();
        this.configuration = new Configuration();
        this.testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 0L);
        this.testExecutionVertexOperations = new TestExecutionVertexOperationsDecorator(new DefaultExecutionVertexOperations());
        this.executionVertexVersioner = new ExecutionVertexVersioner();
        this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
        this.testExecutionSlotAllocator = this.executionSlotAllocatorFactory.getTestExecutionSlotAllocator();
        this.shuffleMaster = new TestingShuffleMaster();
        this.partitionTracker = new TestingJobMasterPartitionTracker();
        this.timeout = Time.seconds(60L);
    }

    @After
    public void tearDown() throws Exception {
        if (this.scheduledExecutorService != null) {
            ExecutorUtils.gracefulShutdown(1000L, TimeUnit.MILLISECONDS, new ExecutorService[]{this.scheduledExecutorService});
        }
        if (this.executor != null) {
            ExecutorUtils.gracefulShutdown(1000L, TimeUnit.MILLISECONDS, new ExecutorService[]{this.executor});
        }
    }

    @Test
    public void startScheduling() {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = getOnlyJobVertex(singleNonParallelJobVertexJobGraph);
        createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), Matchers.contains(new ExecutionVertexID[]{new ExecutionVertexID(onlyJobVertex.getID(), 0)}));
    }

    @Test
    public void testCorrectSettingOfInitializationTimestamp() {
        ArchivedExecutionGraph archivedExecutionGraph = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph()).requestJob().getArchivedExecutionGraph();
        MatcherAssert.assertThat(Long.valueOf(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)), Matchers.greaterThan(0L));
        MatcherAssert.assertThat(Long.valueOf(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)), Matchers.greaterThan(0L));
        MatcherAssert.assertThat(Long.valueOf(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING)), Matchers.greaterThan(0L));
        MatcherAssert.assertThat(Boolean.valueOf(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING) <= archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)), Is.is(true));
    }

    @Test
    public void deployTasksOnlyWhenAllSlotRequestsAreFulfilled() throws Exception {
        JobGraph singleJobVertexJobGraph = singleJobVertexJobGraph(4);
        JobVertexID id = getOnlyJobVertex(singleJobVertexJobGraph).getID();
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        TestSchedulingStrategy.Factory factory = new TestSchedulingStrategy.Factory();
        DefaultScheduler createScheduler = createScheduler(singleJobVertexJobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), factory);
        TestSchedulingStrategy lastCreatedSchedulingStrategy = factory.getLastCreatedSchedulingStrategy();
        createScheduler.startScheduling();
        List<ExecutionVertexID> asList = Arrays.asList(new ExecutionVertexID(id, 0), new ExecutionVertexID(id, 1), new ExecutionVertexID(id, 2), new ExecutionVertexID(id, 3));
        lastCreatedSchedulingStrategy.schedule(asList);
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), Matchers.hasSize(0));
        this.testExecutionSlotAllocator.completePendingRequest(asList.get(0));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), Matchers.hasSize(0));
        this.testExecutionSlotAllocator.completePendingRequests();
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), Matchers.hasSize(4));
    }

    @Test
    public void scheduledVertexOrderFromSchedulingStrategyIsRespected() throws Exception {
        JobGraph singleJobVertexJobGraph = singleJobVertexJobGraph(10);
        JobVertexID id = getOnlyJobVertex(singleJobVertexJobGraph).getID();
        List<ExecutionVertexID> asList = Arrays.asList(new ExecutionVertexID(id, 4), new ExecutionVertexID(id, 0), new ExecutionVertexID(id, 3), new ExecutionVertexID(id, 1), new ExecutionVertexID(id, 2));
        TestSchedulingStrategy.Factory factory = new TestSchedulingStrategy.Factory();
        createScheduler(singleJobVertexJobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), factory);
        factory.getLastCreatedSchedulingStrategy().schedule(asList);
        Assert.assertEquals(asList, this.testExecutionVertexOperations.getDeployedVertices());
    }

    @Test
    public void restartAfterDeploymentFails() {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = getOnlyJobVertex(singleNonParallelJobVertexJobGraph);
        this.testExecutionVertexOperations.enableFailDeploy();
        createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        this.testExecutionVertexOperations.disableFailDeploy();
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        MatcherAssert.assertThat(deployedVertices, Matchers.contains(new ExecutionVertexID[]{executionVertexID, executionVertexID}));
    }

    @Test
    public void restartFailedTask() {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = getOnlyJobVertex(singleNonParallelJobVertexJobGraph);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState(((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId()));
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        MatcherAssert.assertThat(deployedVertices, Matchers.contains(new ExecutionVertexID[]{executionVertexID, executionVertexID}));
    }

    @Test
    public void updateTaskExecutionStateReturnsFalseIfExecutionDoesNotExist() {
        Assert.assertFalse(createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph()).updateTaskExecutionState(createFailedTaskExecutionState(new ExecutionAttemptID())));
    }

    @Test
    public void failJobIfCannotRestart() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState(((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId()));
        this.taskRestartExecutor.triggerScheduledTasks();
        waitForTermination(createSchedulerAndStartScheduling);
        MatcherAssert.assertThat(createSchedulerAndStartScheduling.requestJobStatus(), Matchers.is(Matchers.equalTo(JobStatus.FAILED)));
    }

    @Test
    public void failJobIfNotEnoughResources() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        this.testExecutionSlotAllocator.timeoutPendingRequests();
        waitForTermination(createSchedulerAndStartScheduling);
        JobStatus requestJobStatus = createSchedulerAndStartScheduling.requestJobStatus();
        MatcherAssert.assertThat(requestJobStatus, Matchers.is(Matchers.equalTo(JobStatus.FAILED)));
        Throwable deserializeError = createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getFailureInfo().getException().deserializeError(DefaultSchedulerTest.class.getClassLoader());
        Assert.assertTrue(ExceptionUtils.findThrowable(deserializeError, NoResourceAvailableException.class).isPresent());
        Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(deserializeError, "Could not allocate the required slot within slot request timeout.").isPresent());
        MatcherAssert.assertThat(requestJobStatus, Matchers.is(Matchers.equalTo(JobStatus.FAILED)));
    }

    @Test
    public void restartVerticesOnSlotAllocationTimeout() throws Exception {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        testRestartVerticesOnFailuresInScheduling(executionVertexID -> {
            this.testExecutionSlotAllocator.timeoutPendingRequest(executionVertexID);
        });
    }

    @Test
    public void restartVerticesOnAssignedSlotReleased() throws Exception {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        testRestartVerticesOnFailuresInScheduling(executionVertexID -> {
            this.testExecutionSlotAllocator.completePendingRequest(executionVertexID).releaseSlot(new Exception("Release slot for test"));
        });
    }

    private void testRestartVerticesOnFailuresInScheduling(Consumer<ExecutionVertexID> consumer) throws Exception {
        JobVertex createVertex = createVertex("vertex1", 2);
        JobVertex createVertex2 = createVertex("vertex2", 2);
        createVertex2.connectNewDataSetAsInput(createVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(createVertex, createVertex2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        TestSchedulingStrategy.Factory factory = new TestSchedulingStrategy.Factory();
        DefaultScheduler createScheduler = createScheduler(streamingJobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), factory, new RestartPipelinedRegionFailoverStrategy.Factory());
        TestSchedulingStrategy lastCreatedSchedulingStrategy = factory.getLastCreatedSchedulingStrategy();
        createScheduler.startScheduling();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(createVertex.getID(), 0);
        ExecutionVertexID executionVertexID2 = new ExecutionVertexID(createVertex.getID(), 1);
        ExecutionVertexID executionVertexID3 = new ExecutionVertexID(createVertex2.getID(), 0);
        lastCreatedSchedulingStrategy.schedule(Arrays.asList(executionVertexID, executionVertexID2, executionVertexID3, new ExecutionVertexID(createVertex2.getID(), 1)));
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), Matchers.hasSize(4));
        consumer.accept(executionVertexID);
        Iterator it = createScheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ArchivedExecutionVertex archivedExecutionVertex = (ArchivedExecutionVertex) it.next();
        ArchivedExecutionVertex archivedExecutionVertex2 = (ArchivedExecutionVertex) it.next();
        ArchivedExecutionVertex archivedExecutionVertex3 = (ArchivedExecutionVertex) it.next();
        ArchivedExecutionVertex archivedExecutionVertex4 = (ArchivedExecutionVertex) it.next();
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), Matchers.hasSize(2));
        MatcherAssert.assertThat(archivedExecutionVertex.getExecutionState(), Matchers.is(ExecutionState.FAILED));
        MatcherAssert.assertThat(archivedExecutionVertex3.getExecutionState(), Matchers.is(ExecutionState.CANCELED));
        MatcherAssert.assertThat(archivedExecutionVertex2.getExecutionState(), Matchers.is(ExecutionState.SCHEDULED));
        MatcherAssert.assertThat(archivedExecutionVertex4.getExecutionState(), Matchers.is(ExecutionState.SCHEDULED));
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(lastCreatedSchedulingStrategy.getReceivedVerticesToRestart(), Matchers.containsInAnyOrder(new ExecutionVertexID[]{executionVertexID, executionVertexID3}));
    }

    @Test
    public void skipDeploymentIfVertexVersionOutdated() {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        JobGraph nonParallelSourceSinkJobGraph = nonParallelSourceSinkJobGraph();
        List verticesSortedTopologicallyFromSources = nonParallelSourceSinkJobGraph.getVerticesSortedTopologicallyFromSources();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getID(), 0);
        ExecutionVertexID executionVertexID2 = new ExecutionVertexID(((JobVertex) verticesSortedTopologicallyFromSources.get(1)).getID(), 0);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(nonParallelSourceSinkJobGraph);
        this.testExecutionSlotAllocator.completePendingRequest(executionVertexID);
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState(((ArchivedExecutionVertex) createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt().getAttemptId()));
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        this.testExecutionSlotAllocator.enableAutoCompletePendingRequests();
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), Matchers.containsInAnyOrder(new ExecutionVertexID[]{executionVertexID, executionVertexID2}));
        MatcherAssert.assertThat(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getState(), Matchers.is(Matchers.equalTo(JobStatus.RUNNING)));
    }

    @Test
    public void releaseSlotIfVertexVersionOutdated() {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(getOnlyJobVertex(singleNonParallelJobVertexJobGraph).getID(), 0);
        createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        this.executionVertexVersioner.recordModification(executionVertexID);
        this.testExecutionSlotAllocator.completePendingRequests();
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getReturnedSlots(), Matchers.hasSize(1));
    }

    @Test
    public void vertexIsResetBeforeRestarted() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        TestSchedulingStrategy.Factory factory = new TestSchedulingStrategy.Factory();
        DefaultScheduler createScheduler = createScheduler(singleNonParallelJobVertexJobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), factory);
        TestSchedulingStrategy lastCreatedSchedulingStrategy = factory.getLastCreatedSchedulingStrategy();
        SchedulingTopology schedulingTopology = lastCreatedSchedulingStrategy.getSchedulingTopology();
        createScheduler.startScheduling();
        SchedulingExecutionVertex schedulingExecutionVertex = (SchedulingExecutionVertex) Iterables.getOnlyElement(schedulingTopology.getVertices());
        lastCreatedSchedulingStrategy.schedule(Collections.singletonList(schedulingExecutionVertex.getId()));
        createScheduler.updateTaskExecutionState(createFailedTaskExecutionState(((ArchivedExecutionVertex) Iterables.getOnlyElement(createScheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId()));
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(lastCreatedSchedulingStrategy.getReceivedVerticesToRestart(), Matchers.hasSize(1));
        MatcherAssert.assertThat(schedulingExecutionVertex.getState(), Matchers.is(Matchers.equalTo(ExecutionState.CREATED)));
    }

    @Test
    public void scheduleOnlyIfVertexIsCreated() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        TestSchedulingStrategy.Factory factory = new TestSchedulingStrategy.Factory();
        DefaultScheduler createScheduler = createScheduler(singleNonParallelJobVertexJobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), factory);
        TestSchedulingStrategy lastCreatedSchedulingStrategy = factory.getLastCreatedSchedulingStrategy();
        SchedulingTopology schedulingTopology = lastCreatedSchedulingStrategy.getSchedulingTopology();
        createScheduler.startScheduling();
        ExecutionVertexID id = ((SchedulingExecutionVertex) Iterables.getOnlyElement(schedulingTopology.getVertices())).getId();
        lastCreatedSchedulingStrategy.schedule(Collections.singletonList(id));
        try {
            lastCreatedSchedulingStrategy.schedule(Collections.singletonList(id));
            Assert.fail("IllegalStateException should happen");
        } catch (IllegalStateException e) {
        }
    }

    @Test
    public void handleGlobalFailure() {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = getOnlyJobVertex(singleNonParallelJobVertexJobGraph);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        createSchedulerAndStartScheduling.handleGlobalFailure(new Exception("forced failure"));
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId(), ExecutionState.CANCELED));
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        MatcherAssert.assertThat(deployedVertices, Matchers.contains(new ExecutionVertexID[]{executionVertexID, executionVertexID}));
    }

    @Test
    public void handleGlobalFailureWithLocalFailure() {
        JobGraph singleJobVertexJobGraph = singleJobVertexJobGraph(2);
        JobVertex onlyJobVertex = getOnlyJobVertex(singleJobVertexJobGraph);
        SchedulerTestingUtils.enableCheckpointing(singleJobVertexJobGraph);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleJobVertexJobGraph);
        List list = (List) StreamSupport.stream(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().spliterator(), false).map((v0) -> {
            return v0.getCurrentExecutionAttempt();
        }).map((v0) -> {
            return v0.getAttemptId();
        }).collect(Collectors.toList());
        ExecutionAttemptID executionAttemptID = (ExecutionAttemptID) list.get(0);
        createSchedulerAndStartScheduling.handleGlobalFailure(new Exception("global failure"));
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.FAILED, new Exception("local failure")));
        Iterator it = list.iterator();
        while (it.hasNext()) {
            createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState((ExecutionAttemptID) it.next(), ExecutionState.CANCELED));
        }
        this.taskRestartExecutor.triggerScheduledTasks();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        ExecutionVertexID executionVertexID2 = new ExecutionVertexID(onlyJobVertex.getID(), 1);
        MatcherAssert.assertThat("The execution vertices should be deployed in a specific order reflecting the scheduling start and the global fail-over afterwards.", this.testExecutionVertexOperations.getDeployedVertices(), Matchers.contains(new ExecutionVertexID[]{executionVertexID, executionVertexID2, executionVertexID, executionVertexID2}));
    }

    @Test
    public void testStartingCheckpointSchedulerAfterExecutionGraphFinished() {
        assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished((v0) -> {
            v0.startCheckpointScheduler();
        });
    }

    @Test
    public void testStoppingCheckpointSchedulerAfterExecutionGraphFinished() {
        assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished((v0) -> {
            v0.stopCheckpointScheduler();
        });
    }

    private void assertCheckpointSchedulingOperationHavingNoEffectAfterJobFinished(Consumer<DefaultScheduler> consumer) {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(singleNonParallelJobVertexJobGraph);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        MatcherAssert.assertThat(createSchedulerAndStartScheduling.getCheckpointCoordinator(), Matchers.is(Matchers.notNullValue()));
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(((ExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.getExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FINISHED));
        MatcherAssert.assertThat(createSchedulerAndStartScheduling.getCheckpointCoordinator(), Matchers.is(Matchers.nullValue()));
        consumer.accept(createSchedulerAndStartScheduling);
        MatcherAssert.assertThat(createSchedulerAndStartScheduling.getCheckpointCoordinator(), Matchers.is(Matchers.nullValue()));
    }

    @Test
    public void vertexIsNotAffectedByOutdatedDeployment() {
        JobGraph singleJobVertexJobGraph = singleJobVertexJobGraph(2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleJobVertexJobGraph);
        Iterator it = createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ArchivedExecutionVertex archivedExecutionVertex = (ArchivedExecutionVertex) it.next();
        ArchivedExecutionVertex archivedExecutionVertex2 = (ArchivedExecutionVertex) it.next();
        SchedulingExecutionVertex schedulingExecutionVertex = (SchedulingExecutionVertex) createSchedulerAndStartScheduling.getSchedulingTopology().getVertices().iterator().next();
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState(archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId()));
        this.taskRestartExecutor.triggerScheduledTasks();
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState(archivedExecutionVertex2.getCurrentExecutionAttempt().getAttemptId()));
        MatcherAssert.assertThat(schedulingExecutionVertex.getState(), Matchers.is(Matchers.equalTo(ExecutionState.SCHEDULED)));
    }

    @Test
    public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(singleNonParallelJobVertexJobGraph);
        CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch();
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        ExecutionAttemptID attemptId = ((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId();
        transitionToRunning(createSchedulerAndStartScheduling, attemptId);
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator(createSchedulerAndStartScheduling);
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        MatcherAssert.assertThat(Integer.valueOf(checkpointCoordinator.getNumberOfPendingCheckpoints()), Matchers.is(Matchers.equalTo(1)));
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(Integer.valueOf(checkpointCoordinator.getNumberOfPendingCheckpoints()), Matchers.is(Matchers.equalTo(0)));
    }

    @Test
    public void restoreStateWhenRestartingTasks() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        SchedulerTestingUtils.enableCheckpointing(singleNonParallelJobVertexJobGraph);
        CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch();
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        ExecutionAttemptID attemptId = ((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId();
        transitionToRunning(createSchedulerAndStartScheduling, attemptId);
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator(createSchedulerAndStartScheduling);
        TestMasterHook fromId = TestMasterHook.fromId("testHook");
        checkpointCoordinator.addMasterHook(fromId);
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(createSchedulerAndStartScheduling, ((Long) checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next()).longValue());
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(Integer.valueOf(fromId.getRestoreCount()), Matchers.is(Matchers.equalTo(1)));
    }

    @Test
    public void failGlobalWhenRestoringStateFails() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        JobVertex onlyJobVertex = getOnlyJobVertex(singleNonParallelJobVertexJobGraph);
        SchedulerTestingUtils.enableCheckpointing(singleNonParallelJobVertexJobGraph);
        CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch();
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        ExecutionAttemptID attemptId = ((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId();
        transitionToRunning(createSchedulerAndStartScheduling, attemptId);
        CheckpointCoordinator checkpointCoordinator = SchedulerTestingUtils.getCheckpointCoordinator(createSchedulerAndStartScheduling);
        TestMasterHook fromId = TestMasterHook.fromId("testHook");
        fromId.enableFailOnRestore();
        checkpointCoordinator.addMasterHook(fromId);
        checkpointCoordinator.triggerCheckpoint(false);
        checkpointTriggeredLatch.await();
        SchedulerTestingUtils.acknowledgePendingCheckpoint(createSchedulerAndStartScheduling, ((Long) checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next()).longValue());
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
        this.taskRestartExecutor.triggerScheduledTasks();
        List<ExecutionVertexID> deployedVertices = this.testExecutionVertexOperations.getDeployedVertices();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(onlyJobVertex.getID(), 0);
        MatcherAssert.assertThat(deployedVertices, Matchers.contains(new ExecutionVertexID[]{executionVertexID}));
        fromId.disableFailOnRestore();
        this.taskRestartExecutor.triggerScheduledTasks();
        MatcherAssert.assertThat(deployedVertices, Matchers.contains(new ExecutionVertexID[]{executionVertexID, executionVertexID}));
    }

    @Test
    public void failJobWillIncrementVertexVersions() {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(getOnlyJobVertex(singleNonParallelJobVertexJobGraph).getID(), 0);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(executionVertexID);
        createSchedulerAndStartScheduling.failJob(new FlinkException("Test failure."), System.currentTimeMillis());
        Assert.assertTrue(this.executionVertexVersioner.isModified(executionVertexVersion));
    }

    @Test
    public void cancelJobWillIncrementVertexVersions() {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(getOnlyJobVertex(singleNonParallelJobVertexJobGraph).getID(), 0);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(executionVertexID);
        createSchedulerAndStartScheduling.cancel();
        Assert.assertTrue(this.executionVertexVersioner.isModified(executionVertexVersion));
    }

    @Test
    public void suspendJobWillIncrementVertexVersions() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        ExecutionVertexID executionVertexID = new ExecutionVertexID(getOnlyJobVertex(singleNonParallelJobVertexJobGraph).getID(), 0);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        ExecutionVertexVersion executionVertexVersion = this.executionVertexVersioner.getExecutionVertexVersion(executionVertexID);
        createSchedulerAndStartScheduling.close();
        Assert.assertTrue(this.executionVertexVersioner.isModified(executionVertexVersion));
    }

    @Test
    public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleJobVertexJobGraph(2));
        Iterator it = createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ExecutionAttemptID attemptId = ((ArchivedExecutionVertex) it.next()).getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = ((ArchivedExecutionVertex) it.next()).getCurrentExecutionAttempt().getAttemptId();
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.FAILED, new RuntimeException("expected")));
        JobStatus requestJobStatus = createSchedulerAndStartScheduling.requestJobStatus();
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(attemptId2, ExecutionState.FAILED, new RuntimeException("expected")));
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        JobStatus requestJobStatus2 = createSchedulerAndStartScheduling.requestJobStatus();
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        JobStatus requestJobStatus3 = createSchedulerAndStartScheduling.requestJobStatus();
        MatcherAssert.assertThat(requestJobStatus, Matchers.equalTo(JobStatus.RESTARTING));
        MatcherAssert.assertThat(requestJobStatus2, Matchers.equalTo(JobStatus.RESTARTING));
        MatcherAssert.assertThat(requestJobStatus3, Matchers.equalTo(JobStatus.RUNNING));
    }

    @Test
    public void cancelWhileRestartingShouldWaitForRunningTasks() {
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleJobVertexJobGraph(2));
        SchedulingTopology schedulingTopology = createSchedulerAndStartScheduling.getSchedulingTopology();
        Iterator it = createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ExecutionAttemptID attemptId = ((ArchivedExecutionVertex) it.next()).getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptId2 = ((ArchivedExecutionVertex) it.next()).getCurrentExecutionAttempt().getAttemptId();
        ExecutionVertexID executionVertexIdOrThrow = createSchedulerAndStartScheduling.getExecutionVertexIdOrThrow(attemptId2);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.FAILED, new RuntimeException("expected")));
        createSchedulerAndStartScheduling.cancel();
        ExecutionState state = schedulingTopology.getVertex(executionVertexIdOrThrow).getState();
        JobStatus requestJobStatus = createSchedulerAndStartScheduling.requestJobStatus();
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(attemptId2, ExecutionState.CANCELED, new RuntimeException("expected")));
        MatcherAssert.assertThat(state, Matchers.is(Matchers.equalTo(ExecutionState.CANCELING)));
        MatcherAssert.assertThat(requestJobStatus, Matchers.is(Matchers.equalTo(JobStatus.CANCELLING)));
        MatcherAssert.assertThat(createSchedulerAndStartScheduling.requestJobStatus(), Matchers.is(Matchers.equalTo(JobStatus.CANCELED)));
    }

    @Test
    public void failureInfoIsSetAfterTaskFailure() {
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph());
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, new RuntimeException("expected exception")));
        ErrorInfo failureInfo = createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getFailureInfo();
        MatcherAssert.assertThat(failureInfo, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(failureInfo.getExceptionAsString(), Matchers.containsString("expected exception"));
    }

    @Test
    public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception {
        JobGraph singleJobVertexJobGraph = singleJobVertexJobGraph(2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        DefaultScheduler createScheduler = createScheduler(singleJobVertexJobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), new PipelinedRegionSchedulingStrategy.Factory(), new RestartAllFailoverStrategy.Factory());
        createScheduler.startScheduling();
        ArchivedExecutionVertex archivedExecutionVertex = (ArchivedExecutionVertex) createScheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator().next();
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), Matchers.hasSize(2));
        createScheduler.updateTaskExecutionState(new TaskExecutionState(archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, new RuntimeException("expected exception")));
        Iterator it = createScheduler.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator();
        ArchivedExecutionVertex archivedExecutionVertex2 = (ArchivedExecutionVertex) it.next();
        ArchivedExecutionVertex archivedExecutionVertex3 = (ArchivedExecutionVertex) it.next();
        MatcherAssert.assertThat(archivedExecutionVertex2.getExecutionState(), Matchers.is(ExecutionState.FAILED));
        MatcherAssert.assertThat(archivedExecutionVertex3.getExecutionState(), Matchers.is(ExecutionState.CANCELED));
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), Matchers.hasSize(0));
    }

    @Test
    public void pendingSlotRequestsOfVerticesToRestartWillNotBeFulfilledByReturnedSlots() throws Exception {
        JobGraph sourceSinkJobGraph = sourceSinkJobGraph(10);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        this.testExecutionSlotAllocator.enableCompletePendingRequestsWithReturnedSlots();
        DefaultScheduler createScheduler = createScheduler(sourceSinkJobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), new PipelinedRegionSchedulingStrategy.Factory(), new RestartAllFailoverStrategy.Factory());
        createScheduler.startScheduling();
        ExecutionVertex executionVertex = (ExecutionVertex) Iterables.get(createScheduler.getExecutionGraph().getAllExecutionVertices(), 0);
        Set set = (Set) this.testExecutionSlotAllocator.getPendingRequests().values().stream().map((v0) -> {
            return v0.getLogicalSlotFuture();
        }).collect(Collectors.toSet());
        MatcherAssert.assertThat(set, Matchers.hasSize(20));
        this.testExecutionSlotAllocator.completePendingRequest(executionVertex.getID());
        MatcherAssert.assertThat(Long.valueOf(set.stream().filter((v0) -> {
            return v0.isDone();
        }).count()), Matchers.is(1L));
        createScheduler.updateTaskExecutionState(new TaskExecutionState(executionVertex.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, new RuntimeException("expected exception")));
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getPendingRequests().keySet(), Matchers.hasSize(0));
        MatcherAssert.assertThat(this.testExecutionSlotAllocator.getReturnedSlots(), Matchers.hasSize(2));
        MatcherAssert.assertThat(Long.valueOf(set.stream().filter((v0) -> {
            return v0.isCancelled();
        }).count()), Matchers.is(18L));
    }

    @Test
    public void testExceptionHistoryWithGlobalFailOver() {
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph());
        ExecutionAttemptID attemptId = ((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId();
        Exception exc = new Exception("Expected exception");
        createSchedulerAndStartScheduling.handleGlobalFailure(exc);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(attemptId, ExecutionState.CANCELED, exc));
        this.taskRestartExecutor.triggerScheduledTasks();
        Iterable exceptionHistory = createSchedulerAndStartScheduling.getExceptionHistory();
        MatcherAssert.assertThat(exceptionHistory, IsIterableWithSize.iterableWithSize(1));
        RootExceptionHistoryEntry rootExceptionHistoryEntry = (RootExceptionHistoryEntry) exceptionHistory.iterator().next();
        MatcherAssert.assertThat(rootExceptionHistoryEntry, ExceptionHistoryEntryMatcher.matchesGlobalFailure(exc, createSchedulerAndStartScheduling.getExecutionGraph().getFailureInfo().getTimestamp()));
        MatcherAssert.assertThat(rootExceptionHistoryEntry.getConcurrentExceptions(), IsEmptyIterable.emptyIterable());
    }

    @Test
    public void testExceptionHistoryWithRestartableFailure() {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        TestingLogicalSlotBuilder testingLogicalSlotBuilder = new TestingLogicalSlotBuilder();
        testingLogicalSlotBuilder.setTaskManagerLocation(localTaskManagerLocation);
        this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(testingLogicalSlotBuilder);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        ArchivedExecutionVertex archivedExecutionVertex = (ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        RuntimeException runtimeException = new RuntimeException("restartable exception");
        long initiateFailure = initiateFailure(createSchedulerAndStartScheduling, archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId(), runtimeException);
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        this.testRestartBackoffTimeStrategy.setCanRestart(false);
        ExecutionAttemptID attemptId = ((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId();
        RuntimeException runtimeException2 = new RuntimeException("failing exception");
        MatcherAssert.assertThat(createSchedulerAndStartScheduling.getExceptionHistory(), IsIterableContainingInOrder.contains(new Matcher[]{ExceptionHistoryEntryMatcher.matchesFailure(runtimeException, initiateFailure, archivedExecutionVertex.getTaskNameWithSubtaskIndex(), archivedExecutionVertex.getCurrentAssignedResourceLocation()), ExceptionHistoryEntryMatcher.matchesGlobalFailure(runtimeException2, initiateFailure(createSchedulerAndStartScheduling, attemptId, runtimeException2))}));
    }

    @Test
    public void testExceptionHistoryWithPreDeployFailure() {
        this.executionSlotAllocatorFactory.getTestExecutionSlotAllocator().disableAutoCompletePendingRequests();
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph());
        this.executionSlotAllocatorFactory.getTestExecutionSlotAllocator().timeoutPendingRequests();
        ArchivedExecutionVertex archivedExecutionVertex = (ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        this.taskRestartExecutor.triggerNonPeriodicScheduledTask();
        MatcherAssert.assertThat(archivedExecutionVertex.getCurrentAssignedResourceLocation(), Matchers.is(Matchers.nullValue()));
        ErrorInfo errorInfo = (ErrorInfo) archivedExecutionVertex.getFailureInfo().orElseThrow(() -> {
            return new AssertionError("A failureInfo should be set.");
        });
        MatcherAssert.assertThat(createSchedulerAndStartScheduling.getExceptionHistory(), IsIterableContainingInOrder.contains(ExceptionHistoryEntryMatcher.matchesFailure(errorInfo.getException(), errorInfo.getTimestamp(), archivedExecutionVertex.getTaskNameWithSubtaskIndex(), archivedExecutionVertex.getCurrentAssignedResourceLocation())));
    }

    @Test
    public void testExceptionHistoryConcurrentRestart() throws Exception {
        JobGraph singleJobVertexJobGraph = singleJobVertexJobGraph(2);
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        TestingLogicalSlotBuilder testingLogicalSlotBuilder = new TestingLogicalSlotBuilder();
        testingLogicalSlotBuilder.setTaskManagerLocation(localTaskManagerLocation);
        this.executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(testingLogicalSlotBuilder);
        ScheduledExecutor reorganizableManuallyTriggeredScheduledExecutor = new ReorganizableManuallyTriggeredScheduledExecutor();
        TestFailoverStrategyFactory testFailoverStrategyFactory = new TestFailoverStrategyFactory();
        DefaultScheduler createScheduler = createScheduler(singleJobVertexJobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), new PipelinedRegionSchedulingStrategy.Factory(), testFailoverStrategyFactory, reorganizableManuallyTriggeredScheduledExecutor);
        createScheduler.startScheduling();
        ExecutionVertex executionVertex = (ExecutionVertex) Iterables.get(createScheduler.getExecutionGraph().getAllExecutionVertices(), 0);
        ExecutionVertex executionVertex2 = (ExecutionVertex) Iterables.get(createScheduler.getExecutionGraph().getAllExecutionVertices(), 1);
        RuntimeException runtimeException = new RuntimeException("failure #0");
        testFailoverStrategyFactory.setTasksToRestart(executionVertex.getID());
        long initiateFailure = initiateFailure(createScheduler, executionVertex.getCurrentExecutionAttempt().getAttemptId(), runtimeException);
        RuntimeException runtimeException2 = new RuntimeException("failure #1");
        testFailoverStrategyFactory.setTasksToRestart(executionVertex2.getID(), executionVertex.getID());
        long initiateFailure2 = initiateFailure(createScheduler, executionVertex2.getCurrentExecutionAttempt().getAttemptId(), runtimeException2);
        Collections.reverse(reorganizableManuallyTriggeredScheduledExecutor.getCollectedScheduledTasks());
        reorganizableManuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
        MatcherAssert.assertThat(createScheduler.getExceptionHistory(), IsIterableWithSize.iterableWithSize(2));
        Iterator it = createScheduler.getExceptionHistory().iterator();
        RootExceptionHistoryEntry rootExceptionHistoryEntry = (RootExceptionHistoryEntry) it.next();
        MatcherAssert.assertThat(rootExceptionHistoryEntry, Matchers.is(ExceptionHistoryEntryMatcher.matchesFailure(runtimeException, initiateFailure, executionVertex.getTaskNameWithSubtaskIndex(), executionVertex.getCurrentAssignedResourceLocation())));
        MatcherAssert.assertThat(rootExceptionHistoryEntry.getConcurrentExceptions(), IsIterableContainingInOrder.contains(ExceptionHistoryEntryMatcher.matchesFailure(runtimeException2, initiateFailure2, executionVertex2.getTaskNameWithSubtaskIndex(), executionVertex2.getCurrentAssignedResourceLocation())));
        RootExceptionHistoryEntry rootExceptionHistoryEntry2 = (RootExceptionHistoryEntry) it.next();
        MatcherAssert.assertThat(rootExceptionHistoryEntry2, Matchers.is(ExceptionHistoryEntryMatcher.matchesFailure(runtimeException2, initiateFailure2, executionVertex2.getTaskNameWithSubtaskIndex(), executionVertex2.getCurrentAssignedResourceLocation())));
        MatcherAssert.assertThat(rootExceptionHistoryEntry2.getConcurrentExceptions(), IsEmptyIterable.emptyIterable());
    }

    @Test
    public void testExceptionHistoryTruncation() {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        this.configuration.set(WebOptions.MAX_EXCEPTION_HISTORY_SIZE, 1);
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph);
        initiateFailure(createSchedulerAndStartScheduling, ((ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices())).getCurrentExecutionAttempt().getAttemptId(), new RuntimeException("old exception"));
        this.taskRestartExecutor.triggerNonPeriodicScheduledTasks();
        ArchivedExecutionVertex archivedExecutionVertex = (ArchivedExecutionVertex) Iterables.getOnlyElement(createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        RuntimeException runtimeException = new RuntimeException("relevant exception");
        long initiateFailure = initiateFailure(createSchedulerAndStartScheduling, archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId(), runtimeException);
        this.taskRestartExecutor.triggerNonPeriodicScheduledTasks();
        MatcherAssert.assertThat(createSchedulerAndStartScheduling.getExceptionHistory(), IsIterableContainingInOrder.contains(ExceptionHistoryEntryMatcher.matchesFailure(runtimeException, initiateFailure, archivedExecutionVertex.getTaskNameWithSubtaskIndex(), archivedExecutionVertex.getCurrentAssignedResourceLocation())));
    }

    @Test
    public void testStatusMetrics() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingMetricRegistry build = TestingMetricRegistry.builder().setRegisterConsumer((metric, str, abstractMetricGroup) -> {
            boolean z = -1;
            switch (str.hashCode()) {
                case -225943176:
                    if (str.equals("runningTimeTotal")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case ExternalSortLargeRecordsITCase.SmallOrMediumOrLargeValue.SMALL_SIZE /* 0 */:
                    completableFuture.complete((Gauge) metric);
                    return;
                default:
                    return;
            }
        }).build();
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        getOnlyJobVertex(singleNonParallelJobVertexJobGraph);
        Configuration configuration = new Configuration();
        configuration.set(MetricOptions.JOB_STATUS_METRICS, Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));
        ComponentMainThreadExecutor forSingleThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(this.scheduledExecutorService);
        Time milliseconds = Time.milliseconds(5L);
        DeclarativeSlotPoolBridge buildAndStart = new DeclarativeSlotPoolBridgeBuilder().setBatchSlotTimeout(milliseconds).buildAndStart(forSingleThreadExecutor);
        DefaultScheduler mo476build = createSchedulerBuilder(singleNonParallelJobVertexJobGraph, forSingleThreadExecutor).setJobMasterConfiguration(configuration).setJobManagerJobMetricGroup(JobManagerMetricGroup.createJobManagerMetricGroup(build, "localhost").addJob(new JobID(), "jobName")).setExecutionSlotAllocatorFactory(SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(new PhysicalSlotProviderImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), buildAndStart), milliseconds)).mo476build();
        AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway submissionBufferingTaskManagerGateway = new AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway(1);
        submissionBufferingTaskManagerGateway.setCancelConsumer(executionAttemptID -> {
            forSingleThreadExecutor.execute(() -> {
                mo476build.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.CANCELED));
            });
        });
        forSingleThreadExecutor.execute(() -> {
            mo476build.startScheduling();
            SlotPoolTestUtils.offerSlots(buildAndStart, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), submissionBufferingTaskManagerGateway);
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        Thread.sleep(10L);
        Assert.assertThat(((Gauge) completableFuture.get()).getValue(), Matchers.greaterThan(0L));
    }

    @Test
    public void testDeploymentWaitForProducedPartitionRegistration() {
        this.shuffleMaster.setAutoCompleteRegistration(false);
        ArrayList arrayList = new ArrayList();
        this.partitionTracker.setStartTrackingPartitionsConsumer((resourceID, resultPartitionDeploymentDescriptor) -> {
            arrayList.add(resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID());
        });
        createSchedulerAndStartScheduling(nonParallelSourceSinkJobGraph());
        MatcherAssert.assertThat(arrayList, Matchers.hasSize(0));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), Matchers.hasSize(0));
        this.shuffleMaster.completeAllPendingRegistrations();
        MatcherAssert.assertThat(arrayList, Matchers.hasSize(1));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getDeployedVertices(), Matchers.hasSize(2));
    }

    @Test
    public void testFailedProducedPartitionRegistration() {
        this.shuffleMaster.setAutoCompleteRegistration(false);
        createSchedulerAndStartScheduling(nonParallelSourceSinkJobGraph());
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getCanceledVertices(), Matchers.hasSize(0));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getFailedVertices(), Matchers.hasSize(0));
        this.shuffleMaster.failAllPendingRegistrations();
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getCanceledVertices(), Matchers.hasSize(2));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getFailedVertices(), Matchers.hasSize(1));
    }

    @Test
    public void testDirectExceptionOnProducedPartitionRegistration() {
        this.shuffleMaster.setThrowExceptionalOnRegistration(true);
        createSchedulerAndStartScheduling(nonParallelSourceSinkJobGraph());
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getCanceledVertices(), Matchers.hasSize(2));
        MatcherAssert.assertThat(this.testExecutionVertexOperations.getFailedVertices(), Matchers.hasSize(1));
    }

    @Test
    public void testProducedPartitionRegistrationTimeout() throws Exception {
        ScheduledExecutorService scheduledExecutorService = null;
        try {
            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            ComponentMainThreadExecutor forSingleThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService);
            this.shuffleMaster.setAutoCompleteRegistration(false);
            JobGraph nonParallelSourceSinkJobGraph = nonParallelSourceSinkJobGraph();
            this.timeout = Time.milliseconds(1L);
            createSchedulerAndStartScheduling(nonParallelSourceSinkJobGraph, forSingleThreadExecutor);
            this.testExecutionVertexOperations.awaitCanceledVertices(2);
            this.testExecutionVertexOperations.awaitFailedVertices(1);
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdown();
            }
        } catch (Throwable th) {
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testLateRegisteredPartitionsWillBeReleased() {
        this.shuffleMaster.setAutoCompleteRegistration(false);
        ArrayList arrayList = new ArrayList();
        this.partitionTracker.setStartTrackingPartitionsConsumer((resourceID, resultPartitionDeploymentDescriptor) -> {
            arrayList.add(resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID());
        });
        DefaultScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling(nonParallelSourceSinkJobGraph());
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState(((ArchivedExecutionVertex) createSchedulerAndStartScheduling.requestJob().getArchivedExecutionGraph().getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt().getAttemptId()));
        this.shuffleMaster.completeAllPendingRegistrations();
        MatcherAssert.assertThat(arrayList, Matchers.hasSize(0));
        MatcherAssert.assertThat(this.shuffleMaster.getExternallyReleasedPartitions(), Matchers.hasSize(1));
    }

    @Test
    public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            doTestCheckpointCleanerIsClosedAfterCheckpointServices((checkpointRecoveryFactory, checkpointsCleaner) -> {
                JobGraph singleJobVertexJobGraph = singleJobVertexJobGraph(1);
                SchedulerTestingUtils.enableCheckpointing(singleJobVertexJobGraph);
                try {
                    return SchedulerTestingUtils.newSchedulerBuilder(singleJobVertexJobGraph, ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(newSingleThreadScheduledExecutor)).setCheckpointRecoveryFactory(checkpointRecoveryFactory).setCheckpointCleaner(checkpointsCleaner).mo476build();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, newSingleThreadScheduledExecutor, this.log);
        } finally {
            newSingleThreadScheduledExecutor.shutdownNow();
        }
    }

    public static void doTestCheckpointCleanerIsClosedAfterCheckpointServices(BiFunction<CheckpointRecoveryFactory, CheckpointsCleaner, SchedulerNG> biFunction, ScheduledExecutorService scheduledExecutorService, final Logger logger) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1) { // from class: org.apache.flink.runtime.scheduler.DefaultSchedulerTest.1
            public void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) throws Exception {
                countDownLatch.await();
                super.shutdown(jobStatus, checkpointsCleaner);
            }
        };
        StandaloneCheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter() { // from class: org.apache.flink.runtime.scheduler.DefaultSchedulerTest.2
            public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    logger.error("An error occurred while executing waiting for the CheckpointServices shutdown.", e);
                    Thread.currentThread().interrupt();
                }
                return super.shutdown(jobStatus);
            }
        };
        SchedulerNG apply = biFunction.apply(new TestingCheckpointRecoveryFactory(standaloneCompletedCheckpointStore, standaloneCheckpointIDCounter), new CheckpointsCleaner() { // from class: org.apache.flink.runtime.scheduler.DefaultSchedulerTest.3
            public synchronized CompletableFuture<Void> closeAsync() {
                countDownLatch2.countDown();
                return super.closeAsync();
            }
        });
        CompletableFuture completableFuture = new CompletableFuture();
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        scheduledExecutorService.submit(() -> {
            apply.closeAsync().thenRun(() -> {
                completableFuture.complete(null);
            });
            countDownLatch3.countDown();
        });
        countDownLatch3.await();
        Assert.assertFalse("CheckpointCleaner should not close before checkpoint services.", countDownLatch2.await(10L, TimeUnit.MILLISECONDS));
        countDownLatch.countDown();
        countDownLatch2.await();
        completableFuture.get();
    }

    private static TaskExecutionState createFailedTaskExecutionState(ExecutionAttemptID executionAttemptID) {
        return new TaskExecutionState(executionAttemptID, ExecutionState.FAILED, new Exception("Expected failure cause"));
    }

    private static long initiateFailure(DefaultScheduler defaultScheduler, ExecutionAttemptID executionAttemptID, Throwable th) {
        defaultScheduler.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.FAILED, th));
        return getFailureTimestamp(defaultScheduler, executionAttemptID);
    }

    private static long getFailureTimestamp(DefaultScheduler defaultScheduler, ExecutionAttemptID executionAttemptID) {
        return ((Long) ((ExecutionVertex) StreamSupport.stream(defaultScheduler.getExecutionGraph().getAllExecutionVertices().spliterator(), false).filter(executionVertex -> {
            return executionAttemptID.equals(executionVertex.getCurrentExecutionAttempt().getAttemptId());
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException("No ExecutionVertex available for the passed ExecutionAttemptId " + executionAttemptID);
        })).getFailureInfo().map((v0) -> {
            return v0.getTimestamp();
        }).orElseThrow(() -> {
            return new IllegalStateException("No failure was set for ExecutionVertex having the passed execution " + executionAttemptID);
        })).longValue();
    }

    private static JobVertex createVertex(String str, int i) {
        JobVertex jobVertex = new JobVertex(str);
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        return jobVertex;
    }

    private void waitForTermination(DefaultScheduler defaultScheduler) throws Exception {
        defaultScheduler.getJobTerminationFuture().get(1000L, TimeUnit.MILLISECONDS);
    }

    private static JobGraph singleNonParallelJobVertexJobGraph() {
        return singleJobVertexJobGraph(1);
    }

    private static JobGraph singleJobVertexJobGraph(int i) {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        return JobGraphTestUtils.streamingJobGraph(jobVertex);
    }

    private static JobGraph nonParallelSourceSinkJobGraph() {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2);
    }

    private static JobGraph sourceSinkJobGraph(int i) {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex2.setParallelism(i);
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2);
    }

    private static JobVertex getOnlyJobVertex(JobGraph jobGraph) {
        List verticesSortedTopologicallyFromSources = jobGraph.getVerticesSortedTopologicallyFromSources();
        Preconditions.checkState(verticesSortedTopologicallyFromSources.size() == 1);
        return (JobVertex) verticesSortedTopologicallyFromSources.get(0);
    }

    private DefaultScheduler createSchedulerAndStartScheduling(JobGraph jobGraph) {
        return createSchedulerAndStartScheduling(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread());
    }

    private DefaultScheduler createSchedulerAndStartScheduling(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor) {
        try {
            DefaultScheduler mo476build = createSchedulerBuilder(jobGraph, componentMainThreadExecutor).mo476build();
            mo476build.getClass();
            componentMainThreadExecutor.execute(mo476build::startScheduling);
            return mo476build;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor, SchedulingStrategyFactory schedulingStrategyFactory) throws Exception {
        return createSchedulerBuilder(jobGraph, componentMainThreadExecutor).setSchedulingStrategyFactory(schedulingStrategyFactory).mo476build();
    }

    private DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory factory) throws Exception {
        return createSchedulerBuilder(jobGraph, componentMainThreadExecutor).setSchedulingStrategyFactory(schedulingStrategyFactory).setFailoverStrategyFactory(factory).mo476build();
    }

    private DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory factory, ScheduledExecutor scheduledExecutor) throws Exception {
        return createSchedulerBuilder(jobGraph, componentMainThreadExecutor).setDelayExecutor(scheduledExecutor).setSchedulingStrategyFactory(schedulingStrategyFactory).setFailoverStrategyFactory(factory).mo476build();
    }

    private SchedulerTestingUtils.DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
        return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, componentMainThreadExecutor).setLogger(this.log).setIoExecutor(this.executor).setJobMasterConfiguration(this.configuration).setFutureExecutor(this.scheduledExecutorService).setDelayExecutor(this.taskRestartExecutor).setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory()).setFailoverStrategyFactory(new RestartPipelinedRegionFailoverStrategy.Factory()).setRestartBackoffTimeStrategy(this.testRestartBackoffTimeStrategy).setExecutionVertexOperations(this.testExecutionVertexOperations).setExecutionVertexVersioner(this.executionVertexVersioner).setExecutionSlotAllocatorFactory(this.executionSlotAllocatorFactory).setShuffleMaster(this.shuffleMaster).setPartitionTracker(this.partitionTracker).setRpcTimeout(this.timeout);
    }

    private CountDownLatch getCheckpointTriggeredLatch() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        SimpleAckingTaskManagerGateway simpleAckingTaskManagerGateway = new SimpleAckingTaskManagerGateway();
        this.testExecutionSlotAllocator.getLogicalSlotBuilder().setTaskManagerGateway(simpleAckingTaskManagerGateway);
        simpleAckingTaskManagerGateway.setCheckpointConsumer((executionAttemptID, jobID, j, j2, checkpointOptions) -> {
            countDownLatch.countDown();
        });
        return countDownLatch;
    }

    private void transitionToRunning(DefaultScheduler defaultScheduler, ExecutionAttemptID executionAttemptID) {
        Preconditions.checkState(defaultScheduler.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.INITIALIZING)));
        Preconditions.checkState(defaultScheduler.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.RUNNING)));
    }
}
