/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.ReplicatingInputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/**
 * Optimizer tests for plans in which the first input of a join or cross originates from a
 * data source that wraps its input format in a {@link ReplicatingInputFormat}.
 *
 * <p>The positive tests expect both inputs of the join/cross node to use
 * {@link ShipStrategyType#FORWARD}. The negative tests expect plan compilation to fail with a
 * {@link CompilerException}.
 *
 * <p>Raw types are used on purpose: the element types of the data sets are tuple types that
 * this file does not import, and the tests only inspect the compiled plan.
 */
public class ReplicatingDataSourceTest extends CompilerTestBase {

    /** Parallelism configured on the environment of every test. */
    private static final int ENV_PARALLELISM = 8;

    /** Field expression used as join key on both sides of every join. */
    private static final String ALL_FIELDS = "*";

    /** Message reported when an input does not have the expected ship strategy. */
    private static final String SHIP_STRATEGY_MESSAGE = "Invalid ship strategy for an operator.";

    // ------------------------------------------------------------------------
    //  Plans expected to compile with FORWARD on both inputs
    // ------------------------------------------------------------------------

    /** Join directly on the replicated source. */
    @Test
    public void checkJoinWithReplicatedSourceInput() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        joinOnAllFieldsAndWrite(replicated, regular);

        assertInputsForwarded(env);
    }

    /** Join with a map between the replicated source and the join. */
    @Test
    public void checkJoinWithReplicatedSourceInputBehindMap() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        joinOnAllFieldsAndWrite(replicated.map(new IdMap()), regular);

        assertInputsForwarded(env);
    }

    /** Join with a filter between the replicated source and the join. */
    @Test
    public void checkJoinWithReplicatedSourceInputBehindFilter() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        joinOnAllFieldsAndWrite(replicated.filter(new NoFilter()), regular);

        assertInputsForwarded(env);
    }

    /** Join with a flat map between the replicated source and the join. */
    @Test
    public void checkJoinWithReplicatedSourceInputBehindFlatMap() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        joinOnAllFieldsAndWrite(replicated.flatMap(new IdFlatMap()), regular);

        assertInputsForwarded(env);
    }

    /** Join with a map-partition between the replicated source and the join. */
    @Test
    public void checkJoinWithReplicatedSourceInputBehindMapPartition() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        joinOnAllFieldsAndWrite(replicated.mapPartition(new IdPMap()), regular);

        assertInputsForwarded(env);
    }

    /** Join with filter, map-partition, flat map and map chained before the join. */
    @Test
    public void checkJoinWithReplicatedSourceInputBehindMultiMaps() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        DataSet chained = replicated
                .filter(new NoFilter())
                .mapPartition(new IdPMap())
                .flatMap(new IdFlatMap())
                .map(new IdMap());
        joinOnAllFieldsAndWrite(chained, regular);

        assertInputsForwarded(env);
    }

    /** Cross directly on the replicated source. */
    @Test
    public void checkCrossWithReplicatedSourceInput() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        replicated.cross(regular).writeAsText("/some/newpath");

        assertInputsForwarded(env);
    }

    /** Cross with a map and a filter between the replicated source and the cross. */
    @Test
    public void checkCrossWithReplicatedSourceInputBehindMap() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        replicated.map(new IdMap()).filter(new NoFilter()).cross(regular).writeAsText("/some/newpath");

        assertInputsForwarded(env);
    }

    // ------------------------------------------------------------------------
    //  Plans expected to be rejected with a CompilerException
    // ------------------------------------------------------------------------

    /** The join runs with parallelism 10 while the environment uses {@link #ENV_PARALLELISM}. */
    @Test(expected=CompilerException.class)
    public void checkJoinWithReplicatedSourceInputChangingparallelism() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        // On the raw type setParallelism() is typed as the operator base class, hence the cast.
        ((JoinOperator) replicated.join(regular).where(ALL_FIELDS).equalTo(ALL_FIELDS).setParallelism(10))
                .writeAsText("/some/newpath");

        compilePlanOf(env);
    }

    /** A map with parallelism 9 sits between the replicated source and the join. */
    @Test(expected=CompilerException.class)
    public void checkJoinWithReplicatedSourceInputBehindMapChangingparallelism() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        joinOnAllFieldsAndWrite(replicated.map(new IdMap()).setParallelism(9), regular);

        compilePlanOf(env);
    }

    /** A reduce sits between the replicated source and the join. */
    @Test(expected=CompilerException.class)
    public void checkJoinWithReplicatedSourceInputBehindReduce() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        joinOnAllFieldsAndWrite(replicated.reduce(new LastReduce()), regular);

        compilePlanOf(env);
    }

    /** A rebalance sits between the replicated source and the join. */
    @Test(expected=CompilerException.class)
    public void checkJoinWithReplicatedSourceInputBehindRebalance() {
        LocalEnvironment env = newTestEnvironment();
        DataSource replicated = replicatedSource(env);
        DataSource regular = regularSource(env);

        joinOnAllFieldsAndWrite(replicated.rebalance(), regular);

        compilePlanOf(env);
    }

    // ------------------------------------------------------------------------
    //  Shared test scaffolding
    // ------------------------------------------------------------------------

    /** Creates a local environment whose parallelism is set to {@link #ENV_PARALLELISM}. */
    private static LocalEnvironment newTestEnvironment() {
        LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(ENV_PARALLELISM);
        return env;
    }

    /**
     * Creates the single-String-field source that reads "/some/path" through a CSV input
     * format wrapped in a {@link ReplicatingInputFormat}.
     */
    private static DataSource replicatedSource(LocalEnvironment env) {
        TupleTypeInfo typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(new Class[]{String.class});
        ReplicatingInputFormat rif = new ReplicatingInputFormat(
                (InputFormat) new TupleCsvInputFormat(new Path("/some/path"), (TupleTypeInfoBase) typeInfo));
        return env.createInput(
                (InputFormat) rif,
                (TypeInformation) new TupleTypeInfo(new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO}));
    }

    /** Creates the plain (non-replicated) single-String-field CSV source on "/some/otherpath". */
    private static DataSource regularSource(LocalEnvironment env) {
        return env.readCsvFile("/some/otherpath").types(String.class);
    }

    /** Joins {@code left} with {@code right} on all fields and writes the result to "/some/newpath". */
    private static void joinOnAllFieldsAndWrite(DataSet left, DataSet right) {
        left.join(right).where(ALL_FIELDS).equalTo(ALL_FIELDS).writeAsText("/some/newpath");
    }

    /** Builds the program plan of {@code env} and compiles it without statistics. */
    private OptimizedPlan compilePlanOf(LocalEnvironment env) {
        Plan plan = env.createProgramPlan();
        return this.compileNoStats(plan);
    }

    /**
     * Compiles the program of {@code env} and asserts that both inputs of the two-input node
     * directly preceding the first data sink use {@link ShipStrategyType#FORWARD}.
     */
    private void assertInputsForwarded(LocalEnvironment env) {
        OptimizedPlan oPlan = compilePlanOf(env);

        // Every plan built in this class has exactly one sink whose predecessor is the join/cross.
        SinkPlanNode sinkNode = (SinkPlanNode) oPlan.getDataSinks().iterator().next();
        DualInputPlanNode dualNode = (DualInputPlanNode) sinkNode.getPredecessor();

        ShipStrategyType firstInput = dualNode.getInput1().getShipStrategy();
        ShipStrategyType secondInput = dualNode.getInput2().getShipStrategy();

        Assert.assertEquals(SHIP_STRATEGY_MESSAGE, ShipStrategyType.FORWARD, firstInput);
        Assert.assertEquals(SHIP_STRATEGY_MESSAGE, ShipStrategyType.FORWARD, secondInput);
    }

    // ------------------------------------------------------------------------
    //  User functions used to build the test plans
    // ------------------------------------------------------------------------

    /** Reduce function that always returns the second of the two values. */
    public static class LastReduce<T> implements ReduceFunction<T> {
        @Override
        public T reduce(T value1, T value2) throws Exception {
            return value2;
        }
    }

    /** Map-partition function that emits every incoming value unchanged. */
    public static class IdPMap<T> implements MapPartitionFunction<T, T> {
        @Override
        public void mapPartition(Iterable<T> values, Collector<T> out) throws Exception {
            for (T v : values) {
                out.collect(v);
            }
        }
    }

    /** Flat-map function that emits its input value exactly once. */
    public static class IdFlatMap<T> implements FlatMapFunction<T, T> {
        @Override
        public void flatMap(T value, Collector<T> out) throws Exception {
            out.collect(value);
        }
    }

    /**
     * Filter function that returns {@code false} for every value.
     *
     * <p>NOTE(review): the name suggests a pass-through filter, which would return
     * {@code true}. The tests in this class only compile plans, so the result does not
     * matter here - confirm the intent before reusing this class elsewhere.
     */
    public static class NoFilter<T> implements FilterFunction<T> {
        @Override
        public boolean filter(T value) throws Exception {
            return false;
        }
    }

    /** Map function that returns its input unchanged. */
    public static class IdMap<T> implements MapFunction<T, T> {
        @Override
        public T map(T value) throws Exception {
            return value;
        }
    }
}

