package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.class */
/**
 * Tests that exercise attaching {@link JobVertex} topologies to an {@link ExecutionGraph} built via
 * {@link TestingDefaultExecutionGraphBuilder}, and verify the resulting graph structure.
 */
public class DefaultExecutionGraphConstructionTest {

    /**
     * Builds an execution graph whose vertex parallelism store is computed from the given
     * vertices. The vertices are not attached; callers do that via {@code attachJobGraph}.
     *
     * @param list the job vertices used to compute the vertex parallelism store
     * @return a freshly built execution graph
     * @throws Exception if building the graph fails
     */
    private ExecutionGraph createDefaultExecutionGraph(List<JobVertex> list) throws Exception {
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(list))
                .build();
    }

    /** Creates a job vertex with the given name and parallelism (no invokable class set). */
    private static JobVertex createVertex(String name, int parallelism) {
        JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(parallelism);
        return vertex;
    }

    /** Creates a job vertex with the given name and parallelism, using {@link AbstractInvokable}. */
    private static JobVertex createInvokableVertex(String name, int parallelism) {
        JobVertex vertex = createVertex(name, parallelism);
        vertex.setInvokableClass(AbstractInvokable.class);
        return vertex;
    }

    /**
     * Creates the five-vertex topology shared by several tests.
     *
     * <p>Edges (all ALL_TO_ALL / PIPELINED), created in this order: v1 -> v2, v2 -> v4, v3 -> v4,
     * v4 -> v5, v3 -> v5.
     *
     * @return a mutable list {@code [v1, v2, v3, v4, v5]}
     */
    private static List<JobVertex> createConnectedVertices() {
        JobVertex v1 = createInvokableVertex("vertex1", 5);
        JobVertex v2 = createInvokableVertex("vertex2", 7);
        JobVertex v3 = createInvokableVertex("vertex3", 2);
        JobVertex v4 = createInvokableVertex("vertex4", 11);
        JobVertex v5 = createInvokableVertex("vertex5", 4);

        // The connection order determines the input order asserted in verifyTestGraph.
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        return new ArrayList<>(Arrays.asList(v1, v2, v3, v4, v5));
    }

    /**
     * Attaches the same vertices to two separately built graphs and checks that the two sets of
     * registered execution keys do not intersect.
     */
    @Test
    public void testExecutionAttemptIdInTwoIdenticalJobsIsNotSame() throws Exception {
        List<JobVertex> vertices =
                new ArrayList<>(
                        Arrays.asList(
                                createInvokableVertex("vertex1", 5),
                                createInvokableVertex("vertex2", 7),
                                createInvokableVertex("vertex3", 2)));

        ExecutionGraph firstGraph = createDefaultExecutionGraph(vertices);
        ExecutionGraph secondGraph = createDefaultExecutionGraph(vertices);
        firstGraph.attachJobGraph(vertices);
        secondGraph.attachJobGraph(vertices);

        MatcherAssert.assertThat(
                Sets.intersection(
                        firstGraph.getRegisteredExecutions().keySet(),
                        secondGraph.getRegisteredExecutions().keySet()),
                Matchers.is(Matchers.empty()));
    }

    /** Attaches the shared five-vertex topology in one step and verifies the generated graph. */
    @Test
    public void testCreateSimpleGraphBipartite() throws Exception {
        List<JobVertex> vertices = createConnectedVertices();

        ExecutionGraph graph = createDefaultExecutionGraph(vertices);
        // Any JobException propagates and fails the test with its full stack trace.
        graph.attachJobGraph(vertices);

        verifyTestGraph(
                graph,
                vertices.get(0),
                vertices.get(1),
                vertices.get(2),
                vertices.get(3),
                vertices.get(4));
    }

    /**
     * Builds the same topology via explicitly created intermediate data sets and attaches it in
     * two batches ({@code [v1, v2, v3]} first, then {@code [v4, v5]}).
     */
    @Test
    public void testAttachViaDataSets() throws Exception {
        JobVertex v1 = createInvokableVertex("vertex1", 5);
        JobVertex v2 = createInvokableVertex("vertex2", 7);
        JobVertex v3 = createInvokableVertex("vertex3", 2);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        // Results produced by the first batch and consumed by the second batch.
        IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);

        JobVertex v4 = createInvokableVertex("vertex4", 11);
        JobVertex v5 = createInvokableVertex("vertex5", 4);

        v4.connectDataSetAsInput(v2result, DistributionPattern.ALL_TO_ALL);
        v4.connectDataSetAsInput(v3result1, DistributionPattern.ALL_TO_ALL);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectDataSetAsInput(v3result2, DistributionPattern.ALL_TO_ALL);

        List<JobVertex> firstBatch = Arrays.asList(v1, v2, v3);
        List<JobVertex> secondBatch = Arrays.asList(v4, v5);
        List<JobVertex> allVertices =
                Stream.concat(firstBatch.stream(), secondBatch.stream())
                        .collect(Collectors.toList());

        ExecutionGraph graph = createDefaultExecutionGraph(allVertices);
        // Any JobException propagates and fails the test with its full stack trace.
        graph.attachJobGraph(firstBatch);
        graph.attachJobGraph(secondBatch);

        verifyTestGraph(graph, v1, v2, v3, v4, v5);
    }

    /**
     * Verifies, for each of the five vertices, the expected input and output vertices of the
     * generated execution job vertex ({@code null} is passed where none are expected).
     */
    private void verifyTestGraph(ExecutionGraph executionGraph, JobVertex jobVertex, JobVertex jobVertex2, JobVertex jobVertex3, JobVertex jobVertex4, JobVertex jobVertex5) {
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                executionGraph, jobVertex, null, Collections.singletonList(jobVertex2));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                executionGraph,
                jobVertex2,
                Collections.singletonList(jobVertex),
                Collections.singletonList(jobVertex4));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                executionGraph, jobVertex3, null, Arrays.asList(jobVertex4, jobVertex5));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                executionGraph,
                jobVertex4,
                Arrays.asList(jobVertex2, jobVertex3),
                Collections.singletonList(jobVertex5));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                executionGraph, jobVertex5, Arrays.asList(jobVertex4, jobVertex3), null);
    }

    /**
     * Attaches the shared topology with v5 listed before v4 (whose result v5 consumes) and expects
     * a {@link JobException}.
     */
    @Test
    public void testCannotConnectWrongOrder() throws Exception {
        List<JobVertex> sorted = createConnectedVertices();
        // Swap the last two vertices: [v1, v2, v3, v5, v4].
        List<JobVertex> wrongOrder =
                new ArrayList<>(
                        Arrays.asList(
                                sorted.get(0),
                                sorted.get(1),
                                sorted.get(2),
                                sorted.get(4),
                                sorted.get(3)));

        try {
            createDefaultExecutionGraph(wrongOrder).attachJobGraph(wrongOrder);
            Assert.fail("Attached wrong jobgraph");
        } catch (JobException expected) {
            // expected: the vertices were not attached in a valid order
        }
    }

    /**
     * Sets mocked input split sources on v3 and v5 and checks that the corresponding execution job
     * vertices expose the assigners returned by those sources.
     */
    @Test
    public void testSetupInputSplits() {
        try {
            final InputSplit[] emptySplits = new InputSplit[0];

            InputSplitAssigner assigner1 = Mockito.mock(InputSplitAssigner.class);
            InputSplitAssigner assigner2 = Mockito.mock(InputSplitAssigner.class);

            // Mocking a generic interface via its class literal is inherently unchecked.
            @SuppressWarnings("unchecked")
            InputSplitSource<InputSplit> source1 = Mockito.mock(InputSplitSource.class);
            @SuppressWarnings("unchecked")
            InputSplitSource<InputSplit> source2 = Mockito.mock(InputSplitSource.class);

            Mockito.when(source1.createInputSplits(org.mockito.Matchers.anyInt())).thenReturn(emptySplits);
            Mockito.when(source2.createInputSplits(org.mockito.Matchers.anyInt())).thenReturn(emptySplits);
            Mockito.when(source1.getInputSplitAssigner(emptySplits)).thenReturn(assigner1);
            Mockito.when(source2.getInputSplitAssigner(emptySplits)).thenReturn(assigner2);

            List<JobVertex> vertices = createConnectedVertices();
            JobVertex v3 = vertices.get(2);
            JobVertex v5 = vertices.get(4);
            v3.setInputSplitSource(source1);
            v5.setInputSplitSource(source2);

            ExecutionGraph graph = createDefaultExecutionGraph(vertices);
            graph.attachJobGraph(vertices);

            Assert.assertEquals(assigner1, graph.getAllVertices().get(v3.getID()).getSplitAssigner());
            Assert.assertEquals(assigner2, graph.getAllVertices().get(v5.getID()).getSplitAssigner());
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows its stack trace.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Connects two consumer vertices to the same intermediate data set and expects building or
     * attaching the graph to fail with a {@link RuntimeException}.
     */
    @Test
    public void testMoreThanOneConsumerForIntermediateResult() {
        try {
            JobVertex v1 = createVertex("vertex1", 5);
            JobVertex v2 = createVertex("vertex2", 7);
            JobVertex v3 = createVertex("vertex3", 2);

            IntermediateDataSet result = v1.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
            v2.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);
            v3.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);

            List<JobVertex> vertices = new ArrayList<>(Arrays.asList(v1, v2, v3));
            try {
                createDefaultExecutionGraph(vertices).attachJobGraph(vertices);
                Assert.fail("Should not be possible");
            } catch (RuntimeException expected) {
                // expected: a data set with two consumers must be rejected
            }
        } catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows its stack trace.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Attaches a two-vertex source/sink job (parallelism 2, ALL_TO_ALL, BLOCKING) and checks that
     * both source partitions report the same first consumed partition group, and that this group
     * contains exactly the IDs of those two partitions.
     */
    @Test
    public void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
        JobVertex source = createVertex("source", 2);
        JobVertex sink = createVertex("sink", 2);
        sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        List<JobVertex> vertices = new ArrayList<>(Arrays.asList(source, sink));
        ExecutionGraph graph = createDefaultExecutionGraph(vertices);
        graph.attachJobGraph(vertices);

        IntermediateResult result =
                Objects.requireNonNull(graph.getJobVertex(source.getID())).getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];

        Assert.assertEquals(
                partition1.getConsumedPartitionGroups().get(0),
                partition2.getConsumedPartitionGroups().get(0));

        ConsumedPartitionGroup group = partition1.getConsumedPartitionGroups().get(0);
        HashSet<IntermediateResultPartitionID> partitionIds = new HashSet<>();
        for (IntermediateResultPartitionID partitionId : group) {
            partitionIds.add(partitionId);
        }

        MatcherAssert.assertThat(
                partitionIds,
                Matchers.containsInAnyOrder(
                        partition1.getPartitionId(), partition2.getPartitionId()));
    }
}
