package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/forwardgroup/ForwardGroupComputeUtilTest.class */
class ForwardGroupComputeUtilTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    ForwardGroupComputeUtilTest() {
    }

    @Test
    void testIsolatedVertices() throws Exception {
        checkGroupSize(computeForwardGroups(new JobVertex("v1"), new JobVertex("v2"), new JobVertex("v3")), 0, new Integer[0]);
    }

    @Test
    void testVariousResultPartitionTypesBetweenVertices() throws Exception {
        testThreeVerticesConnectSequentially(false, true, 1, 2);
        testThreeVerticesConnectSequentially(false, false, 0, new Integer[0]);
        testThreeVerticesConnectSequentially(true, true, 1, 3);
    }

    private void testThreeVerticesConnectSequentially(boolean z, boolean z2, int i, Integer... numArr) throws Exception {
        JobVertex jobVertex = new JobVertex("v1");
        JobVertex jobVertex2 = new JobVertex("v2");
        JobVertex jobVertex3 = new JobVertex("v3");
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        if (z) {
            ((JobEdge) ((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        }
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        if (z2) {
            ((JobEdge) ((IntermediateDataSet) jobVertex2.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        }
        checkGroupSize(computeForwardGroups(jobVertex, jobVertex2, jobVertex3), i, numArr);
    }

    @Test
    void testTwoInputsMergesIntoOne() throws Exception {
        JobVertex jobVertex = new JobVertex("v1");
        JobVertex jobVertex2 = new JobVertex("v2");
        JobVertex jobVertex3 = new JobVertex("v3");
        JobVertex jobVertex4 = new JobVertex("v4");
        jobVertex3.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        ((JobEdge) ((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        ((JobEdge) ((IntermediateDataSet) jobVertex2.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        jobVertex4.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        checkGroupSize(computeForwardGroups(jobVertex, jobVertex2, jobVertex3, jobVertex4), 1, 3);
    }

    @Test
    void testOneInputSplitsIntoTwo() throws Exception {
        JobVertex jobVertex = new JobVertex("v1");
        JobVertex jobVertex2 = new JobVertex("v2");
        JobVertex jobVertex3 = new JobVertex("v3");
        JobVertex jobVertex4 = new JobVertex("v4");
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        jobVertex4.connectNewDataSetAsInput(jobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        ((JobEdge) ((IntermediateDataSet) jobVertex2.getProducedDataSets().get(0)).getConsumers().get(0)).setForward(true);
        ((JobEdge) ((IntermediateDataSet) jobVertex2.getProducedDataSets().get(1)).getConsumers().get(0)).setForward(true);
        checkGroupSize(computeForwardGroups(jobVertex, jobVertex2, jobVertex3, jobVertex4), 1, 3);
    }

    private static Set<ForwardGroup> computeForwardGroups(JobVertex... jobVertexArr) throws Exception {
        Arrays.asList(jobVertexArr).forEach(jobVertex -> {
            jobVertex.setInvokableClass(NoOpInvokable.class);
        });
        DefaultExecutionGraph createDynamicGraph = createDynamicGraph(jobVertexArr);
        List asList = Arrays.asList(jobVertexArr);
        createDynamicGraph.getClass();
        return new HashSet(ForwardGroupComputeUtil.computeForwardGroups(asList, createDynamicGraph::getJobVertex).values());
    }

    private static void checkGroupSize(Set<ForwardGroup> set, int i, Integer... numArr) {
        Assertions.assertThat(set.size()).isEqualTo(i);
        Assertions.assertThat((List) set.stream().map((v0) -> {
            return v0.size();
        }).collect(Collectors.toList())).contains(numArr);
    }

    private static DefaultExecutionGraph createDynamicGraph(JobVertex... jobVertexArr) throws Exception {
        return TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(new JobGraph(new JobID(), "TestJob", jobVertexArr)).setVertexParallelismStore(AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(Arrays.asList(jobVertexArr), 10)).buildDynamicGraph((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
    }
}
