/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

public class UnionReplacementTest
extends CompilerTestBase {
    @Test
    public void testUnionReplacement() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource input1 = env.fromElements((Object[])new String[]{"test1"});
            DataSource input2 = env.fromElements((Object[])new String[]{"test2"});
            UnionOperator union = input1.union((DataSet)input2);
            union.output((OutputFormat)new DiscardingOutputFormat());
            union.output((OutputFormat)new DiscardingOutputFormat());
            Plan plan = env.createProgramPlan();
            OptimizedPlan oPlan = this.compileNoStats(plan);
            JobGraphGenerator jobGen = new JobGraphGenerator();
            jobGen.compileJobGraph(oPlan);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testUnionWithTwoOutputs() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource src1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src2 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src3 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src4 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        UnionOperator union23 = src2.union((DataSet)src3);
        UnionOperator union123 = src1.union((DataSet)union23);
        UnionOperator union234 = src4.union((DataSet)union23);
        ((AggregateOperator)union123.groupBy(new int[]{0}).sum(1).name("1")).output((OutputFormat)new DiscardingOutputFormat());
        ((AggregateOperator)union234.groupBy(new int[]{1}).sum(0).name("2")).output((OutputFormat)new DiscardingOutputFormat());
        OptimizedPlan optimizedPlan = this.compileNoStats(env.createProgramPlan());
        CompilerTestBase.OptimizerPlanNodeResolver resolver = UnionReplacementTest.getOptimizerPlanNodeResolver(optimizedPlan);
        SingleInputPlanNode groupRed1 = (SingleInputPlanNode)resolver.getNode("1");
        SingleInputPlanNode groupRed2 = (SingleInputPlanNode)resolver.getNode("2");
        Assert.assertTrue((String)"Reduce input should be partitioned on 0.", (boolean)groupRed1.getInput().getGlobalProperties().getPartitioningFields().isExactMatch(new FieldList(0)));
        Assert.assertTrue((String)"Reduce input should be partitioned on 1.", (boolean)groupRed2.getInput().getGlobalProperties().getPartitioningFields().isExactMatch(new FieldList(1)));
        Assert.assertTrue((String)"Reduce input should be n-ary union with three inputs.", (groupRed1.getInput().getSource() instanceof NAryUnionPlanNode && ((NAryUnionPlanNode)groupRed1.getInput().getSource()).getListOfInputs().size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((String)"Reduce input should be n-ary union with three inputs.", (groupRed2.getInput().getSource() instanceof NAryUnionPlanNode && ((NAryUnionPlanNode)groupRed2.getInput().getSource()).getListOfInputs().size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((String)"Channel between union and group reduce should be forwarding", (boolean)groupRed1.getInput().getShipStrategy().equals((Object)ShipStrategyType.FORWARD));
        Assert.assertTrue((String)"Channel between union and group reduce should be forwarding", (boolean)groupRed2.getInput().getShipStrategy().equals((Object)ShipStrategyType.FORWARD));
        List union123In = ((NAryUnionPlanNode)groupRed1.getInput().getSource()).getListOfInputs();
        for (Channel i : union123In) {
            Assert.assertTrue((String)"Union input channel should hash partition on 0", (i.getShipStrategy().equals((Object)ShipStrategyType.PARTITION_HASH) && i.getShipStrategyKeys().isExactMatch(new FieldList(0)) ? 1 : 0) != 0);
        }
        List union234In = ((NAryUnionPlanNode)groupRed2.getInput().getSource()).getListOfInputs();
        for (Channel i : union234In) {
            Assert.assertTrue((String)"Union input channel should hash partition on 0", (i.getShipStrategy().equals((Object)ShipStrategyType.PARTITION_HASH) && i.getShipStrategyKeys().isExactMatch(new FieldList(1)) ? 1 : 0) != 0);
        }
    }

    @Test
    public void testConsecutiveUnionsWithHashPartitioning() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource src1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src2 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src3 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        UnionOperator union12 = src1.union((DataSet)src2);
        UnionOperator union123 = union12.union((DataSet)src3);
        union123.partitionByHash(new int[]{1}).output((OutputFormat)new DiscardingOutputFormat()).name("out");
        OptimizedPlan optimizedPlan = this.compileNoStats(env.createProgramPlan());
        CompilerTestBase.OptimizerPlanNodeResolver resolver = UnionReplacementTest.getOptimizerPlanNodeResolver(optimizedPlan);
        SingleInputPlanNode sink = (SingleInputPlanNode)resolver.getNode("out");
        Assert.assertEquals((String)"Sink input should be hash partitioned.", (Object)PartitioningProperty.HASH_PARTITIONED, (Object)sink.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals((String)"Sink input should be hash partitioned on 1.", (Object)new FieldList(1), (Object)sink.getInput().getGlobalProperties().getPartitioningFields());
        SingleInputPlanNode partitioner = (SingleInputPlanNode)sink.getInput().getSource();
        Assert.assertTrue((partitioner.getDriverStrategy() == DriverStrategy.UNARY_NO_OP ? 1 : 0) != 0);
        Assert.assertEquals((String)"Partitioner input should be hash partitioned.", (Object)PartitioningProperty.HASH_PARTITIONED, (Object)partitioner.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals((String)"Partitioner input should be hash partitioned on 1.", (Object)new FieldList(1), (Object)partitioner.getInput().getGlobalProperties().getPartitioningFields());
        Assert.assertEquals((String)"Partitioner input channel should be forwarding", (Object)ShipStrategyType.FORWARD, (Object)partitioner.getInput().getShipStrategy());
        NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
        for (Channel c : union.getInputs()) {
            Assert.assertEquals((String)"Union input should be hash partitioned", (Object)PartitioningProperty.HASH_PARTITIONED, (Object)c.getGlobalProperties().getPartitioning());
            Assert.assertEquals((String)"Union input channel should be hash partitioning", (Object)ShipStrategyType.PARTITION_HASH, (Object)c.getShipStrategy());
            Assert.assertTrue((String)"Union input should be data source", (boolean)(c.getSource() instanceof SourcePlanNode));
        }
    }

    @Test
    public void testConsecutiveUnionsWithRebalance() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource src1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src2 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src3 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        UnionOperator union12 = src1.union((DataSet)src2);
        UnionOperator union123 = union12.union((DataSet)src3);
        union123.rebalance().output((OutputFormat)new DiscardingOutputFormat()).name("out");
        OptimizedPlan optimizedPlan = this.compileNoStats(env.createProgramPlan());
        CompilerTestBase.OptimizerPlanNodeResolver resolver = UnionReplacementTest.getOptimizerPlanNodeResolver(optimizedPlan);
        SingleInputPlanNode sink = (SingleInputPlanNode)resolver.getNode("out");
        Assert.assertEquals((String)"Sink input should be force rebalanced.", (Object)PartitioningProperty.FORCED_REBALANCED, (Object)sink.getInput().getGlobalProperties().getPartitioning());
        SingleInputPlanNode partitioner = (SingleInputPlanNode)sink.getInput().getSource();
        Assert.assertTrue((partitioner.getDriverStrategy() == DriverStrategy.UNARY_NO_OP ? 1 : 0) != 0);
        Assert.assertEquals((String)"Partitioner input should be force rebalanced.", (Object)PartitioningProperty.FORCED_REBALANCED, (Object)partitioner.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals((String)"Partitioner input channel should be forwarding", (Object)ShipStrategyType.FORWARD, (Object)partitioner.getInput().getShipStrategy());
        NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
        for (Channel c : union.getInputs()) {
            Assert.assertEquals((String)"Union input should be force rebalanced", (Object)PartitioningProperty.FORCED_REBALANCED, (Object)c.getGlobalProperties().getPartitioning());
            Assert.assertEquals((String)"Union input channel should be rebalancing", (Object)ShipStrategyType.PARTITION_FORCED_REBALANCE, (Object)c.getShipStrategy());
            Assert.assertTrue((String)"Union input should be data source", (boolean)(c.getSource() instanceof SourcePlanNode));
        }
    }

    @Test
    public void testConsecutiveUnionsWithRangePartitioning() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource src1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src2 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src3 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        UnionOperator union12 = src1.union((DataSet)src2);
        UnionOperator union123 = union12.union((DataSet)src3);
        union123.partitionByRange(new int[]{1}).output((OutputFormat)new DiscardingOutputFormat()).name("out");
        OptimizedPlan optimizedPlan = this.compileNoStats(env.createProgramPlan());
        CompilerTestBase.OptimizerPlanNodeResolver resolver = UnionReplacementTest.getOptimizerPlanNodeResolver(optimizedPlan);
        SingleInputPlanNode sink = (SingleInputPlanNode)resolver.getNode("out");
        Assert.assertEquals((String)"Sink input should be range partitioned.", (Object)PartitioningProperty.RANGE_PARTITIONED, (Object)sink.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals((String)"Sink input should be range partitioned on 1", (Object)new Ordering(1, null, Order.ASCENDING), (Object)sink.getInput().getGlobalProperties().getPartitioningOrdering());
        SingleInputPlanNode partitioner = (SingleInputPlanNode)sink.getInput().getSource();
        Assert.assertTrue((partitioner.getDriverStrategy() == DriverStrategy.UNARY_NO_OP ? 1 : 0) != 0);
        Assert.assertEquals((String)"Partitioner input should be range partitioned.", (Object)PartitioningProperty.RANGE_PARTITIONED, (Object)partitioner.getInput().getGlobalProperties().getPartitioning());
        Assert.assertEquals((String)"Partitioner input should be range partitioned on 1", (Object)new Ordering(1, null, Order.ASCENDING), (Object)partitioner.getInput().getGlobalProperties().getPartitioningOrdering());
        Assert.assertEquals((String)"Partitioner input channel should be forwarding", (Object)ShipStrategyType.FORWARD, (Object)partitioner.getInput().getShipStrategy());
        NAryUnionPlanNode union = (NAryUnionPlanNode)partitioner.getInput().getSource();
        for (Channel c : union.getInputs()) {
            Assert.assertEquals((String)"Union input should be range partitioned", (Object)PartitioningProperty.RANGE_PARTITIONED, (Object)c.getGlobalProperties().getPartitioning());
            Assert.assertEquals((String)"Union input channel should be forwarded", (Object)ShipStrategyType.FORWARD, (Object)c.getShipStrategy());
            SingleInputPlanNode partitionMap = (SingleInputPlanNode)c.getSource();
            Assert.assertEquals((Object)DriverStrategy.MAP, (Object)partitionMap.getDriverStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)partitionMap.getInput().getShipStrategy());
        }
    }

    @Test
    public void testConsecutiveUnionsWithBroadcast() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource src1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src2 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src3 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataSource src4 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        UnionOperator union12 = src1.union((DataSet)src2);
        UnionOperator union123 = union12.union((DataSet)src3);
        ((JoinOperator)union123.join((DataSet)src4, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(new int[]{0}).equalTo(new int[]{0}).name("join")).output((OutputFormat)new DiscardingOutputFormat()).name("out");
        OptimizedPlan optimizedPlan = this.compileNoStats(env.createProgramPlan());
        CompilerTestBase.OptimizerPlanNodeResolver resolver = UnionReplacementTest.getOptimizerPlanNodeResolver(optimizedPlan);
        DualInputPlanNode join = (DualInputPlanNode)resolver.getNode("join");
        Assert.assertEquals((String)"First join input should be fully replicated.", (Object)PartitioningProperty.FULL_REPLICATION, (Object)join.getInput1().getGlobalProperties().getPartitioning());
        NAryUnionPlanNode union = (NAryUnionPlanNode)join.getInput1().getSource();
        for (Channel c : union.getInputs()) {
            Assert.assertEquals((String)"Union input should be fully replicated", (Object)PartitioningProperty.FULL_REPLICATION, (Object)c.getGlobalProperties().getPartitioning());
            Assert.assertEquals((String)"Union input channel should be broadcasting", (Object)ShipStrategyType.BROADCAST, (Object)c.getShipStrategy());
        }
    }

    @Test
    public void testUnionForwardOutput() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource src1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        MapOperator u1 = src1.union((DataSet)src1).map(new IdentityMapper());
        Operator s1 = u1.filter((FilterFunction & Serializable)x -> true).name("preFilter1");
        Operator s2 = u1.filter((FilterFunction & Serializable)x -> true).name("preFilter2");
        Operator reduced1 = ((FilterOperator)s1.union((DataSet)s2).filter((FilterFunction & Serializable)x -> true).name("postFilter1")).groupBy(new int[]{0}).reduceGroup(new IdentityGroupReducer()).name("reducer1");
        Operator reduced2 = ((FilterOperator)s1.union((DataSet)s2).filter((FilterFunction & Serializable)x -> true).name("postFilter2")).groupBy(new int[]{1}).reduceGroup(new IdentityGroupReducer()).name("reducer2");
        reduced1.union((DataSet)reduced2).output((OutputFormat)new DiscardingOutputFormat());
        OptimizedPlan optimizedPlan = this.compileNoStats(env.createProgramPlan());
        CompilerTestBase.OptimizerPlanNodeResolver resolver = UnionReplacementTest.getOptimizerPlanNodeResolver(optimizedPlan);
        SingleInputPlanNode unionOut1 = (SingleInputPlanNode)resolver.getNode("postFilter1");
        SingleInputPlanNode unionOut2 = (SingleInputPlanNode)resolver.getNode("postFilter2");
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)unionOut1.getInput().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)unionOut2.getInput().getShipStrategy());
    }

    @Test
    public void testUnionInputOutputDifferentDOP() throws Exception {
        Channel halfHalf;
        Channel halfFull;
        Channel fullHalf;
        Channel fullFull;
        int fullDop = 8;
        int halfDop = 4;
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        Operator in1 = ((MapOperator)env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)}).map(new IdentityMapper()).setParallelism(fullDop)).name("inDopFull");
        Operator in2 = ((MapOperator)env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)}).map(new IdentityMapper()).setParallelism(halfDop)).name("inDopHalf");
        UnionOperator union = in1.union((DataSet)in2);
        Operator dopFullMap = ((MapOperator)union.map(new IdentityMapper()).setParallelism(fullDop)).name("outDopFull");
        Operator dopHalfMap = ((MapOperator)union.map(new IdentityMapper()).setParallelism(halfDop)).name("outDopHalf");
        dopFullMap.output((OutputFormat)new DiscardingOutputFormat());
        dopHalfMap.output((OutputFormat)new DiscardingOutputFormat());
        OptimizedPlan optimizedPlan = this.compileNoStats(env.createProgramPlan());
        CompilerTestBase.OptimizerPlanNodeResolver resolver = UnionReplacementTest.getOptimizerPlanNodeResolver(optimizedPlan);
        SingleInputPlanNode inDopFull = (SingleInputPlanNode)resolver.getNode("inDopFull");
        SingleInputPlanNode inDopHalf = (SingleInputPlanNode)resolver.getNode("inDopHalf");
        SingleInputPlanNode outDopFull = (SingleInputPlanNode)resolver.getNode("outDopFull");
        SingleInputPlanNode outDopHalf = (SingleInputPlanNode)resolver.getNode("outDopHalf");
        NAryUnionPlanNode unionDopFull = (NAryUnionPlanNode)outDopFull.getInput().getSource();
        NAryUnionPlanNode unionDopHalf = (NAryUnionPlanNode)outDopHalf.getInput().getSource();
        Assert.assertEquals((long)2L, (long)inDopFull.getOutgoingChannels().size());
        Assert.assertEquals((long)2L, (long)inDopHalf.getOutgoingChannels().size());
        Assert.assertEquals((long)fullDop, (long)inDopFull.getParallelism());
        Assert.assertEquals((long)halfDop, (long)inDopHalf.getParallelism());
        Assert.assertEquals((long)fullDop, (long)unionDopFull.getParallelism());
        Assert.assertEquals((long)halfDop, (long)unionDopHalf.getParallelism());
        Assert.assertEquals((long)fullDop, (long)outDopFull.getParallelism());
        Assert.assertEquals((long)halfDop, (long)outDopHalf.getParallelism());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)outDopHalf.getInput().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)outDopFull.getInput().getShipStrategy());
        if (((Channel)inDopFull.getOutgoingChannels().get(0)).getTarget() == unionDopFull) {
            fullFull = (Channel)inDopFull.getOutgoingChannels().get(0);
            fullHalf = (Channel)inDopFull.getOutgoingChannels().get(1);
        } else {
            fullFull = (Channel)inDopFull.getOutgoingChannels().get(1);
            fullHalf = (Channel)inDopFull.getOutgoingChannels().get(0);
        }
        if (((Channel)inDopHalf.getOutgoingChannels().get(0)).getTarget() == unionDopFull) {
            halfFull = (Channel)inDopHalf.getOutgoingChannels().get(0);
            halfHalf = (Channel)inDopHalf.getOutgoingChannels().get(1);
        } else {
            halfFull = (Channel)inDopHalf.getOutgoingChannels().get(1);
            halfHalf = (Channel)inDopHalf.getOutgoingChannels().get(0);
        }
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)fullFull.getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)halfHalf.getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_RANDOM, (Object)fullHalf.getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_RANDOM, (Object)halfFull.getShipStrategy());
    }
}

