/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.dataexchange;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class UnionClosedBranchingTest
extends CompilerTestBase {
    private final ExecutionMode executionMode;
    private final DataExchangeMode sourceToUnion;
    private final DataExchangeMode unionToJoin;
    private final ShipStrategyType sourceToUnionStrategy = ShipStrategyType.PARTITION_HASH;
    private final ShipStrategyType unionToJoinStrategy = ShipStrategyType.FORWARD;

    @Parameterized.Parameters
    public static Collection<Object[]> params() {
        List<Object[]> params = Arrays.asList({ExecutionMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED}, {ExecutionMode.PIPELINED_FORCED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED}, {ExecutionMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED}, {ExecutionMode.BATCH_FORCED, DataExchangeMode.BATCH, DataExchangeMode.BATCH});
        Assert.assertEquals((long)ExecutionMode.values().length, (long)params.size());
        return params;
    }

    public UnionClosedBranchingTest(ExecutionMode executionMode, DataExchangeMode sourceToUnion, DataExchangeMode unionToJoin) {
        this.executionMode = executionMode;
        this.sourceToUnion = sourceToUnion;
        this.unionToJoin = unionToJoin;
    }

    @Test
    public void testUnionClosedBranchingTest() throws Exception {
        JobVertex[] sources;
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setExecutionMode(this.executionMode);
        env.setParallelism(4);
        DataSource src1 = env.fromElements((Object[])new Tuple1[]{new Tuple1((Object)0), new Tuple1((Object)1)});
        DataSource src2 = env.fromElements((Object[])new Tuple1[]{new Tuple1((Object)0), new Tuple1((Object)1)});
        UnionOperator union = src1.union((DataSet)src2);
        JoinOperator.ProjectJoin join = union.join((DataSet)union).where(new int[]{0}).equalTo(new int[]{0}).projectFirst(new int[]{0}).projectSecond(new int[]{0});
        join.output((OutputFormat)new DiscardingOutputFormat());
        OptimizedPlan optimizedPlan = this.compileNoStats(env.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode)optimizedPlan.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode)sinkNode.getPredecessor();
        for (Channel channel : joinNode.getInputs()) {
            Assert.assertEquals((String)"Unexpected data exchange mode between union and join node.", (Object)this.unionToJoin, (Object)channel.getDataExchangeMode());
            Assert.assertEquals((String)"Unexpected ship strategy between union and join node.", (Object)this.unionToJoinStrategy, (Object)channel.getShipStrategy());
        }
        for (SourcePlanNode src : optimizedPlan.getDataSources()) {
            for (Channel channel : src.getOutgoingChannels()) {
                Assert.assertEquals((String)"Unexpected data exchange mode between source and union node.", (Object)this.sourceToUnion, (Object)channel.getDataExchangeMode());
                Assert.assertEquals((String)"Unexpected ship strategy between source and union node.", (Object)this.sourceToUnionStrategy, (Object)channel.getShipStrategy());
            }
        }
        JobGraphGenerator jgg = new JobGraphGenerator();
        JobGraph jobGraph = jgg.compileJobGraph(optimizedPlan);
        List vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals((String)"Unexpected number of vertices created.", (long)4L, (long)vertices.size());
        for (JobVertex src : sources = new JobVertex[]{(JobVertex)vertices.get(0), (JobVertex)vertices.get(1)}) {
            Assert.assertTrue((String)"Unexpected vertex type. Test setup is broken.", (boolean)src.isInputVertex());
            Assert.assertEquals((String)"Unexpected number of created results.", (long)2L, (long)src.getNumberOfProducedIntermediateDataSets());
            for (IntermediateDataSet dataSet : src.getProducedDataSets()) {
                ResultPartitionType dsType = dataSet.getResultType();
                if (!this.executionMode.equals((Object)ExecutionMode.PIPELINED_FORCED)) {
                    Assert.assertTrue((String)("Expected batch exchange, but result type is " + dsType + "."), (boolean)dsType.isBlockingOrBlockingPersistentResultPartition());
                    continue;
                }
                Assert.assertFalse((String)("Expected non-batch exchange, but result type is " + dsType + "."), (boolean)dsType.isBlockingOrBlockingPersistentResultPartition());
            }
        }
    }
}

