package org.apache.flink.runtime.scheduler.slowtaskdetector;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.class */
class ExecutionTimeBasedSlowTaskDetectorTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    ExecutionTimeBasedSlowTaskDetectorTest() {
    }

    @Test
    void testNoFinishedTaskButRatioIsZero() throws Exception {
        Assertions.assertThat(createSlowTaskDetector(0.0d, 1.0d, 0L).findSlowTasks(createExecutionGraph(ExecutionGraphTestUtils.createNoOpVertex(3)))).hasSize(3);
    }

    @Test
    void testFinishedTaskNotExceedRatio() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(3);
        ExecutionGraph createExecutionGraph = createExecutionGraph(createNoOpVertex);
        ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector = createSlowTaskDetector(0.5d, 1.0d, 0L);
        createExecutionGraph.getJobVertex(createNoOpVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().markFinished();
        Assertions.assertThat(createSlowTaskDetector.findSlowTasks(createExecutionGraph)).isEmpty();
    }

    @Test
    void testFinishedTaskExceedRatio() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(3);
        ExecutionGraph createExecutionGraph = createExecutionGraph(createNoOpVertex);
        ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector = createSlowTaskDetector(0.3d, 1.0d, 0L);
        createExecutionGraph.getJobVertex(createNoOpVertex.getID()).getTaskVertices()[2].getCurrentExecutionAttempt().markFinished();
        Assertions.assertThat(createSlowTaskDetector.findSlowTasks(createExecutionGraph)).hasSize(2);
    }

    @Test
    void testLargeLowerBound() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(3);
        ExecutionGraph createExecutionGraph = createExecutionGraph(createNoOpVertex);
        ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector = createSlowTaskDetector(0.3d, 1.0d, 2147483647L);
        createExecutionGraph.getJobVertex(createNoOpVertex.getID()).getTaskVertices()[2].getCurrentExecutionAttempt().markFinished();
        Assertions.assertThat(createSlowTaskDetector.findSlowTasks(createExecutionGraph)).isEmpty();
    }

    @Test
    void testLargeMultiplier() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(3);
        ExecutionGraph createExecutionGraph = createExecutionGraph(createNoOpVertex);
        ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector = createSlowTaskDetector(0.3d, 1000000.0d, 0L);
        Thread.sleep(10L);
        createExecutionGraph.getJobVertex(createNoOpVertex.getID()).getTaskVertices()[2].getCurrentExecutionAttempt().markFinished();
        Assertions.assertThat(createSlowTaskDetector.findSlowTasks(createExecutionGraph)).isEmpty();
    }

    @Test
    void testMultipleJobVertexFinishedTaskExceedRatio() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(3);
        JobVertex createNoOpVertex2 = ExecutionGraphTestUtils.createNoOpVertex(3);
        createNoOpVertex2.connectNewDataSetAsInput(createNoOpVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        ExecutionGraph createExecutionGraph = createExecutionGraph(createNoOpVertex, createNoOpVertex2);
        ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector = createSlowTaskDetector(0.3d, 1.0d, 0L);
        createExecutionGraph.getJobVertex(createNoOpVertex.getID()).getTaskVertices()[2].getCurrentExecutionAttempt().markFinished();
        createExecutionGraph.getJobVertex(createNoOpVertex2.getID()).getTaskVertices()[2].getCurrentExecutionAttempt().markFinished();
        Assertions.assertThat(createSlowTaskDetector.findSlowTasks(createExecutionGraph)).hasSize(4);
    }

    @Test
    void testFinishedTaskExceedRatioInDynamicGraph() throws Exception {
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(3);
        JobVertex jobVertex = new JobVertex("vertex2");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.connectNewDataSetAsInput(createNoOpVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        ExecutionGraph createDynamicExecutionGraph = createDynamicExecutionGraph(createNoOpVertex, jobVertex);
        ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector = createSlowTaskDetector(0.3d, 1.0d, 0L);
        createDynamicExecutionGraph.getJobVertex(createNoOpVertex.getID()).getTaskVertices()[2].getCurrentExecutionAttempt().markFinished();
        Assertions.assertThat(createSlowTaskDetector.findSlowTasks(createDynamicExecutionGraph)).hasSize(2);
    }

    private ExecutionGraph createExecutionGraph(JobVertex... jobVertexArr) throws Exception {
        DefaultScheduler createScheduler = SchedulerTestingUtils.createScheduler(JobGraphTestUtils.streamingJobGraph(jobVertexArr), ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionGraph executionGraph = createScheduler.getExecutionGraph();
        createScheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        return executionGraph;
    }

    private ExecutionGraph createDynamicExecutionGraph(JobVertex... jobVertexArr) throws Exception {
        AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler = new DefaultSchedulerBuilder(JobGraphTestUtils.streamingJobGraph(jobVertexArr), ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).buildAdaptiveBatchJobScheduler();
        ExecutionGraph executionGraph = buildAdaptiveBatchJobScheduler.getExecutionGraph();
        buildAdaptiveBatchJobScheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        return executionGraph;
    }

    private ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector(double d, double d2, long j) {
        Configuration configuration = new Configuration();
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND, Duration.ofMillis(j));
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, Double.valueOf(d));
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, Double.valueOf(d2));
        return new ExecutionTimeBasedSlowTaskDetector(configuration);
    }
}
