/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.custompartition;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

public class JoinCustomPartitioningTest
extends CompilerTestBase {
    @Test
    public void testJoinWithTuples() {
        try {
            TestPartitionerLong partitioner = new TestPartitionerLong();
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource input1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
            DataSource input2 = env.fromElements((Object[])new Tuple3[]{new Tuple3((Object)0L, (Object)0L, (Object)0L)});
            input1.join((DataSet)input2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{1}).equalTo(new int[]{0}).withPartitioner((Partitioner)partitioner).output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)join.getInput1().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)join.getInput2().getShipStrategy());
            Assert.assertEquals((Object)partitioner, (Object)join.getInput1().getPartitioner());
            Assert.assertEquals((Object)partitioner, (Object)join.getInput2().getPartitioner());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testJoinWithTuplesWrongType() {
        try {
            TestPartitionerInt partitioner = new TestPartitionerInt();
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource input1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
            DataSource input2 = env.fromElements((Object[])new Tuple3[]{new Tuple3((Object)0L, (Object)0L, (Object)0L)});
            try {
                input1.join((DataSet)input2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{1}).equalTo(new int[]{0}).withPartitioner((Partitioner)partitioner);
                Assert.fail((String)"should throw an exception");
            }
            catch (InvalidProgramException invalidProgramException) {}
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testJoinWithPojos() {
        try {
            TestPartitionerInt partitioner = new TestPartitionerInt();
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource input1 = env.fromElements((Object[])new Pojo2[]{new Pojo2()});
            DataSource input2 = env.fromElements((Object[])new Pojo3[]{new Pojo3()});
            input1.join((DataSet)input2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new String[]{"b"}).equalTo(new String[]{"a"}).withPartitioner((Partitioner)partitioner).output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)join.getInput1().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)join.getInput2().getShipStrategy());
            Assert.assertEquals((Object)partitioner, (Object)join.getInput1().getPartitioner());
            Assert.assertEquals((Object)partitioner, (Object)join.getInput2().getPartitioner());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testJoinWithPojosWrongType() {
        try {
            TestPartitionerLong partitioner = new TestPartitionerLong();
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource input1 = env.fromElements((Object[])new Pojo2[]{new Pojo2()});
            DataSource input2 = env.fromElements((Object[])new Pojo3[]{new Pojo3()});
            try {
                input1.join((DataSet)input2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new String[]{"a"}).equalTo(new String[]{"b"}).withPartitioner((Partitioner)partitioner);
                Assert.fail((String)"should throw an exception");
            }
            catch (InvalidProgramException invalidProgramException) {}
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testJoinWithKeySelectors() {
        try {
            TestPartitionerInt partitioner = new TestPartitionerInt();
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource input1 = env.fromElements((Object[])new Pojo2[]{new Pojo2()});
            DataSource input2 = env.fromElements((Object[])new Pojo3[]{new Pojo3()});
            input1.join((DataSet)input2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where((KeySelector)new Pojo2KeySelector()).equalTo((KeySelector)new Pojo3KeySelector()).withPartitioner((Partitioner)partitioner).output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)join.getInput1().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)join.getInput2().getShipStrategy());
            Assert.assertEquals((Object)partitioner, (Object)join.getInput1().getPartitioner());
            Assert.assertEquals((Object)partitioner, (Object)join.getInput2().getPartitioner());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testJoinWithKeySelectorsWrongType() {
        try {
            TestPartitionerLong partitioner = new TestPartitionerLong();
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource input1 = env.fromElements((Object[])new Pojo2[]{new Pojo2()});
            DataSource input2 = env.fromElements((Object[])new Pojo3[]{new Pojo3()});
            try {
                input1.join((DataSet)input2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where((KeySelector)new Pojo2KeySelector()).equalTo((KeySelector)new Pojo3KeySelector()).withPartitioner((Partitioner)partitioner);
                Assert.fail((String)"should throw an exception");
            }
            catch (InvalidProgramException invalidProgramException) {}
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testIncompatibleHashAndCustomPartitioning() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource input = env.fromElements((Object[])new Tuple3[]{new Tuple3((Object)0L, (Object)0L, (Object)0L)});
            SingleInputUdfOperator partitioned = input.partitionCustom((Partitioner)new Partitioner<Long>(){

                public int partition(Long key, int numPartitions) {
                    return 0;
                }
            }, 0).map(new IdentityMapper()).withForwardedFields(new String[]{"0", "1", "2"});
            SingleInputUdfOperator grouped = partitioned.distinct(new int[]{0, 1}).groupBy(new int[]{1}).sortGroup(0, Order.ASCENDING).reduceGroup(new IdentityGroupReducerCombinable()).withForwardedFields(new String[]{"0", "1"});
            grouped.join((DataSet)partitioned, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0}).equalTo(new int[]{0}).with(new DummyFlatJoinFunction()).output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)coGroup.getInput1().getShipStrategy());
            Assert.assertTrue((coGroup.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH || coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static class Pojo3KeySelector
    implements KeySelector<Pojo3, Integer> {
        private Pojo3KeySelector() {
        }

        public Integer getKey(Pojo3 value) {
            return value.b;
        }
    }

    private static class Pojo2KeySelector
    implements KeySelector<Pojo2, Integer> {
        private Pojo2KeySelector() {
        }

        public Integer getKey(Pojo2 value) {
            return value.a;
        }
    }

    public static class Pojo3 {
        public int a;
        public int b;
        public int c;
    }

    public static class Pojo2 {
        public int a;
        public int b;
    }

    private static class TestPartitionerLong
    implements Partitioner<Long> {
        private TestPartitionerLong() {
        }

        public int partition(Long key, int numPartitions) {
            return 0;
        }
    }

    private static class TestPartitionerInt
    implements Partitioner<Integer> {
        private TestPartitionerInt() {
        }

        public int partition(Integer key, int numPartitions) {
            return 0;
        }
    }
}

