package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategyTest.class */
class PipelinedRegionSchedulingStrategyTest {

    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private TestingSchedulerOperations testingSchedulerOperation;
    private static final int PARALLELISM = 2;
    private TestingSchedulingTopology testingSchedulingTopology;
    private List<TestingSchedulingExecutionVertex> source;
    private List<TestingSchedulingExecutionVertex> map1;
    private List<TestingSchedulingExecutionVertex> map2;
    private List<TestingSchedulingExecutionVertex> map3;
    private List<TestingSchedulingExecutionVertex> sink;

    PipelinedRegionSchedulingStrategyTest() {
    }

    @BeforeEach
    void setUp() {
        this.testingSchedulerOperation = new TestingSchedulerOperations();
        buildTopology();
    }

    private void buildTopology() {
        this.testingSchedulingTopology = new TestingSchedulingTopology();
        this.source = this.testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        this.map1 = this.testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        this.map2 = this.testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        this.map3 = this.testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        this.sink = this.testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish();
        this.testingSchedulingTopology.connectPointwise(this.source, this.map1).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED).finish();
        this.testingSchedulingTopology.connectPointwise(this.map1, this.map2).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.HYBRID_FULL).finish();
        this.testingSchedulingTopology.connectPointwise(this.map2, this.map3).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.HYBRID_SELECTIVE).finish();
        this.testingSchedulingTopology.connectAllToAll(this.map3, this.sink).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.BLOCKING).finish();
    }

    @Test
    void testStartScheduling() {
        startScheduling(this.testingSchedulingTopology);
        ArrayList arrayList = new ArrayList();
        arrayList.add(Arrays.asList(this.source.get(0), this.map1.get(0)));
        arrayList.add(Arrays.asList(this.source.get(1), this.map1.get(1)));
        arrayList.add(Arrays.asList(this.map2.get(0)));
        arrayList.add(Arrays.asList(this.map2.get(1)));
        arrayList.add(Arrays.asList(this.map3.get(0)));
        arrayList.add(Arrays.asList(this.map3.get(1)));
        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(arrayList, this.testingSchedulerOperation);
    }

    @Test
    void testRestartTasks() {
        startScheduling(this.testingSchedulingTopology).restartTasks((Set) Stream.of((Object[]) new List[]{this.source, this.map1, this.map2, this.map3, this.sink}).flatMap((v0) -> {
            return v0.stream();
        }).map((v0) -> {
            return v0.m567getId();
        }).collect(Collectors.toSet()));
        ArrayList arrayList = new ArrayList();
        arrayList.add(Arrays.asList(this.source.get(0), this.map1.get(0)));
        arrayList.add(Arrays.asList(this.source.get(1), this.map1.get(1)));
        arrayList.add(Arrays.asList(this.map2.get(0)));
        arrayList.add(Arrays.asList(this.map2.get(1)));
        arrayList.add(Arrays.asList(this.map3.get(0)));
        arrayList.add(Arrays.asList(this.map3.get(1)));
        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(arrayList, this.testingSchedulerOperation);
    }

    @Test
    void testNotifyingBlockingResultPartitionProducerFinished() {
        PipelinedRegionSchedulingStrategy startScheduling = startScheduling(this.testingSchedulingTopology);
        TestingSchedulingExecutionVertex testingSchedulingExecutionVertex = this.map3.get(0);
        testingSchedulingExecutionVertex.getProducedResults().iterator().next().markFinished();
        startScheduling.onExecutionStateChange(testingSchedulingExecutionVertex.m567getId(), ExecutionState.FINISHED);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(6);
        TestingSchedulingExecutionVertex testingSchedulingExecutionVertex2 = this.map3.get(1);
        testingSchedulingExecutionVertex2.getProducedResults().iterator().next().markFinished();
        startScheduling.onExecutionStateChange(testingSchedulingExecutionVertex2.m567getId(), ExecutionState.FINISHED);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(8);
        ArrayList arrayList = new ArrayList();
        arrayList.add(Arrays.asList(this.sink.get(0)));
        arrayList.add(Arrays.asList(this.sink.get(1)));
        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(arrayList, this.testingSchedulerOperation);
    }

    @Test
    void testSchedulingTopologyWithPersistentBlockingEdges() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> finish = testingSchedulingTopology.addExecutionVertices().withParallelism(1).finish();
        testingSchedulingTopology.connectPointwise(finish, testingSchedulingTopology.addExecutionVertices().withParallelism(1).finish()).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT).finish();
        startScheduling(testingSchedulingTopology);
        ArrayList arrayList = new ArrayList();
        arrayList.add(Arrays.asList(finish.get(0)));
        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(arrayList, this.testingSchedulerOperation);
    }

    @Test
    void testComputingCrossRegionConsumedPartitionGroupsCorrectly() throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 4);
        JobVertex createJobVertex2 = createJobVertex("v2", 3);
        JobVertex createJobVertex3 = createJobVertex("v3", 2);
        createJobVertex2.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        DefaultExecutionGraph build = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(new ArrayList(Arrays.asList(createJobVertex, createJobVertex2, createJobVertex3))).build()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        Set crossRegionConsumedPartitionGroups = new PipelinedRegionSchedulingStrategy(this.testingSchedulerOperation, build.getSchedulingTopology()).getCrossRegionConsumedPartitionGroups();
        Assertions.assertThat(crossRegionConsumedPartitionGroups).hasSize(1);
        Assertions.assertThat(crossRegionConsumedPartitionGroups).contains(new ConsumedPartitionGroup[]{(ConsumedPartitionGroup) build.getJobVertex(createJobVertex3.getID()).getTaskVertices()[1].getAllConsumedPartitionGroups().get(0)});
    }

    @Test
    void testNoCrossRegionConsumedPartitionGroupsWithAllToAllBlockingEdge() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        testingSchedulingTopology.connectAllToAll(testingSchedulingTopology.addExecutionVertices().withParallelism(4).finish(), testingSchedulingTopology.addExecutionVertices().withParallelism(4).finish()).withResultPartitionType(ResultPartitionType.BLOCKING).finish();
        Assertions.assertThat(new PipelinedRegionSchedulingStrategy(this.testingSchedulerOperation, testingSchedulingTopology).getCrossRegionConsumedPartitionGroups()).isEmpty();
    }

    @Test
    void testSchedulingTopologyWithBlockingCrossRegionConsumedPartitionGroups() throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 4);
        JobVertex createJobVertex2 = createJobVertex("v2", 3);
        JobVertex createJobVertex3 = createJobVertex("v3", 2);
        createJobVertex2.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        DefaultExecutionGraph build = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(new ArrayList(Arrays.asList(createJobVertex, createJobVertex2, createJobVertex3))).build()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        SchedulingTopology schedulingTopology = build.getSchedulingTopology();
        ArrayList arrayList = new ArrayList();
        Iterable allPipelinedRegions = schedulingTopology.getAllPipelinedRegions();
        arrayList.getClass();
        allPipelinedRegions.forEach((v1) -> {
            r1.add(v1);
        });
        Assertions.assertThat(arrayList).hasSize(2);
        ExecutionVertex executionVertex = build.getJobVertex(createJobVertex3.getID()).getTaskVertices()[0];
        HashSet hashSet = new HashSet();
        schedulingTopology.getPipelinedRegionOfVertex(executionVertex.getID()).getVertices().forEach(schedulingExecutionVertex -> {
            hashSet.add(schedulingExecutionVertex.getId());
        });
        Assertions.assertThat(hashSet).hasSize(5);
        ExecutionVertex executionVertex2 = build.getJobVertex(createJobVertex3.getID()).getTaskVertices()[1];
        HashSet hashSet2 = new HashSet();
        schedulingTopology.getPipelinedRegionOfVertex(executionVertex2.getID()).getVertices().forEach(schedulingExecutionVertex2 -> {
            hashSet2.add(schedulingExecutionVertex2.getId());
        });
        Assertions.assertThat(hashSet2).hasSize(4);
        PipelinedRegionSchedulingStrategy startScheduling = startScheduling(schedulingTopology);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(1);
        List<ExecutionVertexID> list = this.testingSchedulerOperation.getScheduledVertices().get(0);
        Assertions.assertThat(list).hasSize(5);
        Iterator<ExecutionVertexID> it = list.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(hashSet).contains(new ExecutionVertexID[]{it.next()});
        }
        ExecutionVertex executionVertex3 = build.getJobVertex(createJobVertex2.getID()).getTaskVertices()[1];
        executionVertex3.finishPartitionsIfNeeded();
        startScheduling.onExecutionStateChange(executionVertex3.getID(), ExecutionState.FINISHED);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(2);
        List<ExecutionVertexID> list2 = this.testingSchedulerOperation.getScheduledVertices().get(1);
        Assertions.assertThat(list2).hasSize(4);
        Iterator<ExecutionVertexID> it2 = list2.iterator();
        while (it2.hasNext()) {
            Assertions.assertThat(hashSet2).contains(new ExecutionVertexID[]{it2.next()});
        }
    }

    @Test
    void testSchedulingTopologyWithHybridCrossRegionConsumedPartitionGroups() throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 4);
        JobVertex createJobVertex2 = createJobVertex("v2", 3);
        JobVertex createJobVertex3 = createJobVertex("v3", 2);
        createJobVertex2.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        DefaultExecutionGraph build = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(new ArrayList(Arrays.asList(createJobVertex, createJobVertex2, createJobVertex3))).build()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        SchedulingTopology schedulingTopology = build.getSchedulingTopology();
        ArrayList arrayList = new ArrayList();
        Iterable allPipelinedRegions = schedulingTopology.getAllPipelinedRegions();
        arrayList.getClass();
        allPipelinedRegions.forEach((v1) -> {
            r1.add(v1);
        });
        Assertions.assertThat(arrayList).hasSize(2);
        ExecutionVertex executionVertex = build.getJobVertex(createJobVertex3.getID()).getTaskVertices()[0];
        HashSet hashSet = new HashSet();
        schedulingTopology.getPipelinedRegionOfVertex(executionVertex.getID()).getVertices().forEach(schedulingExecutionVertex -> {
            hashSet.add(schedulingExecutionVertex.getId());
        });
        Assertions.assertThat(hashSet).hasSize(5);
        ExecutionVertex executionVertex2 = build.getJobVertex(createJobVertex3.getID()).getTaskVertices()[1];
        HashSet hashSet2 = new HashSet();
        schedulingTopology.getPipelinedRegionOfVertex(executionVertex2.getID()).getVertices().forEach(schedulingExecutionVertex2 -> {
            hashSet2.add(schedulingExecutionVertex2.getId());
        });
        Assertions.assertThat(hashSet2).hasSize(4);
        startScheduling(schedulingTopology);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(2);
        List<ExecutionVertexID> list = this.testingSchedulerOperation.getScheduledVertices().get(0);
        Assertions.assertThat(list).hasSize(5);
        Iterator<ExecutionVertexID> it = list.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(hashSet).contains(new ExecutionVertexID[]{it.next()});
        }
        List<ExecutionVertexID> list2 = this.testingSchedulerOperation.getScheduledVertices().get(1);
        Assertions.assertThat(list2).hasSize(4);
        Iterator<ExecutionVertexID> it2 = list2.iterator();
        while (it2.hasNext()) {
            Assertions.assertThat(hashSet2).contains(new ExecutionVertexID[]{it2.next()});
        }
    }

    @Test
    void testScheduleBlockingDownstreamTaskIndividually() throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 2);
        JobVertex createJobVertex2 = createJobVertex("v2", 2);
        createJobVertex2.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        DefaultExecutionGraph build = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(new ArrayList(Arrays.asList(createJobVertex, createJobVertex2))).build()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        PipelinedRegionSchedulingStrategy startScheduling = startScheduling(build.getSchedulingTopology());
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(2);
        ExecutionVertex executionVertex = build.getJobVertex(createJobVertex.getID()).getTaskVertices()[0];
        executionVertex.finishPartitionsIfNeeded();
        startScheduling.onExecutionStateChange(executionVertex.getID(), ExecutionState.FINISHED);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(3);
    }

    @Test
    void testFinishHybridPartitionWillNotRescheduleDownstream() throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 1);
        JobVertex createJobVertex2 = createJobVertex("v2", 1);
        JobVertex createJobVertex3 = createJobVertex("v3", 1);
        createJobVertex2.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_SELECTIVE);
        DefaultExecutionGraph build = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(new ArrayList(Arrays.asList(createJobVertex, createJobVertex2, createJobVertex3))).build()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        PipelinedRegionSchedulingStrategy startScheduling = startScheduling(build.getSchedulingTopology());
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(3);
        startScheduling.onExecutionStateChange(build.getJobVertex(createJobVertex.getID()).getTaskVertices()[0].getID(), ExecutionState.FINISHED);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(3);
    }

    @Test
    void testScheduleTopologyWithHybridAndBlockingEdge() throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 1);
        JobVertex createJobVertex2 = createJobVertex("v2", 1);
        JobVertex createJobVertex3 = createJobVertex("v3", 1);
        JobVertex createJobVertex4 = createJobVertex("v4", 1);
        createJobVertex2.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        createJobVertex4.connectNewDataSetAsInput(createJobVertex3, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_SELECTIVE);
        DefaultExecutionGraph build = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(new ArrayList(Arrays.asList(createJobVertex, createJobVertex2, createJobVertex3, createJobVertex4))).build()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        PipelinedRegionSchedulingStrategy startScheduling = startScheduling(build.getSchedulingTopology());
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(2);
        ExecutionVertex executionVertex = build.getJobVertex(createJobVertex.getID()).getTaskVertices()[0];
        ExecutionVertex executionVertex2 = build.getJobVertex(createJobVertex2.getID()).getTaskVertices()[0];
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices().get(0)).containsExactly(new ExecutionVertexID[]{executionVertex.getID()});
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices().get(1)).containsExactly(new ExecutionVertexID[]{executionVertex2.getID()});
        executionVertex2.finishPartitionsIfNeeded();
        startScheduling.onExecutionStateChange(executionVertex2.getID(), ExecutionState.FINISHED);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(4);
        ExecutionVertex executionVertex3 = build.getJobVertex(createJobVertex3.getID()).getTaskVertices()[0];
        ExecutionVertex executionVertex4 = build.getJobVertex(createJobVertex4.getID()).getTaskVertices()[0];
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices().get(2)).containsExactly(new ExecutionVertexID[]{executionVertex3.getID()});
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices().get(3)).containsExactly(new ExecutionVertexID[]{executionVertex4.getID()});
    }

    @Test
    void testSchedulingRegionWithInnerNonPipelinedEdge() throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 1);
        JobVertex createJobVertex2 = createJobVertex("v2", 1);
        JobVertex createJobVertex3 = createJobVertex("v3", 1);
        JobVertex createJobVertex4 = createJobVertex("v4", 1);
        JobVertex createJobVertex5 = createJobVertex("v5", 1);
        createJobVertex2.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        createJobVertex4.connectNewDataSetAsInput(createJobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        createJobVertex5.connectNewDataSetAsInput(createJobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        createJobVertex3.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        createJobVertex4.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_SELECTIVE);
        createJobVertex4.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        startScheduling(TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(new ArrayList(Arrays.asList(createJobVertex, createJobVertex2, createJobVertex3, createJobVertex4, createJobVertex5))).build()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).getSchedulingTopology());
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(1);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices().get(0)).hasSize(5);
    }

    @Test
    void testDownstreamRegionWillBeBlockedByBlockingEdge() throws Exception {
        JobVertex createJobVertex = createJobVertex("v1", 1);
        JobVertex createJobVertex2 = createJobVertex("v2", 1);
        JobVertex createJobVertex3 = createJobVertex("v3", 1);
        JobVertex createJobVertex4 = createJobVertex("v4", 1);
        createJobVertex4.connectNewDataSetAsInput(createJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        createJobVertex4.connectNewDataSetAsInput(createJobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        createJobVertex4.connectNewDataSetAsInput(createJobVertex3, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_SELECTIVE);
        DefaultExecutionGraph build = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(new ArrayList(Arrays.asList(createJobVertex, createJobVertex2, createJobVertex3, createJobVertex4))).build()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        PipelinedRegionSchedulingStrategy startScheduling = startScheduling(build.getSchedulingTopology());
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(3);
        ExecutionVertex executionVertex = build.getJobVertex(createJobVertex.getID()).getTaskVertices()[0];
        executionVertex.finishPartitionsIfNeeded();
        startScheduling.onExecutionStateChange(executionVertex.getID(), ExecutionState.FINISHED);
        Assertions.assertThat(this.testingSchedulerOperation.getScheduledVertices()).hasSize(4);
    }

    private static JobVertex createJobVertex(String str, int i) {
        JobVertex jobVertex = new JobVertex(str);
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        return jobVertex;
    }

    private PipelinedRegionSchedulingStrategy startScheduling(SchedulingTopology schedulingTopology) {
        PipelinedRegionSchedulingStrategy pipelinedRegionSchedulingStrategy = new PipelinedRegionSchedulingStrategy(this.testingSchedulerOperation, schedulingTopology);
        pipelinedRegionSchedulingStrategy.startScheduling();
        return pipelinedRegionSchedulingStrategy;
    }
}
