/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class PartitioningReusageTest
extends CompilerTestBase {
    /**
     * Joins two CSV sources without any explicit partitioning on keys
     * (0, 1) = (0, 1) and checks the input properties of the join plan node.
     */
    @Test
    public void noPreviousPartitioningJoin1() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        JoinOperator.EquiJoin joinResult = left
                .join((DataSet) right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(0, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * Joins two CSV sources without any explicit partitioning on keys
     * (0, 1) = (2, 1) and checks the input properties of the join plan node.
     */
    @Test
    public void noPreviousPartitioningJoin2() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        JoinOperator.EquiJoin joinResult = left
                .join((DataSet) right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(2, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * First input is hash-partitioned on (0, 1) and then mapped with forwarded
     * fields "0;1"; the second input is unpartitioned. Join keys are
     * (0, 1) = (0, 1).
     */
    @Test
    public void reuseSinglePartitioningJoin1() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join((DataSet) right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(0, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * First input is hash-partitioned on (0, 1) and then mapped with forwarded
     * fields "0;1"; the second input is unpartitioned. Join keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseSinglePartitioningJoin2() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join((DataSet) right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(2, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * Second input is hash-partitioned on (2, 1) and then mapped with forwarded
     * fields "2;1"; the first input is unpartitioned. Join keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseSinglePartitioningJoin3() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        DataSet partitionedRight = (DataSet) right
                .partitionByHash(2, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2;1");

        JoinOperator.EquiJoin joinResult = left
                .join(partitionedRight, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(2, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * First input is hash-partitioned on (0) and then mapped with forwarded
     * field "0"; the second input is unpartitioned. Join keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseSinglePartitioningJoin4() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join((DataSet) right, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(2, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * Second input is hash-partitioned on (2) and then mapped with forwarded
     * field "2"; the first input is unpartitioned. Join keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseSinglePartitioningJoin5() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        DataSet partitionedRight = (DataSet) right
                .partitionByHash(2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2");

        JoinOperator.EquiJoin joinResult = left
                .join(partitionedRight, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(2, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * Both inputs are hash-partitioned on (0, 1) and then mapped with forwarded
     * fields "0;1". Join keys are (0, 1) = (0, 1).
     */
    @Test
    public void reuseBothPartitioningJoin1() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join(partitionedRight, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(0, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * First input is hash-partitioned on (0, 1) with forwarded fields "0;1",
     * second input on (1, 2) with forwarded fields "1;2". Join keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseBothPartitioningJoin2() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(1, 2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("1;2");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join(partitionedRight, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(2, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * First input is hash-partitioned on (0) with forwarded field "0",
     * second input on (2, 1) with forwarded fields "2;1". Join keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseBothPartitioningJoin3() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(2, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2;1");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join(partitionedRight, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 1)
                .equalTo(2, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * First input is hash-partitioned on (0, 2) with forwarded fields "0;2",
     * second input on (1) with forwarded field "1". Join keys are
     * (0, 2) = (2, 1).
     */
    @Test
    public void reuseBothPartitioningJoin4() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;2");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("1");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join(partitionedRight, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 2)
                .equalTo(2, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * First input is hash-partitioned on (2) with forwarded field "2",
     * second input on (1) with forwarded field "1". Join keys are
     * (0, 2) = (2, 1).
     */
    @Test
    public void reuseBothPartitioningJoin5() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("1");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join(partitionedRight, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 2)
                .equalTo(2, 1)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * First input is hash-partitioned on (0) with forwarded field "0",
     * second input on (1) with forwarded field "1". Join keys are
     * (0, 2) = (1, 2).
     */
    @Test
    public void reuseBothPartitioningJoin6() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("1");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join(partitionedRight, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 2)
                .equalTo(1, 2)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * First input is hash-partitioned on (2) with forwarded field "2",
     * second input on (1) with forwarded field "1". Join keys are
     * (0, 2) = (1, 2).
     */
    @Test
    public void reuseBothPartitioningJoin7() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("1");

        JoinOperator.EquiJoin joinResult = partitionedLeft
                .join(partitionedRight, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0, 2)
                .equalTo(1, 2)
                .with((JoinFunction) new MockJoin());
        joinResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the join node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidJoinInputProperties(joinNode);
    }

    /**
     * Co-groups two CSV sources without any explicit partitioning on keys
     * (0, 1) = (0, 1) and checks the input properties of the co-group plan node.
     */
    @Test
    public void noPreviousPartitioningCoGroup1() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        CoGroupOperator coGroupResult = left
                .coGroup((DataSet) right)
                .where(0, 1)
                .equalTo(0, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * Co-groups two CSV sources without any explicit partitioning on keys
     * (0, 1) = (2, 1) and checks the input properties of the co-group plan node.
     */
    @Test
    public void noPreviousPartitioningCoGroup2() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        CoGroupOperator coGroupResult = left
                .coGroup((DataSet) right)
                .where(0, 1)
                .equalTo(2, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * First input is hash-partitioned on (0, 1) and then mapped with forwarded
     * fields "0;1"; the second input is unpartitioned. Co-group keys are
     * (0, 1) = (0, 1).
     */
    @Test
    public void reuseSinglePartitioningCoGroup1() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup((DataSet) right)
                .where(0, 1)
                .equalTo(0, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * First input is hash-partitioned on (0, 1) and then mapped with forwarded
     * fields "0;1"; the second input is unpartitioned. Co-group keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseSinglePartitioningCoGroup2() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup((DataSet) right)
                .where(0, 1)
                .equalTo(2, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * Second input is hash-partitioned on (2, 1) and then mapped with forwarded
     * fields "2;1"; the first input is unpartitioned. Co-group keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseSinglePartitioningCoGroup3() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        DataSet partitionedRight = (DataSet) right
                .partitionByHash(2, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2;1");

        CoGroupOperator coGroupResult = left
                .coGroup(partitionedRight)
                .where(0, 1)
                .equalTo(2, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * First input is hash-partitioned on (0) and then mapped with forwarded
     * field "0"; the second input is unpartitioned. Co-group keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseSinglePartitioningCoGroup4() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup((DataSet) right)
                .where(0, 1)
                .equalTo(2, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * Second input is hash-partitioned on (2) and then mapped with forwarded
     * field "2"; the first input is unpartitioned. Co-group keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseSinglePartitioningCoGroup5() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        DataSet partitionedRight = (DataSet) right
                .partitionByHash(2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2");

        CoGroupOperator coGroupResult = left
                .coGroup(partitionedRight)
                .where(0, 1)
                .equalTo(2, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * Both inputs are hash-partitioned on (0, 1) and then mapped with forwarded
     * fields "0;1". Co-group keys are (0, 1) = (0, 1).
     */
    @Test
    public void reuseBothPartitioningCoGroup1() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup(partitionedRight)
                .where(0, 1)
                .equalTo(0, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * First input is hash-partitioned on (0, 1) with forwarded fields "0;1",
     * second input on (1, 2) with forwarded fields "1;2". Co-group keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseBothPartitioningCoGroup2() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;1");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(1, 2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("1;2");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup(partitionedRight)
                .where(0, 1)
                .equalTo(2, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * First input is hash-partitioned on (0) with forwarded field "0",
     * second input on (2, 1) with forwarded fields "2;1". Co-group keys are
     * (0, 1) = (2, 1).
     */
    @Test
    public void reuseBothPartitioningCoGroup3() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(2, 1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2;1");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup(partitionedRight)
                .where(0, 1)
                .equalTo(2, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * First input is hash-partitioned on (0, 2) with forwarded fields "0;2",
     * second input on (1) with forwarded field "1". Co-group keys are
     * (0, 2) = (2, 1).
     */
    @Test
    public void reuseBothPartitioningCoGroup4() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(0, 2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("0;2");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("1");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup(partitionedRight)
                .where(0, 2)
                .equalTo(2, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * First input is hash-partitioned on (2) with forwarded field "2",
     * second input on (1) with forwarded field "1". Co-group keys are
     * (0, 2) = (2, 1).
     */
    @Test
    public void reuseBothPartitioningCoGroup5() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("1");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup(partitionedRight)
                .where(0, 2)
                .equalTo(2, 1)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * Both inputs are hash-partitioned on (2) and then mapped with forwarded
     * field "2". Co-group keys are (0, 2) = (1, 2).
     */
    @Test
    public void reuseBothPartitioningCoGroup6() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup(partitionedRight)
                .where(0, 2)
                .equalTo(1, 2)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * First input is hash-partitioned on (2) with forwarded field "2",
     * second input on (1) with forwarded field "1". Co-group keys are
     * (0, 2) = (1, 2).
     */
    @Test
    public void reuseBothPartitioningCoGroup7() {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource left = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
        DataSource right = environment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);

        MapOperator partitionedLeft = (MapOperator) left
                .partitionByHash(2)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("2");
        DataSet partitionedRight = (DataSet) right
                .partitionByHash(1)
                .map((MapFunction) new MockMapper())
                .withForwardedFields("1");

        CoGroupOperator coGroupResult = partitionedLeft
                .coGroup(partitionedRight)
                .where(0, 2)
                .equalTo(1, 2)
                .with((CoGroupFunction) new MockCoGroup());
        coGroupResult.output((OutputFormat) new DiscardingOutputFormat());

        // Compile the program plan, then take the node feeding the first sink as the co-group node.
        OptimizedPlan optimized = compileWithStats(environment.createProgramPlan());
        SinkPlanNode sinkNode = (SinkPlanNode) optimized.getDataSinks().iterator().next();
        DualInputPlanNode coGroupNode = (DualInputPlanNode) sinkNode.getInput().getSource();
        checkValidCoGroupInputProperties(coGroupNode);
    }

    /**
     * Checks that the global properties of a join's two inputs are compatible.
     *
     * <p>When both inputs are hash partitioned, the two partitioning field lists
     * must have the same size, every partitioning field must be found in the
     * join's key list for its input, and the i-th partitioning fields of both
     * inputs must sit at the same position in their respective key lists.
     *
     * <p>The combination of one fully replicated input and one randomly
     * partitioned input (in either order) is accepted without field checks.
     *
     * @param join the join plan node whose inputs are inspected
     * @throws UnsupportedOperationException if the inputs use any other
     *         combination of partitioning properties
     */
    private void checkValidJoinInputProperties(DualInputPlanNode join) {
        GlobalProperties inProps1 = join.getInput1().getGlobalProperties();
        GlobalProperties inProps2 = join.getInput2().getGlobalProperties();
        PartitioningProperty partitioning1 = inProps1.getPartitioning();
        PartitioningProperty partitioning2 = inProps2.getPartitioning();

        if (partitioning1 == PartitioningProperty.HASH_PARTITIONED
                && partitioning2 == PartitioningProperty.HASH_PARTITIONED) {
            FieldList pFields1 = inProps1.getPartitioningFields();
            FieldList pFields2 = inProps2.getPartitioningFields();
            Assert.assertTrue(
                    "Inputs are not the same number of fields. Input 1: " + pFields1 + ", Input 2: " + pFields2,
                    pFields1.size() == pFields2.size());

            FieldList reqPFields1 = join.getKeysForInput1();
            FieldList reqPFields2 = join.getKeysForInput2();
            for (int i = 0; i < pFields1.size(); ++i) {
                int f1 = pFields1.get(i);
                int f2 = pFields2.get(i);
                // A negative position is treated as "field is not one of the join keys".
                // NOTE(review): relies on getPosInFieldList, defined outside this block.
                int pos1 = this.getPosInFieldList(f1, reqPFields1);
                int pos2 = this.getPosInFieldList(f2, reqPFields2);
                if (pos1 < 0) {
                    Assert.fail("Input 1 is partitioned on field " + f1 + " which is not contained in the key set " + reqPFields1);
                }
                if (pos2 < 0) {
                    Assert.fail("Input 2 is partitioned on field " + f2 + " which is not contained in the key set " + reqPFields2);
                }
                if (pos1 != pos2) {
                    Assert.fail("Inputs are not partitioned on the same key fields");
                }
            }
            return;
        }

        boolean replicatedThenRandom = partitioning1 == PartitioningProperty.FULL_REPLICATION
                && partitioning2 == PartitioningProperty.RANDOM_PARTITIONED;
        boolean randomThenReplicated = partitioning1 == PartitioningProperty.RANDOM_PARTITIONED
                && partitioning2 == PartitioningProperty.FULL_REPLICATION;
        if (!replicatedThenRandom && !randomThenReplicated) {
            // The original message named "coGroupinputs" here, which is wrong for the join checker.
            throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned join inputs");
        }
    }

    /**
     * Checks that both inputs of a co-group node are hash-partitioned in a compatible way.
     *
     * <p>The inputs must be partitioned on the same number of fields, every partitioning field
     * must be contained in the key list of its input, and the i-th partitioning fields of both
     * inputs must be located at the same position within their respective key lists. Violations
     * are reported through JUnit assertion failures.
     *
     * @param coGroup the co-group plan node whose inputs are inspected
     * @throws UnsupportedOperationException if either input is not hash-partitioned
     */
    private void checkValidCoGroupInputProperties(DualInputPlanNode coGroup) {
        GlobalProperties firstProps = coGroup.getInput1().getGlobalProperties();
        GlobalProperties secondProps = coGroup.getInput2().getGlobalProperties();

        // Only the hash/hash combination is supported; bail out on anything else.
        if (firstProps.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || secondProps.getPartitioning() != PartitioningProperty.HASH_PARTITIONED) {
            throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs");
        }

        FieldList firstPartFields = firstProps.getPartitioningFields();
        FieldList secondPartFields = secondProps.getPartitioningFields();
        boolean sameFieldCount = firstPartFields.size() == secondPartFields.size();
        Assert.assertTrue("Inputs are not the same number of fields. Input 1: " + firstPartFields + ", Input 2: " + secondPartFields, sameFieldCount);

        FieldList firstKeys = coGroup.getKeysForInput1();
        FieldList secondKeys = coGroup.getKeysForInput2();
        int idx = 0;
        while (idx < firstPartFields.size()) {
            int firstField = firstPartFields.get(idx);
            int secondField = secondPartFields.get(idx);
            // Where does each partitioning field sit within the co-group keys of its input?
            int firstKeyPos = this.getPosInFieldList(firstField, firstKeys);
            int secondKeyPos = this.getPosInFieldList(secondField, secondKeys);
            if (firstKeyPos < 0) {
                Assert.fail("Input 1 is partitioned on field " + firstField + " which is not contained in the key set " + firstKeys);
            }
            if (secondKeyPos < 0) {
                Assert.fail("Input 2 is partitioned on field " + secondField + " which is not contained in the key set " + secondKeys);
            }
            if (firstKeyPos != secondKeyPos) {
                Assert.fail("Inputs are not partitioned on the same key fields");
            }
            idx++;
        }
    }

    /**
     * Finds the first position at which a field occurs in a field list.
     *
     * @param field the field to look for
     * @param list the list to search
     * @return the index of the first occurrence of {@code field} in {@code list}, or
     *         {@code -1} if the list does not contain it
     */
    private int getPosInFieldList(int field, FieldList list) {
        for (int idx = 0; idx < list.size(); idx++) {
            if (list.get(idx) == field) {
                return idx;
            }
        }
        return -1;
    }

    /**
     * Stub {@link CoGroupFunction} over {@code Tuple3<Integer, Integer, Integer>} records that
     * does nothing. It is only used to give the co-group operators built in these tests a
     * user function.
     */
    public static class MockCoGroup
    implements CoGroupFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        /**
         * Ignores both input groups and emits nothing.
         *
         * @param first the group from the first input (unused)
         * @param second the group from the second input (unused)
         * @param out the collector for results (never written to)
         */
        public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
            // Intentionally empty.
        }
    }

    /**
     * Stub {@link JoinFunction} over {@code Tuple3<Integer, Integer, Integer>} records that
     * always returns {@code null}. It is only used to give the join operators built in these
     * tests a user function.
     */
    public static class MockJoin
    implements JoinFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        /**
         * Ignores both records.
         *
         * @param first the record from the first input (unused)
         * @param second the record from the second input (unused)
         * @return always {@code null}
         */
        public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
            return null;
        }
    }

    /**
     * Stub {@link MapFunction} over {@code Tuple3<Integer, Integer, Integer>} records that
     * always returns {@code null}. The tests place it between a partitioning step and the
     * join/co-group and declare forwarded fields on the resulting map operator.
     */
    public static class MockMapper
    implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        /**
         * Ignores the record.
         *
         * @param value the input record (unused)
         * @return always {@code null}
         */
        public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception {
            return null;
        }
    }
}

