package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.class */
/**
 * Tests how {@link RestartPipelinedRegionStrategy} groups the execution vertices of an
 * {@link ExecutionGraph} into {@link FailoverRegion}s.
 *
 * <p>Each test builds a small job graph, attaches it to an execution graph configured with
 * {@link RestartPipelinedRegionStrategy.Factory}, looks up the region of selected subtasks and
 * asserts which of them share a region.
 */
public class RestartPipelinedRegionStrategyTest {

    /**
     * All edges are ALL_TO_ALL / PIPELINED; the test expects sampled subtasks of all five
     * vertices to be reported in one and the same failover region.
     */
    @Test
    public void testSimpleFailoverRegion() throws Exception {
        JobVertex v1 = createVertex("vertex1", 5);
        JobVertex v2 = createVertex("vertex2", 7);
        JobVertex v3 = createVertex("vertex3", 2);
        JobVertex v4 = createVertex("vertex4", 11);
        JobVertex v5 = createVertex("vertex5", 4);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        ExecutionGraph graph = createExecutionGraph(v1, v2, v3, v4, v5);

        FailoverRegion regionOfV1 = regionOf(graph, v1, 2);
        FailoverRegion regionOfV2 = regionOf(graph, v2, 3);
        FailoverRegion regionOfV3 = regionOf(graph, v3, 0);
        FailoverRegion regionOfV4 = regionOf(graph, v4, 4);
        FailoverRegion regionOfV5 = regionOf(graph, v5, 1);

        Assert.assertEquals(regionOfV1, regionOfV2);
        Assert.assertEquals(regionOfV3, regionOfV2);
        Assert.assertEquals(regionOfV4, regionOfV2);
        Assert.assertEquals(regionOfV5, regionOfV2);
    }

    /**
     * vertex1 and vertex2 feed vertex4 through PIPELINED edges, while vertex5 consumes vertex4
     * and vertex3 through BLOCKING edges. The test expects vertex1/vertex2/vertex4 to share a
     * region, and the individual subtasks of vertex3 and of vertex5 to be in distinct regions.
     */
    @Test
    public void testMultipleFailoverRegions() throws Exception {
        JobVertex v1 = createVertex("vertex1", 3);
        JobVertex v2 = createVertex("vertex2", 2);
        JobVertex v3 = createVertex("vertex3", 2);
        JobVertex v4 = createVertex("vertex4", 5);
        JobVertex v5 = createVertex("vertex5", 2);

        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        ExecutionGraph graph = createExecutionGraph(v1, v2, v3, v4, v5);

        FailoverRegion regionOfV1 = regionOf(graph, v1, 1);
        FailoverRegion regionOfV2 = regionOf(graph, v2, 0);
        FailoverRegion regionOfV4 = regionOf(graph, v4, 3);
        FailoverRegion regionOfV3First = regionOf(graph, v3, 0);
        FailoverRegion regionOfV3Second = regionOf(graph, v3, 1);
        FailoverRegion regionOfV5First = regionOf(graph, v5, 0);
        FailoverRegion regionOfV5Second = regionOf(graph, v5, 1);

        Assert.assertEquals(regionOfV1, regionOfV2);
        Assert.assertEquals(regionOfV2, regionOfV4);
        Assert.assertFalse(regionOfV3First.equals(regionOfV3Second));
        Assert.assertFalse(regionOfV5First.equals(regionOfV5Second));
    }

    /**
     * vertex5 has one PIPELINED input (from vertex3) and one BLOCKING input (from vertex4); all
     * other edges are PIPELINED. The test expects sampled subtasks of all five vertices to be
     * in a single region.
     */
    @Test
    public void testSingleRegionWithMixedInput() throws Exception {
        JobVertex v1 = createVertex("vertex1", 3);
        JobVertex v2 = createVertex("vertex2", 2);
        JobVertex v3 = createVertex("vertex3", 2);
        JobVertex v4 = createVertex("vertex4", 5);
        JobVertex v5 = createVertex("vertex5", 2);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        ExecutionGraph graph = createExecutionGraph(v1, v2, v3, v4, v5);

        FailoverRegion regionOfV1 = regionOf(graph, v1, 1);
        FailoverRegion regionOfV2 = regionOf(graph, v2, 0);
        FailoverRegion regionOfV4 = regionOf(graph, v4, 3);
        FailoverRegion regionOfV3 = regionOf(graph, v3, 0);
        FailoverRegion regionOfV5 = regionOf(graph, v5, 1);

        Assert.assertEquals(regionOfV1, regionOfV2);
        Assert.assertEquals(regionOfV2, regionOfV4);
        Assert.assertEquals(regionOfV3, regionOfV2);
        Assert.assertEquals(regionOfV1, regionOfV5);
    }

