/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adapter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import junit.framework.TestCase;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionVertex;
import org.apache.flink.runtime.scheduler.adapter.DefaultResultPartition;
import org.apache.flink.runtime.scheduler.adapter.DefaultSchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class DefaultExecutionTopologyTest
extends TestLogger {
    private DefaultExecutionGraph executionGraph;
    private DefaultExecutionTopology adapter;

    @Before
    public void setUp() throws Exception {
        JobVertex[] jobVertices = new JobVertex[2];
        int parallelism = 3;
        jobVertices[0] = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        jobVertices[1] = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        jobVertices[1].connectNewDataSetAsInput(jobVertices[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        this.executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(jobVertices);
        this.adapter = DefaultExecutionTopology.fromExecutionGraph((DefaultExecutionGraph)this.executionGraph);
    }

    @Test
    public void testConstructor() {
        DefaultExecutionTopologyTest.assertGraphEquals((ExecutionGraph)this.executionGraph, this.adapter);
    }

    @Test
    public void testGetResultPartition() {
        for (ExecutionVertex vertex : this.executionGraph.getAllExecutionVertices()) {
            for (Map.Entry entry : vertex.getProducedPartitions().entrySet()) {
                IntermediateResultPartition partition = (IntermediateResultPartition)entry.getValue();
                DefaultResultPartition schedulingResultPartition = this.adapter.getResultPartition((IntermediateResultPartitionID)entry.getKey());
                DefaultExecutionTopologyTest.assertPartitionEquals(partition, schedulingResultPartition);
            }
        }
    }

    @Test
    public void testResultPartitionStateSupplier() {
        IntermediateResultPartition intermediateResultPartition = (IntermediateResultPartition)IterableUtils.toStream((Iterable)this.executionGraph.getAllExecutionVertices()).flatMap(v -> v.getProducedPartitions().values().stream()).findAny().get();
        DefaultResultPartition schedulingResultPartition = this.adapter.getResultPartition(intermediateResultPartition.getPartitionId());
        Assert.assertEquals((Object)ResultPartitionState.CREATED, (Object)schedulingResultPartition.getState());
        intermediateResultPartition.markDataProduced();
        Assert.assertEquals((Object)ResultPartitionState.CONSUMABLE, (Object)schedulingResultPartition.getState());
    }

    @Test
    public void testGetVertexOrThrow() {
        try {
            this.adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0));
            Assert.fail((String)"get not exist vertex");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testResultPartitionOrThrow() {
        try {
            this.adapter.getResultPartition(new IntermediateResultPartitionID());
            Assert.fail((String)"get not exist result partition");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testGetAllPipelinedRegions() {
        Iterable allPipelinedRegions = this.adapter.getAllPipelinedRegions();
        Assert.assertEquals((long)1L, (long)Iterables.size((Iterable)allPipelinedRegions));
    }

    @Test
    public void testGetPipelinedRegionOfVertex() {
        for (DefaultExecutionVertex vertex : this.adapter.getVertices()) {
            DefaultSchedulingPipelinedRegion pipelinedRegion = this.adapter.getPipelinedRegionOfVertex(vertex.getId());
            this.assertRegionContainsAllVertices(pipelinedRegion);
        }
    }

    @Test(expected=IllegalStateException.class)
    public void testErrorIfCoLocatedTasksAreNotInSameRegion() throws Exception {
        int parallelism = 3;
        JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex(parallelism);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        v1.setSlotSharingGroup(slotSharingGroup);
        v2.setSlotSharingGroup(slotSharingGroup);
        v1.setStrictlyCoLocatedWith(v2);
        DefaultExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(v1, v2);
        DefaultExecutionTopology.fromExecutionGraph((DefaultExecutionGraph)executionGraph);
    }

    @Test
    public void testUpdateTopology() throws Exception {
        JobVertex[] jobVertices = this.createJobVertices(ResultPartitionType.BLOCKING);
        this.executionGraph = this.createDynamicGraph(jobVertices);
        this.adapter = DefaultExecutionTopology.fromExecutionGraph((DefaultExecutionGraph)this.executionGraph);
        ExecutionJobVertex ejv1 = this.executionGraph.getJobVertex(jobVertices[0].getID());
        ExecutionJobVertex ejv2 = this.executionGraph.getJobVertex(jobVertices[1].getID());
        this.executionGraph.initializeJobVertex(ejv1, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(ejv1));
        MatcherAssert.assertThat((Object)IterableUtils.toStream((Iterable)this.adapter.getVertices()).count(), (Matcher)Is.is((Object)3L));
        this.executionGraph.initializeJobVertex(ejv2, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(ejv2));
        MatcherAssert.assertThat((Object)IterableUtils.toStream((Iterable)this.adapter.getVertices()).count(), (Matcher)Is.is((Object)6L));
        DefaultExecutionTopologyTest.assertGraphEquals((ExecutionGraph)this.executionGraph, this.adapter);
    }

    @Test(expected=IllegalStateException.class)
    public void testErrorIfUpdateTopologyWithNewVertexPipelinedConnectedToOldOnes() throws Exception {
        JobVertex[] jobVertices = this.createJobVertices(ResultPartitionType.PIPELINED);
        this.executionGraph = this.createDynamicGraph(jobVertices);
        this.adapter = DefaultExecutionTopology.fromExecutionGraph((DefaultExecutionGraph)this.executionGraph);
        ExecutionJobVertex ejv1 = this.executionGraph.getJobVertex(jobVertices[0].getID());
        ExecutionJobVertex ejv2 = this.executionGraph.getJobVertex(jobVertices[1].getID());
        this.executionGraph.initializeJobVertex(ejv1, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(ejv1));
        this.executionGraph.initializeJobVertex(ejv2, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(ejv2));
    }

    @Test
    public void testExistingRegionsAreNotAffectedDuringTopologyUpdate() throws Exception {
        JobVertex[] jobVertices = this.createJobVertices(ResultPartitionType.BLOCKING);
        this.executionGraph = this.createDynamicGraph(jobVertices);
        this.adapter = DefaultExecutionTopology.fromExecutionGraph((DefaultExecutionGraph)this.executionGraph);
        ExecutionJobVertex ejv1 = this.executionGraph.getJobVertex(jobVertices[0].getID());
        ExecutionJobVertex ejv2 = this.executionGraph.getJobVertex(jobVertices[1].getID());
        this.executionGraph.initializeJobVertex(ejv1, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(ejv1));
        DefaultSchedulingPipelinedRegion regionOld = this.adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0));
        this.executionGraph.initializeJobVertex(ejv2, 0L);
        this.adapter.notifyExecutionGraphUpdated(this.executionGraph, Collections.singletonList(ejv2));
        DefaultSchedulingPipelinedRegion regionNew = this.adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0));
        TestCase.assertSame((Object)regionOld, (Object)regionNew);
    }

    private JobVertex[] createJobVertices(ResultPartitionType resultPartitionType) {
        JobVertex[] jobVertices = new JobVertex[2];
        int parallelism = 3;
        jobVertices[0] = ExecutionGraphTestUtils.createNoOpVertex(3);
        jobVertices[1] = ExecutionGraphTestUtils.createNoOpVertex(3);
        jobVertices[1].connectNewDataSetAsInput(jobVertices[0], DistributionPattern.ALL_TO_ALL, resultPartitionType);
        return jobVertices;
    }

    private DefaultExecutionGraph createDynamicGraph(JobVertex ... jobVertices) throws Exception {
        return TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(new JobGraph(new JobID(), "TestJob", jobVertices)).buildDynamicGraph();
    }

    private void assertRegionContainsAllVertices(DefaultSchedulingPipelinedRegion pipelinedRegionOfVertex) {
        HashSet allVertices = Sets.newHashSet((Iterable)pipelinedRegionOfVertex.getVertices());
        Assert.assertEquals((Object)Sets.newHashSet((Iterable)this.adapter.getVertices()), (Object)allVertices);
    }

    private static void assertGraphEquals(ExecutionGraph originalGraph, DefaultExecutionTopology adaptedTopology) {
        Iterator originalVertices = originalGraph.getAllExecutionVertices().iterator();
        Iterator adaptedVertices = adaptedTopology.getVertices().iterator();
        while (originalVertices.hasNext()) {
            ExecutionVertex originalVertex = (ExecutionVertex)originalVertices.next();
            DefaultExecutionVertex adaptedVertex = (DefaultExecutionVertex)adaptedVertices.next();
            Assert.assertEquals((Object)originalVertex.getID(), (Object)adaptedVertex.getId());
            ArrayList<IntermediateResultPartition> originalConsumedPartitions = new ArrayList<IntermediateResultPartition>();
            for (ConsumedPartitionGroup consumedPartitionGroup : originalVertex.getAllConsumedPartitionGroups()) {
                for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                    IntermediateResultPartition partition = originalVertex.getExecutionGraphAccessor().getResultPartitionOrThrow(partitionId);
                    originalConsumedPartitions.add(partition);
                }
            }
            Iterable adaptedConsumedPartitions = adaptedVertex.getConsumedResults();
            DefaultExecutionTopologyTest.assertPartitionsEquals(originalConsumedPartitions, adaptedConsumedPartitions);
            Collection<IntermediateResultPartition> originalProducedPartitions = originalVertex.getProducedPartitions().values();
            Iterable adaptedProducedPartitions = adaptedVertex.getProducedResults();
            DefaultExecutionTopologyTest.assertPartitionsEquals(originalProducedPartitions, adaptedProducedPartitions);
        }
        Assert.assertFalse((String)"Number of adapted vertices exceeds number of original vertices.", (boolean)adaptedVertices.hasNext());
    }

    private static void assertPartitionsEquals(Iterable<IntermediateResultPartition> originalResultPartitions, Iterable<DefaultResultPartition> adaptedResultPartitions) {
        Assert.assertEquals((long)Iterables.size(originalResultPartitions), (long)Iterables.size(adaptedResultPartitions));
        for (IntermediateResultPartition originalPartition : originalResultPartitions) {
            DefaultResultPartition adaptedPartition = IterableUtils.toStream(adaptedResultPartitions).filter(adapted -> adapted.getId().equals((Object)originalPartition.getPartitionId())).findAny().orElseThrow(() -> new AssertionError((Object)("Could not find matching adapted partition for " + originalPartition)));
            DefaultExecutionTopologyTest.assertPartitionEquals(originalPartition, adaptedPartition);
            ConsumerVertexGroup consumerVertexGroup = originalPartition.getConsumerVertexGroup();
            Optional adaptedConsumers = adaptedPartition.getConsumerVertexGroup();
            TestCase.assertTrue((boolean)adaptedConsumers.isPresent());
            for (ExecutionVertexID originalId : consumerVertexGroup) {
                TestCase.assertTrue((boolean)IterableUtils.toStream((Iterable)((Iterable)adaptedConsumers.get())).anyMatch(adaptedConsumer -> adaptedConsumer.equals((Object)originalId)));
            }
        }
    }

    private static void assertPartitionEquals(IntermediateResultPartition originalPartition, DefaultResultPartition adaptedPartition) {
        Assert.assertEquals((Object)originalPartition.getPartitionId(), (Object)adaptedPartition.getId());
        Assert.assertEquals((Object)originalPartition.getIntermediateResult().getId(), (Object)adaptedPartition.getResultId());
        Assert.assertEquals((Object)originalPartition.getResultType(), (Object)adaptedPartition.getResultType());
        Assert.assertEquals((Object)originalPartition.getProducer().getID(), (Object)adaptedPartition.getProducer().getId());
    }
}

