/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;

/**
 * Tests how {@link RestartPipelinedRegionStrategy} groups the execution vertices of a job
 * into {@link FailoverRegion}s.
 *
 * <p>Every test builds a small {@link JobGraph} out of {@link NoOpInvokable} vertices, turns it
 * into an {@link ExecutionGraph} configured with the {@code "region"} failover strategy, and then
 * checks via reference identity which subtasks were assigned to the same region object.
 */
public class PipelinedFailoverRegionBuildingTest extends TestLogger {

    /**
     * Two unconnected vertices with parallelism 2: the checked pairs of subtasks
     * (within a vertex and across the two vertices) must all be in different regions.
     */
    @Test
    public void testIndividualVertices() throws Exception {
        JobVertex source1 = createNoOpVertex("source1", 2);
        JobVertex source2 = createNoOpVertex("source2", 2);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{source1, source2});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        FailoverRegion sourceRegion11 = regionOf(strategy, eg, source1, 0);
        FailoverRegion sourceRegion12 = regionOf(strategy, eg, source1, 1);
        FailoverRegion targetRegion21 = regionOf(strategy, eg, source2, 0);
        FailoverRegion targetRegion22 = regionOf(strategy, eg, source2, 1);

        Assert.assertNotSame(sourceRegion11, sourceRegion12);
        Assert.assertNotSame(sourceRegion12, targetRegion21);
        Assert.assertNotSame(targetRegion21, targetRegion22);
    }

    /**
     * Three vertices chained by POINTWISE / PIPELINED edges at equal parallelism: subtasks with
     * the same index must share a region, and every index other than 0 must be in a region
     * different from the one of index 0.
     */
    @Test
    public void testEmbarrassinglyParallelCase() throws Exception {
        final int parallelism = 10000;
        JobVertex vertex1 = createNoOpVertex("vertex1", parallelism);
        JobVertex vertex2 = createNoOpVertex("vertex2", parallelism);
        JobVertex vertex3 = createNoOpVertex("vertex3", parallelism);

        vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex1, vertex2, vertex3});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        // Look the subtask arrays up once instead of once per loop iteration.
        ExecutionVertex[] tasks1 = eg.getJobVertex(vertex1.getID()).getTaskVertices();
        ExecutionVertex[] tasks2 = eg.getJobVertex(vertex2.getID()).getTaskVertices();
        ExecutionVertex[] tasks3 = eg.getJobVertex(vertex3.getID()).getTaskVertices();

        FailoverRegion preRegion1 = strategy.getFailoverRegion(tasks1[0]);
        FailoverRegion preRegion2 = strategy.getFailoverRegion(tasks2[0]);
        FailoverRegion preRegion3 = strategy.getFailoverRegion(tasks3[0]);

        Assert.assertSame(preRegion1, preRegion2);
        Assert.assertSame(preRegion2, preRegion3);

        for (int i = 1; i < parallelism; ++i) {
            FailoverRegion region1 = strategy.getFailoverRegion(tasks1[i]);
            FailoverRegion region2 = strategy.getFailoverRegion(tasks2[i]);
            FailoverRegion region3 = strategy.getFailoverRegion(tasks3[i]);

            Assert.assertSame(region1, region2);
            Assert.assertSame(region2, region3);
            Assert.assertNotSame(preRegion1, region1);
        }
    }

    /**
     * Three vertices (parallelism 3, 5, 2) chained by ALL_TO_ALL / PIPELINED edges: the sampled
     * subtasks of all three vertices must share one region.
     */
    @Test
    public void testOneComponentViaTwoExchanges() throws Exception {
        JobVertex vertex1 = createNoOpVertex("vertex1", 3);
        JobVertex vertex2 = createNoOpVertex("vertex2", 5);
        JobVertex vertex3 = createNoOpVertex("vertex3", 2);

        vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex1, vertex2, vertex3});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        FailoverRegion region1 = regionOf(strategy, eg, vertex1, 1);
        FailoverRegion region2 = regionOf(strategy, eg, vertex2, 4);
        FailoverRegion region3 = regionOf(strategy, eg, vertex3, 0);

        Assert.assertSame(region1, region2);
        Assert.assertSame(region2, region3);
    }

    /**
     * Four inputs joined pairwise into two vertices which are joined into a final vertex, all via
     * ALL_TO_ALL / PIPELINED edges: every execution vertex must be in the same region.
     */
    @Test
    public void testOneComponentViaCascadeOfJoins() throws Exception {
        JobVertex vertex1 = createNoOpVertex("vertex1", 8);
        JobVertex vertex2 = createNoOpVertex("vertex2", 8);
        JobVertex vertex3 = createNoOpVertex("vertex3", 8);
        JobVertex vertex4 = createNoOpVertex("vertex4", 8);
        JobVertex vertex5 = createNoOpVertex("vertex5", 4);
        JobVertex vertex6 = createNoOpVertex("vertex6", 4);
        JobVertex vertex7 = createNoOpVertex("vertex7", 2);

        vertex5.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex5.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex6.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex6.connectNewDataSetAsInput(vertex4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex7.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex7.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex1, vertex2, vertex3, vertex4, vertex5, vertex6, vertex7});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        assertAllVerticesShareOneRegion(eg, strategy);
    }

    /**
     * The mirror image of the cascade-of-joins topology: one vertex fans out into two, which fan
     * out into four, all via ALL_TO_ALL / PIPELINED edges. Every execution vertex must be in the
     * same region.
     */
    @Test
    public void testOneComponentInstanceFromOneSource() throws Exception {
        JobVertex vertex1 = createNoOpVertex("vertex1", 8);
        JobVertex vertex2 = createNoOpVertex("vertex2", 8);
        JobVertex vertex3 = createNoOpVertex("vertex3", 8);
        JobVertex vertex4 = createNoOpVertex("vertex4", 8);
        JobVertex vertex5 = createNoOpVertex("vertex5", 4);
        JobVertex vertex6 = createNoOpVertex("vertex6", 4);
        JobVertex vertex7 = createNoOpVertex("vertex7", 2);

        vertex1.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex2.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex3.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex4.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex5.connectNewDataSetAsInput(vertex7, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex6.connectNewDataSetAsInput(vertex7, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        // Vertices are handed to the job graph starting from the single source (vertex7).
        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex7, vertex5, vertex6, vertex1, vertex2, vertex3, vertex4});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        assertAllVerticesShareOneRegion(eg, strategy);
    }

    /**
     * vertex1 -&gt; vertex2 via ALL_TO_ALL / PIPELINED, vertex2 -&gt; vertex3 via POINTWISE / BLOCKING:
     * vertex1 and vertex2 must share a region, while the two subtasks of vertex3 must each be in
     * a region of their own.
     */
    @Test
    public void testTwoComponentsViaBlockingExchange() throws Exception {
        JobVertex vertex1 = createNoOpVertex("vertex1", 3);
        JobVertex vertex2 = createNoOpVertex("vertex2", 2);
        JobVertex vertex3 = createNoOpVertex("vertex3", 2);

        vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex1, vertex2, vertex3});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        FailoverRegion region1 = regionOf(strategy, eg, vertex1, 1);
        FailoverRegion region2 = regionOf(strategy, eg, vertex2, 0);
        FailoverRegion region31 = regionOf(strategy, eg, vertex3, 0);
        FailoverRegion region32 = regionOf(strategy, eg, vertex3, 1);

        Assert.assertSame(region1, region2);
        Assert.assertNotSame(region2, region31);
        Assert.assertNotSame(region32, region31);
    }

    /**
     * Same as {@link #testTwoComponentsViaBlockingExchange()}, except that the blocking edge
     * between vertex2 and vertex3 is ALL_TO_ALL instead of POINTWISE. The expected grouping is
     * identical.
     */
    @Test
    public void testTwoComponentsViaBlockingExchange2() throws Exception {
        JobVertex vertex1 = createNoOpVertex("vertex1", 3);
        JobVertex vertex2 = createNoOpVertex("vertex2", 2);
        JobVertex vertex3 = createNoOpVertex("vertex3", 2);

        vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex1, vertex2, vertex3});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        FailoverRegion region1 = regionOf(strategy, eg, vertex1, 1);
        FailoverRegion region2 = regionOf(strategy, eg, vertex2, 0);
        FailoverRegion region31 = regionOf(strategy, eg, vertex3, 0);
        FailoverRegion region32 = regionOf(strategy, eg, vertex3, 1);

        Assert.assertSame(region1, region2);
        Assert.assertNotSame(region2, region31);
        Assert.assertNotSame(region32, region31);
    }

    /**
     * Cascade of joins in which the first level is PIPELINED and the final join is BLOCKING:
     * {vertex1, vertex2, vertex5} must share a region, {vertex3, vertex4, vertex6} must share a
     * region, and the two subtasks of vertex7 must each be in a region that differs from both of
     * those and from each other.
     */
    @Test
    public void testMultipleComponentsViaCascadeOfJoins() throws Exception {
        JobVertex vertex1 = createNoOpVertex("vertex1", 8);
        JobVertex vertex2 = createNoOpVertex("vertex2", 8);
        JobVertex vertex3 = createNoOpVertex("vertex3", 8);
        JobVertex vertex4 = createNoOpVertex("vertex4", 8);
        JobVertex vertex5 = createNoOpVertex("vertex5", 4);
        JobVertex vertex6 = createNoOpVertex("vertex6", 4);
        JobVertex vertex7 = createNoOpVertex("vertex7", 2);

        vertex5.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex5.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex6.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex6.connectNewDataSetAsInput(vertex4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex7.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        vertex7.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex1, vertex2, vertex3, vertex4, vertex5, vertex6, vertex7});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        FailoverRegion region1 = regionOf(strategy, eg, vertex1, 0);
        FailoverRegion region2 = regionOf(strategy, eg, vertex2, 5);
        FailoverRegion region5 = regionOf(strategy, eg, vertex5, 2);
        Assert.assertSame(region1, region2);
        Assert.assertSame(region1, region5);

        FailoverRegion region3 = regionOf(strategy, eg, vertex3, 0);
        FailoverRegion region4 = regionOf(strategy, eg, vertex4, 5);
        FailoverRegion region6 = regionOf(strategy, eg, vertex6, 2);
        Assert.assertSame(region3, region4);
        Assert.assertSame(region3, region6);

        FailoverRegion region71 = regionOf(strategy, eg, vertex7, 0);
        FailoverRegion region72 = regionOf(strategy, eg, vertex7, 1);
        Assert.assertNotSame(region71, region72);
        Assert.assertNotSame(region1, region71);
        Assert.assertNotSame(region1, region72);
        Assert.assertNotSame(region3, region71);
        Assert.assertNotSame(region3, region72);
    }

    /**
     * Diamond topology where vertex1 feeds vertex2 through a BLOCKING edge and vertex3 through a
     * PIPELINED edge, and both feed vertex4 through PIPELINED edges: every execution vertex must
     * be in the same region.
     */
    @Test
    public void testDiamondWithMixedPipelinedAndBlockingExchanges() throws Exception {
        JobVertex vertex1 = createNoOpVertex("vertex1", 8);
        JobVertex vertex2 = createNoOpVertex("vertex2", 8);
        JobVertex vertex3 = createNoOpVertex("vertex3", 8);
        JobVertex vertex4 = createNoOpVertex("vertex4", 8);

        vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        vertex3.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex4.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        vertex4.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex1, vertex2, vertex3, vertex4});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        assertAllVerticesShareOneRegion(eg, strategy);
    }

    /**
     * Source (parallelism 10) and target (parallelism 13) connected by an ALL_TO_ALL / BLOCKING
     * edge, placed in one slot sharing group and strictly co-located: the first subtasks of
     * source and target must share a region.
     */
    @Test
    public void testBlockingAllToAllTopologyWithCoLocation() throws Exception {
        JobVertex source = createNoOpVertex("source", 10);
        JobVertex target = createNoOpVertex("target", 13);

        target.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        source.setSlotSharingGroup(sharingGroup);
        target.setSlotSharingGroup(sharingGroup);
        source.setStrictlyCoLocatedWith(target);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{source, target});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        FailoverRegion region1 = regionOf(strategy, eg, source, 0);
        FailoverRegion region2 = regionOf(strategy, eg, target, 0);

        Assert.assertSame(region1, region2);
    }

    /**
     * Source and target (both parallelism 10) connected by a POINTWISE / PIPELINED edge, placed
     * in one slot sharing group and strictly co-located: the first two subtasks of source and
     * target must all share a region.
     */
    @Test
    public void testPipelinedOneToOneTopologyWithCoLocation() throws Exception {
        JobVertex source = createNoOpVertex("source", 10);
        JobVertex target = createNoOpVertex("target", 10);

        target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        source.setSlotSharingGroup(sharingGroup);
        target.setSlotSharingGroup(sharingGroup);
        source.setStrictlyCoLocatedWith(target);

        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{source, target});
        ExecutionGraph eg = createExecutionGraph(jobGraph);
        RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy();

        FailoverRegion sourceRegion1 = regionOf(strategy, eg, source, 0);
        FailoverRegion sourceRegion2 = regionOf(strategy, eg, source, 1);
        FailoverRegion targetRegion1 = regionOf(strategy, eg, target, 0);
        FailoverRegion targetRegion2 = regionOf(strategy, eg, target, 1);

        Assert.assertSame(sourceRegion1, sourceRegion2);
        Assert.assertSame(sourceRegion2, targetRegion1);
        Assert.assertSame(targetRegion1, targetRegion2);
    }

    /**
     * Builds an {@link ExecutionGraph} for the given job graph with the failover strategy option
     * set to {@code "region"}, a mocked {@link SlotProvider} and a {@link NoRestartStrategy}.
     *
     * @param jobGraph the job graph to translate
     * @return the execution graph produced by {@link ExecutionGraphBuilder}
     */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws JobException, JobExecutionException {
        Configuration jobManagerConfig = new Configuration();
        jobManagerConfig.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");

        return ExecutionGraphBuilder.buildGraph(
                null,
                jobGraph,
                jobManagerConfig,
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                Mockito.mock(SlotProvider.class),
                PipelinedFailoverRegionBuildingTest.class.getClassLoader(),
                new StandaloneCheckpointRecoveryFactory(),
                Time.seconds(10L),
                new NoRestartStrategy(),
                new UnregisteredMetricsGroup(),
                1000,
                log);
    }

    /** Creates a job vertex that runs {@link NoOpInvokable} with the given name and parallelism. */
    private static JobVertex createNoOpVertex(String name, int parallelism) {
        JobVertex vertex = new JobVertex(name);
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);
        return vertex;
    }

    /** Returns the failover region assigned to subtask {@code subtaskIndex} of {@code vertex}. */
    private static FailoverRegion regionOf(
            RestartPipelinedRegionStrategy strategy,
            ExecutionGraph eg,
            JobVertex vertex,
            int subtaskIndex) {
        return strategy.getFailoverRegion(eg.getJobVertex(vertex.getID()).getTaskVertices()[subtaskIndex]);
    }

    /** Asserts that every execution vertex of the graph maps to the same failover region object. */
    private static void assertAllVerticesShareOneRegion(ExecutionGraph eg, RestartPipelinedRegionStrategy strategy) {
        Iterator<ExecutionVertex> vertices = eg.getAllExecutionVertices().iterator();
        // The first vertex's region is the reference every other vertex is compared against.
        FailoverRegion expected = strategy.getFailoverRegion(vertices.next());
        while (vertices.hasNext()) {
            Assert.assertSame(expected, strategy.getFailoverRegion(vertices.next()));
        }
    }
}

