package org.apache.flink.optimizer;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/PartitioningReusageTest.class */
public class PartitioningReusageTest extends CompilerTestBase {

    /* loaded from: input_file:org/apache/flink/optimizer/PartitioningReusageTest$MockCoGroup.class */
    public static class MockCoGroup implements CoGroupFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> iterable, Iterable<Tuple3<Integer, Integer, Integer>> iterable2, Collector<Tuple3<Integer, Integer, Integer>> collector) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/PartitioningReusageTest$MockJoin.class */
    public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> tuple3, Tuple3<Integer, Integer, Integer> tuple32) throws Exception {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/PartitioningReusageTest$MockMapper.class */
    public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> tuple3) throws Exception {
            return null;
        }
    }

    @Test
    public void noPreviousPartitioningJoin1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{0, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void noPreviousPartitioningJoin2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningJoin1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{0, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningJoin2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningJoin3() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2, 1}).map(new MockMapper()).withForwardedFields(new String[]{"2;1"}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningJoin4() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0}).map(new MockMapper()).withForwardedFields(new String[]{"0"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningJoin5() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2}).map(new MockMapper()).withForwardedFields(new String[]{"2"}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningJoin1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{0, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningJoin2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{1, 2}).map(new MockMapper()).withForwardedFields(new String[]{"1;2"}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningJoin3() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0}).map(new MockMapper()).withForwardedFields(new String[]{"0"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2, 1}).map(new MockMapper()).withForwardedFields(new String[]{"2;1"}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningJoin4() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 2}).map(new MockMapper()).withForwardedFields(new String[]{"0;2"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{1}).map(new MockMapper()).withForwardedFields(new String[]{"1"}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 2}).equalTo(new int[]{2, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningJoin5() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2}).map(new MockMapper()).withForwardedFields(new String[]{"2"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{1}).map(new MockMapper()).withForwardedFields(new String[]{"1"}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 2}).equalTo(new int[]{2, 1}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningJoin6() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0}).map(new MockMapper()).withForwardedFields(new String[]{"0"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{1}).map(new MockMapper()).withForwardedFields(new String[]{"1"}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 2}).equalTo(new int[]{1, 2}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningJoin7() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2}).map(new MockMapper()).withForwardedFields(new String[]{"2"}).join(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{1}).map(new MockMapper()).withForwardedFields(new String[]{"1"}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0, 2}).equalTo(new int[]{1, 2}).with(new MockJoin()).output(new DiscardingOutputFormat());
        checkValidJoinInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void noPreviousPartitioningCoGroup1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class)).where(new int[]{0, 1}).equalTo(new int[]{0, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void noPreviousPartitioningCoGroup2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class)).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningCoGroup1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class)).where(new int[]{0, 1}).equalTo(new int[]{0, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningCoGroup2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class)).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningCoGroup3() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2, 1}).map(new MockMapper()).withForwardedFields(new String[]{"2;1"})).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningCoGroup4() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0}).map(new MockMapper()).withForwardedFields(new String[]{"0"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class)).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseSinglePartitioningCoGroup5() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2}).map(new MockMapper()).withForwardedFields(new String[]{"2"})).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningCoGroup1() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"})).where(new int[]{0, 1}).equalTo(new int[]{0, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningCoGroup2() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 1}).map(new MockMapper()).withForwardedFields(new String[]{"0;1"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{1, 2}).map(new MockMapper()).withForwardedFields(new String[]{"1;2"})).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningCoGroup3() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0}).map(new MockMapper()).withForwardedFields(new String[]{"0"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2, 1}).map(new MockMapper()).withForwardedFields(new String[]{"2;1"})).where(new int[]{0, 1}).equalTo(new int[]{2, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningCoGroup4() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{0, 2}).map(new MockMapper()).withForwardedFields(new String[]{"0;2"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{1}).map(new MockMapper()).withForwardedFields(new String[]{"1"})).where(new int[]{0, 2}).equalTo(new int[]{2, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningCoGroup5() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2}).map(new MockMapper()).withForwardedFields(new String[]{"2"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{1}).map(new MockMapper()).withForwardedFields(new String[]{"1"})).where(new int[]{0, 2}).equalTo(new int[]{2, 1}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningCoGroup6() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2}).map(new MockMapper()).withForwardedFields(new String[]{"2"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2}).map(new MockMapper()).withForwardedFields(new String[]{"2"})).where(new int[]{0, 2}).equalTo(new int[]{1, 2}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    @Test
    public void reuseBothPartitioningCoGroup7() {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{2}).map(new MockMapper()).withForwardedFields(new String[]{"2"}).coGroup(executionEnvironment.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class).partitionByHash(new int[]{1}).map(new MockMapper()).withForwardedFields(new String[]{"1"})).where(new int[]{0, 2}).equalTo(new int[]{1, 2}).with(new MockCoGroup()).output(new DiscardingOutputFormat());
        checkValidCoGroupInputProperties((DualInputPlanNode) ((SinkPlanNode) compileWithStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource());
    }

    private void checkValidJoinInputProperties(DualInputPlanNode dualInputPlanNode) {
        GlobalProperties globalProperties = dualInputPlanNode.getInput1().getGlobalProperties();
        GlobalProperties globalProperties2 = dualInputPlanNode.getInput2().getGlobalProperties();
        if (globalProperties.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || globalProperties2.getPartitioning() != PartitioningProperty.HASH_PARTITIONED) {
            if (globalProperties.getPartitioning() == PartitioningProperty.FULL_REPLICATION && globalProperties2.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
                return;
            }
            if (globalProperties.getPartitioning() != PartitioningProperty.RANDOM_PARTITIONED || globalProperties2.getPartitioning() != PartitioningProperty.FULL_REPLICATION) {
                throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroupinputs");
            }
            return;
        }
        FieldList partitioningFields = globalProperties.getPartitioningFields();
        FieldList partitioningFields2 = globalProperties2.getPartitioningFields();
        Assert.assertTrue("Inputs are not the same number of fields. Input 1: " + partitioningFields + ", Input 2: " + partitioningFields2, partitioningFields.size() == partitioningFields2.size());
        FieldList keysForInput1 = dualInputPlanNode.getKeysForInput1();
        FieldList keysForInput2 = dualInputPlanNode.getKeysForInput2();
        for (int i = 0; i < partitioningFields.size(); i++) {
            int intValue = partitioningFields.get(i).intValue();
            int intValue2 = partitioningFields2.get(i).intValue();
            int posInFieldList = getPosInFieldList(intValue, keysForInput1);
            int posInFieldList2 = getPosInFieldList(intValue2, keysForInput2);
            if (posInFieldList < 0) {
                Assert.fail("Input 1 is partitioned on field " + intValue + " which is not contained in the key set " + keysForInput1);
            }
            if (posInFieldList2 < 0) {
                Assert.fail("Input 2 is partitioned on field " + intValue2 + " which is not contained in the key set " + keysForInput2);
            }
            if (posInFieldList != posInFieldList2) {
                Assert.fail("Inputs are not partitioned on the same key fields");
            }
        }
    }

    private void checkValidCoGroupInputProperties(DualInputPlanNode dualInputPlanNode) {
        GlobalProperties globalProperties = dualInputPlanNode.getInput1().getGlobalProperties();
        GlobalProperties globalProperties2 = dualInputPlanNode.getInput2().getGlobalProperties();
        if (globalProperties.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || globalProperties2.getPartitioning() != PartitioningProperty.HASH_PARTITIONED) {
            throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs");
        }
        FieldList partitioningFields = globalProperties.getPartitioningFields();
        FieldList partitioningFields2 = globalProperties2.getPartitioningFields();
        Assert.assertTrue("Inputs are not the same number of fields. Input 1: " + partitioningFields + ", Input 2: " + partitioningFields2, partitioningFields.size() == partitioningFields2.size());
        FieldList keysForInput1 = dualInputPlanNode.getKeysForInput1();
        FieldList keysForInput2 = dualInputPlanNode.getKeysForInput2();
        for (int i = 0; i < partitioningFields.size(); i++) {
            int intValue = partitioningFields.get(i).intValue();
            int intValue2 = partitioningFields2.get(i).intValue();
            int posInFieldList = getPosInFieldList(intValue, keysForInput1);
            int posInFieldList2 = getPosInFieldList(intValue2, keysForInput2);
            if (posInFieldList < 0) {
                Assert.fail("Input 1 is partitioned on field " + intValue + " which is not contained in the key set " + keysForInput1);
            }
            if (posInFieldList2 < 0) {
                Assert.fail("Input 2 is partitioned on field " + intValue2 + " which is not contained in the key set " + keysForInput2);
            }
            if (posInFieldList != posInFieldList2) {
                Assert.fail("Inputs are not partitioned on the same key fields");
            }
        }
    }

    private int getPosInFieldList(int i, FieldList fieldList) {
        int i2 = 0;
        while (i2 < fieldList.size() && i != fieldList.get(i2).intValue()) {
            i2++;
        }
        if (i2 == fieldList.size()) {
            return -1;
        }
        return i2;
    }
}
