package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.DefaultSchedulerTest;
import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.class */
class SpeculativeSchedulerTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private ScheduledExecutorService futureExecutor;
    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
    private TestExecutionOperationsDecorator testExecutionOperations;
    private TestBlocklistOperations testBlocklistOperations;
    private TestRestartBackoffTimeStrategy restartStrategy;
    private TestExecutionSlotAllocatorFactory testExecutionSlotAllocatorFactory;
    private TestExecutionSlotAllocator testExecutionSlotAllocator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest$TestBlocklistOperations.class */
    public static class TestBlocklistOperations implements BlocklistOperations {
        private final List<BlockedNode> blockedNodes;

        private TestBlocklistOperations() {
            this.blockedNodes = new ArrayList();
        }

        public void addNewBlockedNodes(Collection<BlockedNode> collection) {
            this.blockedNodes.addAll(collection);
        }

        public Set<String> getAllBlockedNodeIds() {
            return (Set) this.blockedNodes.stream().map((v0) -> {
                return v0.getNodeId();
            }).collect(Collectors.toSet());
        }
    }

    SpeculativeSchedulerTest() {
    }

    @BeforeEach
    void setUp() {
        this.futureExecutor = new DirectScheduledExecutorService();
        this.taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
        this.testExecutionOperations = new TestExecutionOperationsDecorator(new DefaultExecutionOperations());
        this.testBlocklistOperations = new TestBlocklistOperations();
        this.restartStrategy = new TestRestartBackoffTimeStrategy(true, 0L);
        this.testExecutionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
        this.testExecutionSlotAllocator = this.testExecutionSlotAllocatorFactory.getTestExecutionSlotAllocator();
    }

    @AfterEach
    void tearDown() {
        if (this.futureExecutor != null) {
            ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, new ExecutorService[]{this.futureExecutor});
        }
    }

    @Test
    void testStartScheduling() {
        createSchedulerAndStartScheduling();
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(1);
    }

    @Test
    void testNotifySlowTasks() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        ExecutionVertex onlyExecutionVertex = getOnlyExecutionVertex(createSchedulerAndStartScheduling);
        Execution currentExecutionAttempt = onlyExecutionVertex.getCurrentExecutionAttempt();
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(1);
        long currentTimeMillis = System.currentTimeMillis();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(2);
        Assertions.assertThat(this.testBlocklistOperations.getAllBlockedNodeIds()).containsExactly(new String[]{currentExecutionAttempt.getAssignedResourceLocation().getNodeId()});
        Execution execution = getExecution(onlyExecutionVertex, 1);
        Assertions.assertThat(execution.getState()).isEqualTo(ExecutionState.DEPLOYING);
        Assertions.assertThat(execution.getStateTimestamp(ExecutionState.CREATED)).isGreaterThanOrEqualTo(currentTimeMillis);
    }

    @Test
    void testNotifyDuplicatedSlowTasks() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        ExecutionVertex onlyExecutionVertex = getOnlyExecutionVertex(createSchedulerAndStartScheduling);
        Execution currentExecutionAttempt = onlyExecutionVertex.getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(2);
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(2);
        createSchedulerAndStartScheduling.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(getExecution(onlyExecutionVertex, 1).getAttemptId()));
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(3);
    }

    @Test
    void testRestartVertexIfAllSpeculativeExecutionFailed() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        ExecutionVertex onlyExecutionVertex = getOnlyExecutionVertex(createSchedulerAndStartScheduling);
        Execution currentExecutionAttempt = onlyExecutionVertex.getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(2);
        ExecutionAttemptID attemptId = currentExecutionAttempt.getAttemptId();
        ExecutionAttemptID attemptId2 = getExecution(onlyExecutionVertex, 1).getAttemptId();
        createSchedulerAndStartScheduling.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId));
        createSchedulerAndStartScheduling.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(attemptId2));
        this.taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(3);
    }

    @Test
    void testNoRestartIfNotAllSpeculativeExecutionFailed() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        Execution currentExecutionAttempt = getOnlyExecutionVertex(createSchedulerAndStartScheduling).getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        createSchedulerAndStartScheduling.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(currentExecutionAttempt.getAttemptId()));
        this.taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(2);
    }

    @Test
    void testRestartVertexIfPartitionExceptionHappened() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        ExecutionVertex onlyExecutionVertex = getOnlyExecutionVertex(createSchedulerAndStartScheduling);
        Execution currentExecutionAttempt = onlyExecutionVertex.getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Execution execution = getExecution(onlyExecutionVertex, 1);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(currentExecutionAttempt.getAttemptId(), ExecutionState.FAILED, new PartitionNotFoundException(new ResultPartitionID())));
        Assertions.assertThat(execution.getState()).isEqualTo(ExecutionState.CANCELING);
        ExecutionGraphTestUtils.completeCancellingForAllVertices(createSchedulerAndStartScheduling.getExecutionGraph());
        this.taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(3);
    }

    @Test
    void testCancelOtherDeployedCurrentExecutionsWhenAnyExecutionFinished() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        ExecutionVertex onlyExecutionVertex = getOnlyExecutionVertex(createSchedulerAndStartScheduling);
        Execution currentExecutionAttempt = onlyExecutionVertex.getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Execution execution = getExecution(onlyExecutionVertex, 1);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(currentExecutionAttempt.getAttemptId(), ExecutionState.FINISHED));
        Assertions.assertThat(execution.getState()).isEqualTo(ExecutionState.CANCELING);
    }

    @Test
    void testCancelOtherScheduledCurrentExecutionsWhenAnyExecutionFinished() {
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        ExecutionVertex onlyExecutionVertex = getOnlyExecutionVertex(createSchedulerAndStartScheduling);
        Execution currentExecutionAttempt = onlyExecutionVertex.getCurrentExecutionAttempt();
        this.testExecutionSlotAllocator.completePendingRequest(currentExecutionAttempt.getAttemptId());
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Execution execution = getExecution(onlyExecutionVertex, 1);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(currentExecutionAttempt.getAttemptId(), ExecutionState.FINISHED));
        Assertions.assertThat(execution.getState()).isEqualTo(ExecutionState.CANCELED);
    }

    @Test
    void testExceptionHistoryIfPartitionExceptionHappened() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        Execution currentExecutionAttempt = getOnlyExecutionVertex(createSchedulerAndStartScheduling).getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(currentExecutionAttempt.getAttemptId(), ExecutionState.FAILED, new PartitionNotFoundException(new ResultPartitionID())));
        ExecutionGraphTestUtils.completeCancellingForAllVertices(createSchedulerAndStartScheduling.getExecutionGraph());
        this.taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat(createSchedulerAndStartScheduling.getExceptionHistory()).hasSize(1);
        Assertions.assertThat(((RootExceptionHistoryEntry) createSchedulerAndStartScheduling.getExceptionHistory().iterator().next()).getFailingTaskName()).isEqualTo(currentExecutionAttempt.getVertexWithAttempt());
    }

    @Test
    void testLocalExecutionAttemptFailureIsCorrectlyRecorded() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        Execution currentExecutionAttempt = getOnlyExecutionVertex(createSchedulerAndStartScheduling).getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        TaskExecutionState createFailedTaskExecutionState = DefaultSchedulerTest.createFailedTaskExecutionState(currentExecutionAttempt.getAttemptId());
        createSchedulerAndStartScheduling.updateTaskExecutionState(createFailedTaskExecutionState);
        ClassLoader classLoader = SpeculativeSchedulerTest.class.getClassLoader();
        Assertions.assertThat(createSchedulerAndStartScheduling.getExecutionGraph().getFailureInfo()).isNotNull();
        Assertions.assertThat(createSchedulerAndStartScheduling.getExecutionGraph().getFailureInfo().getExceptionAsString()).contains(new CharSequence[]{createFailedTaskExecutionState.getError(classLoader).getMessage()});
        Assertions.assertThat(createSchedulerAndStartScheduling.getExceptionHistory()).hasSize(1);
        Assertions.assertThat(((RootExceptionHistoryEntry) createSchedulerAndStartScheduling.getExceptionHistory().iterator().next()).getFailingTaskName()).isEqualTo(currentExecutionAttempt.getVertexWithAttempt());
    }

    @Test
    void testUnrecoverableLocalExecutionAttemptFailureWillFailJob() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        Execution currentExecutionAttempt = getOnlyExecutionVertex(createSchedulerAndStartScheduling).getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(currentExecutionAttempt.getAttemptId(), ExecutionState.FAILED, new SuppressRestartsException(new Exception("Forced failure for testing."))));
        Assertions.assertThat(createSchedulerAndStartScheduling.getExecutionGraph().getState()).isEqualTo(JobStatus.FAILING);
    }

    @Test
    void testLocalExecutionAttemptFailureAndForbiddenRestartWillFailJob() {
        this.restartStrategy.setCanRestart(false);
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        Execution currentExecutionAttempt = getOnlyExecutionVertex(createSchedulerAndStartScheduling).getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        createSchedulerAndStartScheduling.updateTaskExecutionState(DefaultSchedulerTest.createFailedTaskExecutionState(currentExecutionAttempt.getAttemptId()));
        Assertions.assertThat(createSchedulerAndStartScheduling.getExecutionGraph().getState()).isEqualTo(JobStatus.FAILING);
    }

    @Test
    void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex("source", 1);
        JobVertex createNoOpVertex2 = ExecutionGraphTestUtils.createNoOpVertex("sink", -1);
        createNoOpVertex2.connectNewDataSetAsInput(createNoOpVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        JobGraph batchJobGraph = JobGraphTestUtils.batchJobGraph(createNoOpVertex, createNoOpVertex2);
        ComponentMainThreadExecutor forMainThread = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        SpeculativeScheduler buildSpeculativeScheduler = createSchedulerBuilder(batchJobGraph, forMainThread).setVertexParallelismDecider((jobVertexID, list, i) -> {
            return 3;
        }).buildSpeculativeScheduler();
        buildSpeculativeScheduler.getClass();
        forMainThread.execute(buildSpeculativeScheduler::startScheduling);
        DefaultExecutionGraph executionGraph = buildSpeculativeScheduler.getExecutionGraph();
        ExecutionJobVertex jobVertex = executionGraph.getJobVertex(createNoOpVertex.getID());
        ExecutionJobVertex jobVertex2 = executionGraph.getJobVertex(createNoOpVertex2.getID());
        ExecutionVertex executionVertex = jobVertex.getTaskVertices()[0];
        Assertions.assertThat(executionVertex.getCurrentExecutions()).hasSize(1);
        Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
        notifySlowTask(buildSpeculativeScheduler, currentExecutionAttempt);
        Assertions.assertThat(executionVertex.getCurrentExecutions()).hasSize(2);
        Assertions.assertThat(jobVertex2.getParallelism()).isEqualTo(-1);
        buildSpeculativeScheduler.updateTaskExecutionState(new TaskExecutionState(currentExecutionAttempt.getAttemptId(), ExecutionState.FINISHED, (Throwable) null, (AccumulatorSnapshot) null, new IOMetrics(0L, 0L, 0L, 0L, 0L, 0L, 0L)));
        Assertions.assertThat(jobVertex2.getParallelism()).isEqualTo(3);
        ExecutionVertex executionVertex2 = jobVertex2.getTaskVertices()[0];
        notifySlowTask(buildSpeculativeScheduler, executionVertex2.getCurrentExecutionAttempt());
        Assertions.assertThat(executionVertex2.getCurrentExecutions()).hasSize(2);
    }

    @Test
    void testNumSlowExecutionVerticesMetric() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        Execution currentExecutionAttempt = getOnlyExecutionVertex(createSchedulerAndStartScheduling).getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Assertions.assertThat(createSchedulerAndStartScheduling.getNumSlowExecutionVertices()).isEqualTo(1L);
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        Assertions.assertThat(createSchedulerAndStartScheduling.getNumSlowExecutionVertices()).isEqualTo(1L);
        createSchedulerAndStartScheduling.notifySlowTasks(Collections.emptyMap());
        Assertions.assertThat(createSchedulerAndStartScheduling.getNumSlowExecutionVertices()).isZero();
    }

    @Test
    void testEffectiveSpeculativeExecutionsMetric() {
        SpeculativeScheduler createSchedulerAndStartScheduling = createSchedulerAndStartScheduling();
        ExecutionVertex onlyExecutionVertex = getOnlyExecutionVertex(createSchedulerAndStartScheduling);
        Execution currentExecutionAttempt = onlyExecutionVertex.getCurrentExecutionAttempt();
        notifySlowTask(createSchedulerAndStartScheduling, currentExecutionAttempt);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(getExecution(onlyExecutionVertex, 1).getAttemptId(), ExecutionState.FINISHED));
        Assertions.assertThat(createSchedulerAndStartScheduling.getNumEffectiveSpeculativeExecutions()).isEqualTo(1L);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(currentExecutionAttempt.getAttemptId(), ExecutionState.CANCELED));
        createSchedulerAndStartScheduling.handleGlobalFailure(new Exception());
        this.taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat(createSchedulerAndStartScheduling.getNumEffectiveSpeculativeExecutions()).isZero();
        Execution execution = getExecution(onlyExecutionVertex, 2);
        notifySlowTask(createSchedulerAndStartScheduling, execution);
        createSchedulerAndStartScheduling.updateTaskExecutionState(new TaskExecutionState(execution.getAttemptId(), ExecutionState.FINISHED));
        Assertions.assertThat(createSchedulerAndStartScheduling.getNumEffectiveSpeculativeExecutions()).isZero();
    }

    private static Execution getExecution(ExecutionVertex executionVertex, int i) {
        return (Execution) executionVertex.getCurrentExecutions().stream().filter(execution -> {
            return execution.getAttemptNumber() == i;
        }).findFirst().get();
    }

    private static ExecutionVertex getOnlyExecutionVertex(SpeculativeScheduler speculativeScheduler) {
        return (ExecutionVertex) Iterables.getOnlyElement(speculativeScheduler.getExecutionGraph().getAllExecutionVertices());
    }

    private SpeculativeScheduler createSchedulerAndStartScheduling() {
        return createSchedulerAndStartScheduling(DefaultSchedulerTest.singleNonParallelJobVertexJobGraph());
    }

    private SpeculativeScheduler createSchedulerAndStartScheduling(JobGraph jobGraph) {
        ComponentMainThreadExecutor forMainThread = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        try {
            SpeculativeScheduler createScheduler = createScheduler(jobGraph, forMainThread);
            createScheduler.getClass();
            forMainThread.execute(createScheduler::startScheduling);
            return createScheduler;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private SpeculativeScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
        return createSchedulerBuilder(jobGraph, componentMainThreadExecutor).buildSpeculativeScheduler();
    }

    private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor) {
        Configuration configuration = new Configuration();
        configuration.set(SlowTaskDetectorOptions.CHECK_INTERVAL, Duration.ofDays(1L));
        return new DefaultSchedulerBuilder(jobGraph, componentMainThreadExecutor, (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).setBlocklistOperations(this.testBlocklistOperations).setExecutionOperations(this.testExecutionOperations).setFutureExecutor(this.futureExecutor).setDelayExecutor(this.taskRestartExecutor).setRestartBackoffTimeStrategy(this.restartStrategy).setExecutionSlotAllocatorFactory(this.testExecutionSlotAllocatorFactory).setJobMasterConfiguration(configuration);
    }

    private static void notifySlowTask(SpeculativeScheduler speculativeScheduler, Execution execution) {
        speculativeScheduler.notifySlowTasks(ImmutableMap.of(execution.getVertex().getID(), Collections.singleton(execution.getAttemptId())));
    }
}
