/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

public class SemanticPropertiesAPIToPlanTest
extends CompilerTestBase {
    @Test
    public void forwardFieldsTestMapReduce() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource set = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        set = ((MapOperator)((ReduceOperator)((MapOperator)set.map((MapFunction)new MockMapper()).withForwardedFields(new String[]{"*"})).groupBy(new int[]{0}).reduce((ReduceFunction)new MockReducer()).withForwardedFields(new String[]{"f0->f1"})).map((MapFunction)new MockMapper()).withForwardedFields(new String[]{"*"})).groupBy(new int[]{1}).reduce((ReduceFunction)new MockReducer()).withForwardedFields(new String[]{"*"});
        set.output((OutputFormat)new DiscardingOutputFormat());
        Plan plan = env.createProgramPlan();
        OptimizedPlan oPlan = this.compileWithStats(plan);
        oPlan.accept((Visitor)new Visitor<PlanNode>(){

            public boolean preVisit(PlanNode visitable) {
                LocalProperties lprops;
                GlobalProperties gprops;
                if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperatorBase) {
                    for (Channel input : visitable.getInputs()) {
                        gprops = visitable.getGlobalProperties();
                        lprops = visitable.getLocalProperties();
                        Assert.assertTrue((String)"Reduce should just forward the input if it is already partitioned", (input.getShipStrategy() == ShipStrategyType.FORWARD ? 1 : 0) != 0);
                        Assert.assertTrue((String)"Wrong GlobalProperties on Reducer", (boolean)gprops.isPartitionedOnFields(new FieldSet(Integer.valueOf(1))));
                        Assert.assertTrue((String)"Wrong GlobalProperties on Reducer", (gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED ? 1 : 0) != 0);
                        Assert.assertTrue((String)"Wrong LocalProperties on Reducer", (boolean)lprops.getGroupedFields().contains(Integer.valueOf(1)));
                    }
                }
                if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof MapOperatorBase) {
                    for (Channel input : visitable.getInputs()) {
                        gprops = visitable.getGlobalProperties();
                        lprops = visitable.getLocalProperties();
                        Assert.assertTrue((String)"Map should just forward the input if it is already partitioned", (input.getShipStrategy() == ShipStrategyType.FORWARD ? 1 : 0) != 0);
                        Assert.assertTrue((String)"Wrong GlobalProperties on Mapper", (boolean)gprops.isPartitionedOnFields(new FieldSet(Integer.valueOf(1))));
                        Assert.assertTrue((String)"Wrong GlobalProperties on Mapper", (gprops.getPartitioning() == PartitioningProperty.HASH_PARTITIONED ? 1 : 0) != 0);
                        Assert.assertTrue((String)"Wrong LocalProperties on Mapper", (boolean)lprops.getGroupedFields().contains(Integer.valueOf(1)));
                    }
                    return false;
                }
                return true;
            }

            public void postVisit(PlanNode visitable) {
            }
        });
    }

    @Test
    public void forwardFieldsTestJoin() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource in1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource in2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        in1 = ((MapOperator)in1.map((MapFunction)new MockMapper()).withForwardedFields(new String[]{"*"})).groupBy(new int[]{0}).reduce((ReduceFunction)new MockReducer()).withForwardedFields(new String[]{"f0->f1"});
        in2 = ((MapOperator)in2.map((MapFunction)new MockMapper()).withForwardedFields(new String[]{"*"})).groupBy(new int[]{1}).reduce((ReduceFunction)new MockReducer()).withForwardedFields(new String[]{"f1->f2"});
        JoinOperator.EquiJoin out = in1.join((DataSet)in2).where(new int[]{1}).equalTo(new int[]{2}).with((JoinFunction)new MockJoin());
        out.output((OutputFormat)new DiscardingOutputFormat());
        Plan plan = env.createProgramPlan();
        OptimizedPlan oPlan = this.compileWithStats(plan);
        oPlan.accept((Visitor)new Visitor<PlanNode>(){

            public boolean preVisit(PlanNode visitable) {
                if (visitable instanceof DualInputPlanNode && visitable.getProgramOperator() instanceof InnerJoinOperatorBase) {
                    DualInputPlanNode node = (DualInputPlanNode)visitable;
                    Channel inConn1 = node.getInput1();
                    Channel inConn2 = node.getInput2();
                    Assert.assertTrue((String)"Join should just forward the input if it is already partitioned", (inConn1.getShipStrategy() == ShipStrategyType.FORWARD ? 1 : 0) != 0);
                    Assert.assertTrue((String)"Join should just forward the input if it is already partitioned", (inConn2.getShipStrategy() == ShipStrategyType.FORWARD ? 1 : 0) != 0);
                    return false;
                }
                return true;
            }

            public void postVisit(PlanNode visitable) {
            }
        });
    }

    public static class MockJoin
    implements JoinFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
            return null;
        }
    }

    public static class MockReducer
    implements ReduceFunction<Tuple3<Integer, Integer, Integer>> {
        public Tuple3<Integer, Integer, Integer> reduce(Tuple3<Integer, Integer, Integer> value1, Tuple3<Integer, Integer, Integer> value2) throws Exception {
            return null;
        }
    }

    public static class MockMapper
    implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception {
            return null;
        }
    }
}