    /**
     * vertex1 feeds vertex2 through a POINTWISE / PIPELINED edge; vertex4 consumes vertex2 and
     * vertex3 through BLOCKING edges. The test expects subtask i of vertex1 to share a region
     * with subtask i of vertex2, the two vertex1 subtasks to be in different regions, and
     * vertex3 and vertex4 subtasks to be in different regions.
     */
    @Test
    public void testMultiRegionNotAllToAll() throws Exception {
        JobVertex v1 = createVertex("vertex1", 2);
        JobVertex v2 = createVertex("vertex2", 2);
        JobVertex v3 = createVertex("vertex3", 5);
        JobVertex v4 = createVertex("vertex4", 5);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        ExecutionGraph graph = createExecutionGraph(v1, v2, v3, v4);

        FailoverRegion regionOfV1First = regionOf(graph, v1, 0);
        FailoverRegion regionOfV1Second = regionOf(graph, v1, 1);
        FailoverRegion regionOfV2First = regionOf(graph, v2, 0);
        FailoverRegion regionOfV2Second = regionOf(graph, v2, 1);
        FailoverRegion regionOfV3 = regionOf(graph, v3, 0);
        FailoverRegion regionOfV4 = regionOf(graph, v4, 3);

        Assert.assertEquals(regionOfV1First, regionOfV2First);
        Assert.assertEquals(regionOfV1Second, regionOfV2Second);
        Assert.assertFalse(regionOfV1First.equals(regionOfV1Second));
        Assert.assertFalse(regionOfV3.equals(regionOfV4));
    }

    /**
     * Creates a job vertex with the given name and parallelism, using
     * {@link AbstractInvokable} as its invokable class.
     */
    private static JobVertex createVertex(String name, int parallelism) {
        JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(AbstractInvokable.class);
        return vertex;
    }

    /**
     * Builds an execution graph that uses {@link NoRestartStrategy} and the
     * {@link RestartPipelinedRegionStrategy} failover factory, then attaches the given vertices.
     *
     * @param vertices the job vertices, passed to {@code attachJobGraph} in the given order
     * @return the execution graph with the vertices attached
     * @throws AssertionError if attaching the job graph raises a {@link JobException}
     * @throws Exception if constructing the execution graph or its arguments fails
     */
    private static ExecutionGraph createExecutionGraph(JobVertex... vertices) throws Exception {
        ExecutionGraph graph = new ExecutionGraph(
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                new JobID(),
                "Test Job Sample Name",
                new Configuration(),
                new SerializedValue<>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy(),
                new RestartPipelinedRegionStrategy.Factory(),
                Collections.emptyList(),
                Collections.emptyList(),
                new Scheduler(TestingUtils.defaultExecutor()),
                ExecutionGraph.class.getClassLoader());
        try {
            graph.attachJobGraph(new ArrayList<>(Arrays.asList(vertices)));
        } catch (JobException e) {
            // Fail the test like Assert.fail would, but keep the cause attached instead of
            // printing the stack trace to stderr.
            throw new AssertionError("Job failed with exception: " + e.getMessage(), e);
        }
        return graph;
    }

    /**
     * Looks up the failover region that the graph's failover strategy reports for one subtask.
     *
     * @param graph an execution graph whose failover strategy is a
     *     {@link RestartPipelinedRegionStrategy} (as created by {@code createExecutionGraph})
     * @param vertex the job vertex the subtask belongs to
     * @param subtaskIndex index into the execution job vertex's task vertices
     * @return the region returned by {@code getFailoverRegion} for that subtask
     */
    private static FailoverRegion regionOf(ExecutionGraph graph, JobVertex vertex, int subtaskIndex) {
        // Explicit cast: the tests need the region lookup of the concrete strategy type.
        RestartPipelinedRegionStrategy strategy =
                (RestartPipelinedRegionStrategy) graph.getFailoverStrategy();
        ExecutionJobVertex executionJobVertex = graph.getJobVertex(vertex.getID());
        return strategy.getFailoverRegion(executionJobVertex.getTaskVertices()[subtaskIndex]);
    }
}
