/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.slowtaskdetector;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for {@link ExecutionTimeBasedSlowTaskDetector}.
 *
 * <p>Each test builds an execution graph whose vertices have all been switched to running,
 * optionally marks individual subtasks as finished, and then checks how many entries
 * {@code findSlowTasks} reports for a given combination of baseline ratio, baseline multiplier
 * and baseline lower bound.
 */
class ExecutionTimeBasedSlowTaskDetectorTest {
    /** Provides the scheduled executor handed to the schedulers created by the tests. */
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /**
     * With ratio 0.0, multiplier 1.0 and lower bound 0 ms, and no subtask finished, the detector
     * is expected to report one entry per subtask.
     */
    @Test
    void testNoFinishedTaskButRatioIsZero() throws Exception {
        final int parallelism = 3;
        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.0, 1.0, 0L);

        Assertions.assertThat(slowTaskDetector.findSlowTasks(executionGraph)).hasSize(parallelism);
    }

    /**
     * With ratio 0.5 and only one of three subtasks finished, the detector is expected to report
     * nothing.
     */
    @Test
    void testFinishedTaskNotExceedRatio() throws Exception {
        final int parallelism = 3;
        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.5, 1.0, 0L);

        markSubtaskFinished(executionGraph, jobVertex, 0);

        Assertions.assertThat(slowTaskDetector.findSlowTasks(executionGraph)).isEmpty();
    }

    /**
     * With ratio 0.3 and one of three subtasks finished, the detector is expected to report two
     * entries.
     */
    @Test
    void testFinishedTaskExceedRatio() throws Exception {
        final int parallelism = 3;
        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1.0, 0L);

        markSubtaskFinished(executionGraph, jobVertex, 2);

        Assertions.assertThat(slowTaskDetector.findSlowTasks(executionGraph)).hasSize(2);
    }

    /**
     * Same setup as {@link #testFinishedTaskExceedRatio()} but with a lower bound of
     * {@link Integer#MAX_VALUE} milliseconds; the detector is expected to report nothing.
     */
    @Test
    void testLargeLowerBound() throws Exception {
        final int parallelism = 3;
        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1.0, Integer.MAX_VALUE);

        markSubtaskFinished(executionGraph, jobVertex, 2);

        Assertions.assertThat(slowTaskDetector.findSlowTasks(executionGraph)).isEmpty();
    }

    /**
     * Same setup as {@link #testFinishedTaskExceedRatio()} but with a multiplier of 1,000,000;
     * the detector is expected to report nothing.
     */
    @Test
    void testLargeMultiplier() throws Exception {
        final int parallelism = 3;
        final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1000000.0, 0L);

        // NOTE(review): presumably ensures the finished subtask has a non-zero execution time so
        // that the multiplier actually scales the baseline - confirm against the detector.
        Thread.sleep(10L);
        markSubtaskFinished(executionGraph, jobVertex, 2);

        Assertions.assertThat(slowTaskDetector.findSlowTasks(executionGraph)).isEmpty();
    }

    /**
     * Two connected job vertices of three subtasks each, with one subtask finished per vertex and
     * ratio 0.3; the detector is expected to report four entries in total.
     */
    @Test
    void testMultipleJobVertexFinishedTaskExceedRatio() throws Exception {
        final int parallelism = 3;
        final JobVertex jobVertex1 = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        final JobVertex jobVertex2 = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        jobVertex2.connectNewDataSetAsInput(jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        final ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1.0, 0L);

        markSubtaskFinished(executionGraph, jobVertex1, 2);
        markSubtaskFinished(executionGraph, jobVertex2, 2);

        Assertions.assertThat(slowTaskDetector.findSlowTasks(executionGraph)).hasSize(4);
    }

    /**
     * Uses a graph built by the adaptive batch scheduler in which the downstream vertex is created
     * without an explicit parallelism. One upstream subtask is finished with ratio 0.3; the
     * detector is expected to report two entries.
     */
    @Test
    void testFinishedTaskExceedRatioInDynamicGraph() throws Exception {
        final int parallelism = 3;
        final JobVertex jobVertex1 = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        final JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        final ExecutionGraph executionGraph = createDynamicExecutionGraph(jobVertex1, jobVertex2);
        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1.0, 0L);

        markSubtaskFinished(executionGraph, jobVertex1, 2);

        Assertions.assertThat(slowTaskDetector.findSlowTasks(executionGraph)).hasSize(2);
    }

    /**
     * Marks the current execution attempt of one subtask as finished.
     *
     * @param executionGraph graph containing the vertex
     * @param jobVertex job vertex whose subtask should be finished
     * @param subtaskIndex index into the job vertex's task vertices
     */
    private static void markSubtaskFinished(ExecutionGraph executionGraph, JobVertex jobVertex, int subtaskIndex) {
        final ExecutionVertex executionVertex = executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[subtaskIndex];
        executionVertex.getCurrentExecutionAttempt().markFinished();
    }

    /**
     * Builds a streaming job graph from the given vertices, starts a scheduler created via
     * {@link SchedulerTestingUtils} for it and switches all vertices to running.
     *
     * @param jobVertices vertices of the job
     * @return the scheduler's execution graph
     * @throws Exception if the scheduler cannot be created
     */
    private ExecutionGraph createExecutionGraph(JobVertex ... jobVertices) throws Exception {
        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
        final DefaultScheduler scheduler = SchedulerTestingUtils.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor());
        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        return executionGraph;
    }

    /**
     * Same as {@link #createExecutionGraph(JobVertex...)} but the scheduler is an
     * {@link AdaptiveBatchScheduler} obtained from {@link DefaultSchedulerBuilder}.
     *
     * @param jobVertices vertices of the job
     * @return the scheduler's execution graph
     * @throws Exception if the scheduler cannot be created
     */
    private ExecutionGraph createDynamicExecutionGraph(JobVertex ... jobVertices) throws Exception {
        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
        final AdaptiveBatchScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()).buildAdaptiveBatchJobScheduler();
        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        return executionGraph;
    }

    /**
     * Creates a detector from a fresh {@link Configuration} carrying the three baseline options.
     *
     * @param ratio value for {@code EXECUTION_TIME_BASELINE_RATIO}
     * @param multiplier value for {@code EXECUTION_TIME_BASELINE_MULTIPLIER}
     * @param lowerBoundMillis value, in milliseconds, for {@code EXECUTION_TIME_BASELINE_LOWER_BOUND}
     * @return the configured detector
     */
    private ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector(double ratio, double multiplier, long lowerBoundMillis) {
        final Configuration configuration = new Configuration();
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND, Duration.ofMillis(lowerBoundMillis));
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, ratio);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, multiplier);
        return new ExecutionTimeBasedSlowTaskDetector(configuration);
    }
}

