package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.class */
public class ExecutionGraphConstructionTest {
    @Test
    public void testCreateSimpleGraphBipartite() {
        JobID jobID = new JobID();
        Configuration configuration = new Configuration();
        JobVertex jobVertex = new JobVertex("vertex1");
        JobVertex jobVertex2 = new JobVertex("vertex2");
        JobVertex jobVertex3 = new JobVertex("vertex3");
        JobVertex jobVertex4 = new JobVertex("vertex4");
        JobVertex jobVertex5 = new JobVertex("vertex5");
        jobVertex.setParallelism(5);
        jobVertex2.setParallelism(7);
        jobVertex3.setParallelism(2);
        jobVertex4.setParallelism(11);
        jobVertex5.setParallelism(4);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL);
        jobVertex4.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL);
        jobVertex4.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL);
        jobVertex5.connectNewDataSetAsInput(jobVertex4, DistributionPattern.ALL_TO_ALL);
        jobVertex5.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL);
        ArrayList arrayList = new ArrayList(Arrays.asList(jobVertex, jobVertex2, jobVertex3, jobVertex4, jobVertex5));
        ExecutionGraph executionGraph = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobID, "Test Job Sample Name", configuration, AkkaUtils.getDefaultTimeout());
        try {
            executionGraph.attachJobGraph(arrayList);
        } catch (JobException e) {
            e.printStackTrace();
            Assert.fail("Job failed with exception: " + e.getMessage());
        }
        verifyTestGraph(executionGraph, jobID, jobVertex, jobVertex2, jobVertex3, jobVertex4, jobVertex5);
    }

    @Test
    public void testAttachViaDataSets() {
        JobID jobID = new JobID();
        Configuration configuration = new Configuration();
        JobVertex jobVertex = new JobVertex("vertex1");
        JobVertex jobVertex2 = new JobVertex("vertex2");
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex.setParallelism(5);
        jobVertex2.setParallelism(7);
        jobVertex3.setParallelism(2);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL);
        IntermediateDataSet createAndAddResultDataSet = jobVertex2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet createAndAddResultDataSet2 = jobVertex3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet createAndAddResultDataSet3 = jobVertex3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        ArrayList arrayList = new ArrayList(Arrays.asList(jobVertex, jobVertex2, jobVertex3));
        ExecutionGraph executionGraph = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobID, "Test Job Sample Name", configuration, AkkaUtils.getDefaultTimeout());
        try {
            executionGraph.attachJobGraph(arrayList);
        } catch (JobException e) {
            e.printStackTrace();
            Assert.fail("Job failed with exception: " + e.getMessage());
        }
        JobVertex jobVertex4 = new JobVertex("vertex4");
        JobVertex jobVertex5 = new JobVertex("vertex5");
        jobVertex4.setParallelism(11);
        jobVertex5.setParallelism(4);
        jobVertex4.connectDataSetAsInput(createAndAddResultDataSet, DistributionPattern.ALL_TO_ALL);
        jobVertex4.connectDataSetAsInput(createAndAddResultDataSet2, DistributionPattern.ALL_TO_ALL);
        jobVertex5.connectNewDataSetAsInput(jobVertex4, DistributionPattern.ALL_TO_ALL);
        jobVertex5.connectDataSetAsInput(createAndAddResultDataSet3, DistributionPattern.ALL_TO_ALL);
        try {
            executionGraph.attachJobGraph(new ArrayList(Arrays.asList(jobVertex4, jobVertex5)));
        } catch (JobException e2) {
            e2.printStackTrace();
            Assert.fail("Job failed with exception: " + e2.getMessage());
        }
        verifyTestGraph(executionGraph, jobID, jobVertex, jobVertex2, jobVertex3, jobVertex4, jobVertex5);
    }

    @Test
    public void testAttachViaIds() {
        JobID jobID = new JobID();
        Configuration configuration = new Configuration();
        JobVertex jobVertex = new JobVertex("vertex1");
        JobVertex jobVertex2 = new JobVertex("vertex2");
        JobVertex jobVertex3 = new JobVertex("vertex3");
        jobVertex.setParallelism(5);
        jobVertex2.setParallelism(7);
        jobVertex3.setParallelism(2);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL);
        IntermediateDataSet createAndAddResultDataSet = jobVertex2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet createAndAddResultDataSet2 = jobVertex3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet createAndAddResultDataSet3 = jobVertex3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        ArrayList arrayList = new ArrayList(Arrays.asList(jobVertex, jobVertex2, jobVertex3));
        ExecutionGraph executionGraph = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobID, "Test Job Sample Name", configuration, AkkaUtils.getDefaultTimeout());
        try {
            executionGraph.attachJobGraph(arrayList);
        } catch (JobException e) {
            e.printStackTrace();
            Assert.fail("Job failed with exception: " + e.getMessage());
        }
        JobVertex jobVertex4 = new JobVertex("vertex4");
        JobVertex jobVertex5 = new JobVertex("vertex5");
        jobVertex4.setParallelism(11);
        jobVertex5.setParallelism(4);
        jobVertex4.connectIdInput(createAndAddResultDataSet.getId(), DistributionPattern.ALL_TO_ALL);
        jobVertex4.connectIdInput(createAndAddResultDataSet2.getId(), DistributionPattern.ALL_TO_ALL);
        jobVertex5.connectNewDataSetAsInput(jobVertex4, DistributionPattern.ALL_TO_ALL);
        jobVertex5.connectIdInput(createAndAddResultDataSet3.getId(), DistributionPattern.ALL_TO_ALL);
        try {
            executionGraph.attachJobGraph(new ArrayList(Arrays.asList(jobVertex4, jobVertex5)));
        } catch (JobException e2) {
            e2.printStackTrace();
            Assert.fail("Job failed with exception: " + e2.getMessage());
        }
        verifyTestGraph(executionGraph, jobID, jobVertex, jobVertex2, jobVertex3, jobVertex4, jobVertex5);
    }

    private void verifyTestGraph(ExecutionGraph executionGraph, JobID jobID, JobVertex jobVertex, JobVertex jobVertex2, JobVertex jobVertex3, JobVertex jobVertex4, JobVertex jobVertex5) {
        Map allVertices = executionGraph.getAllVertices();
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) allVertices.get(jobVertex.getID());
        Assert.assertNotNull(executionJobVertex);
        Assert.assertEquals(jobVertex.getParallelism(), executionJobVertex.getParallelism());
        Assert.assertEquals(jobVertex.getID(), executionJobVertex.getJobVertexId());
        Assert.assertEquals(jobID, executionJobVertex.getJobId());
        Assert.assertEquals(jobVertex, executionJobVertex.getJobVertex());
        Assert.assertEquals(1L, executionJobVertex.getProducedDataSets().length);
        Assert.assertEquals(((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getId(), executionJobVertex.getProducedDataSets()[0].getId());
        Assert.assertEquals(jobVertex.getParallelism(), executionJobVertex.getProducedDataSets()[0].getPartitions().length);
        Assert.assertEquals(jobVertex.getParallelism(), executionJobVertex.getTaskVertices().length);
        int i = 0;
        for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
            Assert.assertEquals(jobID, executionVertex.getJobId());
            Assert.assertEquals(jobVertex.getID(), executionVertex.getJobvertexId());
            Assert.assertEquals(jobVertex.getParallelism(), executionVertex.getTotalNumberOfParallelSubtasks());
            int i2 = i;
            i++;
            Assert.assertEquals(i2, executionVertex.getParallelSubtaskIndex());
            Assert.assertEquals(0L, executionVertex.getNumberOfInputs());
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CREATED) > 0);
        }
        ExecutionJobVertex executionJobVertex2 = (ExecutionJobVertex) allVertices.get(jobVertex2.getID());
        Assert.assertNotNull(executionJobVertex2);
        Assert.assertEquals(1L, executionJobVertex2.getProducedDataSets().length);
        Assert.assertEquals(((IntermediateDataSet) jobVertex2.getProducedDataSets().get(0)).getId(), executionJobVertex2.getProducedDataSets()[0].getId());
        Assert.assertEquals(jobVertex2.getParallelism(), executionJobVertex2.getProducedDataSets()[0].getPartitions().length);
        Assert.assertEquals(jobVertex2.getParallelism(), executionJobVertex2.getTaskVertices().length);
        int i3 = 0;
        for (ExecutionVertex executionVertex2 : executionJobVertex2.getTaskVertices()) {
            Assert.assertEquals(jobID, executionVertex2.getJobId());
            Assert.assertEquals(jobVertex2.getID(), executionVertex2.getJobvertexId());
            Assert.assertEquals(jobVertex2.getParallelism(), executionVertex2.getTotalNumberOfParallelSubtasks());
            int i4 = i3;
            i3++;
            Assert.assertEquals(i4, executionVertex2.getParallelSubtaskIndex());
            Assert.assertEquals(1L, executionVertex2.getNumberOfInputs());
            ExecutionEdge[] inputEdges = executionVertex2.getInputEdges(0);
            Assert.assertEquals(jobVertex.getParallelism(), inputEdges.length);
            int i5 = 0;
            for (ExecutionEdge executionEdge : inputEdges) {
                Assert.assertEquals(0L, executionEdge.getInputNum());
                i5 += executionEdge.getSource().getPartitionNumber();
            }
            Assert.assertEquals(10L, i5);
        }
        ExecutionJobVertex executionJobVertex3 = (ExecutionJobVertex) allVertices.get(jobVertex3.getID());
        Assert.assertNotNull(executionJobVertex3);
        Assert.assertEquals(2L, executionJobVertex3.getProducedDataSets().length);
        Assert.assertEquals(((IntermediateDataSet) jobVertex3.getProducedDataSets().get(0)).getId(), executionJobVertex3.getProducedDataSets()[0].getId());
        Assert.assertEquals(((IntermediateDataSet) jobVertex3.getProducedDataSets().get(1)).getId(), executionJobVertex3.getProducedDataSets()[1].getId());
        Assert.assertEquals(jobVertex3.getParallelism(), executionJobVertex3.getProducedDataSets()[0].getPartitions().length);
        Assert.assertEquals(jobVertex3.getParallelism(), executionJobVertex3.getProducedDataSets()[1].getPartitions().length);
        Assert.assertEquals(jobVertex3.getParallelism(), executionJobVertex3.getTaskVertices().length);
        int i6 = 0;
        for (ExecutionVertex executionVertex3 : executionJobVertex3.getTaskVertices()) {
            Assert.assertEquals(jobID, executionVertex3.getJobId());
            Assert.assertEquals(jobVertex3.getID(), executionVertex3.getJobvertexId());
            Assert.assertEquals(jobVertex3.getParallelism(), executionVertex3.getTotalNumberOfParallelSubtasks());
            int i7 = i6;
            i6++;
            Assert.assertEquals(i7, executionVertex3.getParallelSubtaskIndex());
            Assert.assertEquals(0L, executionVertex3.getNumberOfInputs());
        }
        ExecutionJobVertex executionJobVertex4 = (ExecutionJobVertex) allVertices.get(jobVertex4.getID());
        Assert.assertNotNull(executionJobVertex4);
        Assert.assertEquals(1L, executionJobVertex4.getProducedDataSets().length);
        Assert.assertEquals(((IntermediateDataSet) jobVertex4.getProducedDataSets().get(0)).getId(), executionJobVertex4.getProducedDataSets()[0].getId());
        Assert.assertEquals(jobVertex4.getParallelism(), executionJobVertex4.getTaskVertices().length);
        int i8 = 0;
        for (ExecutionVertex executionVertex4 : executionJobVertex4.getTaskVertices()) {
            Assert.assertEquals(jobID, executionVertex4.getJobId());
            Assert.assertEquals(jobVertex4.getID(), executionVertex4.getJobvertexId());
            Assert.assertEquals(jobVertex4.getParallelism(), executionVertex4.getTotalNumberOfParallelSubtasks());
            int i9 = i8;
            i8++;
            Assert.assertEquals(i9, executionVertex4.getParallelSubtaskIndex());
            Assert.assertEquals(2L, executionVertex4.getNumberOfInputs());
            ExecutionEdge[] inputEdges2 = executionVertex4.getInputEdges(0);
            Assert.assertEquals(jobVertex2.getParallelism(), inputEdges2.length);
            int i10 = 0;
            for (ExecutionEdge executionEdge2 : inputEdges2) {
                Assert.assertEquals(0L, executionEdge2.getInputNum());
                i10 += executionEdge2.getSource().getPartitionNumber();
            }
            Assert.assertEquals(21L, i10);
            ExecutionEdge[] inputEdges3 = executionVertex4.getInputEdges(1);
            Assert.assertEquals(jobVertex3.getParallelism(), inputEdges3.length);
            int i11 = 0;
            for (ExecutionEdge executionEdge3 : inputEdges3) {
                Assert.assertEquals(1L, executionEdge3.getInputNum());
                i11 += executionEdge3.getSource().getPartitionNumber();
            }
            Assert.assertEquals(1L, i11);
        }
        ExecutionJobVertex executionJobVertex5 = (ExecutionJobVertex) allVertices.get(jobVertex5.getID());
        Assert.assertNotNull(executionJobVertex5);
        Assert.assertEquals(0L, executionJobVertex5.getProducedDataSets().length);
        Assert.assertEquals(jobVertex5.getParallelism(), executionJobVertex5.getTaskVertices().length);
        int i12 = 0;
        for (ExecutionVertex executionVertex5 : executionJobVertex5.getTaskVertices()) {
            Assert.assertEquals(jobID, executionVertex5.getJobId());
            Assert.assertEquals(jobVertex5.getID(), executionVertex5.getJobvertexId());
            Assert.assertEquals(jobVertex5.getParallelism(), executionVertex5.getTotalNumberOfParallelSubtasks());
            int i13 = i12;
            i12++;
            Assert.assertEquals(i13, executionVertex5.getParallelSubtaskIndex());
            Assert.assertEquals(2L, executionVertex5.getNumberOfInputs());
            ExecutionEdge[] inputEdges4 = executionVertex5.getInputEdges(0);
            Assert.assertEquals(jobVertex4.getParallelism(), inputEdges4.length);
            int i14 = 0;
            for (ExecutionEdge executionEdge4 : inputEdges4) {
                Assert.assertEquals(0L, executionEdge4.getInputNum());
                i14 += executionEdge4.getSource().getPartitionNumber();
            }
            Assert.assertEquals(55L, i14);
            ExecutionEdge[] inputEdges5 = executionVertex5.getInputEdges(1);
            Assert.assertEquals(jobVertex3.getParallelism(), inputEdges5.length);
            int i15 = 0;
            for (ExecutionEdge executionEdge5 : inputEdges5) {
                Assert.assertEquals(1L, executionEdge5.getInputNum());
                i15 += executionEdge5.getSource().getPartitionNumber();
            }
            Assert.assertEquals(1L, i15);
        }
    }

    @Test
    public void testCannotConnectMissingId() {
        JobID jobID = new JobID();
        Configuration configuration = new Configuration();
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setParallelism(7);
        ArrayList arrayList = new ArrayList(Arrays.asList(jobVertex));
        ExecutionGraph executionGraph = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobID, "Test Job Sample Name", configuration, AkkaUtils.getDefaultTimeout());
        try {
            executionGraph.attachJobGraph(arrayList);
        } catch (JobException e) {
            e.printStackTrace();
            Assert.fail("Job failed with exception: " + e.getMessage());
        }
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.ALL_TO_ALL);
        try {
            executionGraph.attachJobGraph(new ArrayList(Arrays.asList(jobVertex2)));
            Assert.fail("Attached wrong jobgraph");
        } catch (JobException e2) {
        }
    }

    @Test
    public void testCannotConnectWrongOrder() {
        JobID jobID = new JobID();
        Configuration configuration = new Configuration();
        JobVertex jobVertex = new JobVertex("vertex1");
        JobVertex jobVertex2 = new JobVertex("vertex2");
        JobVertex jobVertex3 = new JobVertex("vertex3");
        JobVertex jobVertex4 = new JobVertex("vertex4");
        JobVertex jobVertex5 = new JobVertex("vertex5");
        jobVertex.setParallelism(5);
        jobVertex2.setParallelism(7);
        jobVertex3.setParallelism(2);
        jobVertex4.setParallelism(11);
        jobVertex5.setParallelism(4);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL);
        jobVertex4.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL);
        jobVertex4.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL);
        jobVertex5.connectNewDataSetAsInput(jobVertex4, DistributionPattern.ALL_TO_ALL);
        jobVertex5.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL);
        try {
            new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobID, "Test Job Sample Name", configuration, AkkaUtils.getDefaultTimeout()).attachJobGraph(new ArrayList(Arrays.asList(jobVertex, jobVertex2, jobVertex3, jobVertex5, jobVertex4)));
            Assert.fail("Attached wrong jobgraph");
        } catch (JobException e) {
        }
    }

    @Test
    public void testSetupInputSplits() {
        try {
            InputSplit[] inputSplitArr = new InputSplit[0];
            InputSplitAssigner inputSplitAssigner = (InputSplitAssigner) Mockito.mock(InputSplitAssigner.class);
            InputSplitAssigner inputSplitAssigner2 = (InputSplitAssigner) Mockito.mock(InputSplitAssigner.class);
            InputSplitSource inputSplitSource = (InputSplitSource) Mockito.mock(InputSplitSource.class);
            InputSplitSource inputSplitSource2 = (InputSplitSource) Mockito.mock(InputSplitSource.class);
            Mockito.when(inputSplitSource.createInputSplits(Matchers.anyInt())).thenReturn(inputSplitArr);
            Mockito.when(inputSplitSource2.createInputSplits(Matchers.anyInt())).thenReturn(inputSplitArr);
            Mockito.when(inputSplitSource.getInputSplitAssigner(inputSplitArr)).thenReturn(inputSplitAssigner);
            Mockito.when(inputSplitSource2.getInputSplitAssigner(inputSplitArr)).thenReturn(inputSplitAssigner2);
            JobID jobID = new JobID();
            Configuration configuration = new Configuration();
            JobVertex jobVertex = new JobVertex("vertex1");
            JobVertex jobVertex2 = new JobVertex("vertex2");
            JobVertex jobVertex3 = new JobVertex("vertex3");
            JobVertex jobVertex4 = new JobVertex("vertex4");
            JobVertex jobVertex5 = new JobVertex("vertex5");
            jobVertex.setParallelism(5);
            jobVertex2.setParallelism(7);
            jobVertex3.setParallelism(2);
            jobVertex4.setParallelism(11);
            jobVertex5.setParallelism(4);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL);
            jobVertex4.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL);
            jobVertex4.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL);
            jobVertex5.connectNewDataSetAsInput(jobVertex4, DistributionPattern.ALL_TO_ALL);
            jobVertex5.connectNewDataSetAsInput(jobVertex3, DistributionPattern.ALL_TO_ALL);
            jobVertex3.setInputSplitSource(inputSplitSource);
            jobVertex5.setInputSplitSource(inputSplitSource2);
            ArrayList arrayList = new ArrayList(Arrays.asList(jobVertex, jobVertex2, jobVertex3, jobVertex4, jobVertex5));
            ExecutionGraph executionGraph = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobID, "Test Job Sample Name", configuration, AkkaUtils.getDefaultTimeout());
            try {
                executionGraph.attachJobGraph(arrayList);
            } catch (JobException e) {
                e.printStackTrace();
                Assert.fail("Job failed with exception: " + e.getMessage());
            }
            Assert.assertEquals(inputSplitAssigner, ((ExecutionJobVertex) executionGraph.getAllVertices().get(jobVertex3.getID())).getSplitAssigner());
            Assert.assertEquals(inputSplitAssigner2, ((ExecutionJobVertex) executionGraph.getAllVertices().get(jobVertex5.getID())).getSplitAssigner());
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testMoreThanOneConsumerForIntermediateResult() {
        try {
            JobID jobID = new JobID();
            Configuration configuration = new Configuration();
            JobVertex jobVertex = new JobVertex("vertex1");
            JobVertex jobVertex2 = new JobVertex("vertex2");
            JobVertex jobVertex3 = new JobVertex("vertex3");
            jobVertex.setParallelism(5);
            jobVertex2.setParallelism(7);
            jobVertex3.setParallelism(2);
            IntermediateDataSet createAndAddResultDataSet = jobVertex.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
            jobVertex2.connectDataSetAsInput(createAndAddResultDataSet, DistributionPattern.ALL_TO_ALL);
            jobVertex3.connectDataSetAsInput(createAndAddResultDataSet, DistributionPattern.ALL_TO_ALL);
            try {
                new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobID, "Test Job Sample Name", configuration, AkkaUtils.getDefaultTimeout()).attachJobGraph(new ArrayList(Arrays.asList(jobVertex, jobVertex2, jobVertex3)));
                Assert.fail("Should not be possible");
            } catch (RuntimeException e) {
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testCoLocationConstraintCreation() {
        try {
            JobID jobID = new JobID();
            Configuration configuration = new Configuration();
            JobVertex jobVertex = new JobVertex("vertex1");
            JobVertex jobVertex2 = new JobVertex("vertex2");
            jobVertex.setParallelism(6);
            jobVertex2.setParallelism(4);
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            jobVertex.setSlotSharingGroup(slotSharingGroup);
            jobVertex2.setSlotSharingGroup(slotSharingGroup);
            jobVertex2.setStrictlyCoLocatedWith(jobVertex);
            jobVertex.setStrictlyCoLocatedWith(jobVertex2);
            JobVertex jobVertex3 = new JobVertex("vertex3");
            JobVertex jobVertex4 = new JobVertex("vertex4");
            JobVertex jobVertex5 = new JobVertex("vertex5");
            JobVertex jobVertex6 = new JobVertex("vertex6");
            JobVertex jobVertex7 = new JobVertex("vertex7");
            jobVertex3.setParallelism(3);
            jobVertex4.setParallelism(3);
            jobVertex5.setParallelism(3);
            jobVertex6.setParallelism(3);
            jobVertex7.setParallelism(3);
            SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
            jobVertex3.setSlotSharingGroup(slotSharingGroup2);
            jobVertex4.setSlotSharingGroup(slotSharingGroup2);
            jobVertex5.setSlotSharingGroup(slotSharingGroup2);
            jobVertex6.setSlotSharingGroup(slotSharingGroup2);
            jobVertex7.setSlotSharingGroup(slotSharingGroup2);
            jobVertex4.setStrictlyCoLocatedWith(jobVertex3);
            jobVertex5.setStrictlyCoLocatedWith(jobVertex4);
            jobVertex6.setStrictlyCoLocatedWith(jobVertex3);
            jobVertex3.setStrictlyCoLocatedWith(jobVertex7);
            JobVertex jobVertex8 = new JobVertex("vertex8");
            jobVertex8.setParallelism(2);
            JobGraph jobGraph = new JobGraph(jobID, "Co-Location Constraint Sample Job", new JobVertex[]{jobVertex, jobVertex2, jobVertex3, jobVertex4, jobVertex5, jobVertex6, jobVertex7, jobVertex8});
            ExecutionGraph executionGraph = new ExecutionGraph(TestingUtils.defaultExecutionContext(), jobID, "Co-Location Constraint Sample Job", configuration, AkkaUtils.getDefaultTimeout());
            executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
            ExecutionVertex[] taskVertices = executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices();
            ExecutionVertex[] taskVertices2 = executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices();
            HashSet hashSet = new HashSet();
            for (int i = 0; i < jobVertex2.getParallelism(); i++) {
                Assert.assertNotNull(taskVertices[i].getLocationConstraint());
                Assert.assertNotNull(taskVertices2[i].getLocationConstraint());
                Assert.assertTrue(taskVertices[i].getLocationConstraint() == taskVertices2[i].getLocationConstraint());
                hashSet.add(taskVertices[i].getLocationConstraint());
            }
            for (int parallelism = jobVertex2.getParallelism(); parallelism < jobVertex.getParallelism(); parallelism++) {
                Assert.assertNotNull(taskVertices[parallelism].getLocationConstraint());
                hashSet.add(taskVertices[parallelism].getLocationConstraint());
            }
            Assert.assertEquals("not all co location constraints are distinct", jobVertex.getParallelism(), hashSet.size());
            ExecutionVertex[] taskVertices3 = executionGraph.getJobVertex(jobVertex3.getID()).getTaskVertices();
            ExecutionVertex[] taskVertices4 = executionGraph.getJobVertex(jobVertex4.getID()).getTaskVertices();
            ExecutionVertex[] taskVertices5 = executionGraph.getJobVertex(jobVertex5.getID()).getTaskVertices();
            ExecutionVertex[] taskVertices6 = executionGraph.getJobVertex(jobVertex6.getID()).getTaskVertices();
            ExecutionVertex[] taskVertices7 = executionGraph.getJobVertex(jobVertex7.getID()).getTaskVertices();
            HashSet hashSet2 = new HashSet();
            for (int i2 = 0; i2 < jobVertex3.getParallelism(); i2++) {
                Assert.assertNotNull(taskVertices3[i2].getLocationConstraint());
                Assert.assertTrue(taskVertices3[i2].getLocationConstraint() == taskVertices4[i2].getLocationConstraint());
                Assert.assertTrue(taskVertices4[i2].getLocationConstraint() == taskVertices5[i2].getLocationConstraint());
                Assert.assertTrue(taskVertices5[i2].getLocationConstraint() == taskVertices6[i2].getLocationConstraint());
                Assert.assertTrue(taskVertices6[i2].getLocationConstraint() == taskVertices7[i2].getLocationConstraint());
                hashSet2.add(taskVertices3[i2].getLocationConstraint());
            }
            Assert.assertEquals("not all co location constraints are distinct", jobVertex3.getParallelism(), hashSet2.size());
            ExecutionVertex[] taskVertices8 = executionGraph.getJobVertex(jobVertex8.getID()).getTaskVertices();
            for (int i3 = 0; i3 < jobVertex8.getParallelism(); i3++) {
                Assert.assertNull(taskVertices8[i3].getLocationConstraint());
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
