/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.java;

import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

public class PartitionOperatorTest
extends CompilerTestBase {
    @Test
    public void testPartitionCustomOperatorPreservesFields() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource data = env.fromCollection(Collections.singleton(new Tuple2((Object)0L, (Object)0L)));
            data.partitionCustom((Partitioner)new Partitioner<Long>(){

                public int partition(Long key, int numPartitions) {
                    return key.intValue();
                }
            }, 1).groupBy(new int[]{1}).reduceGroup(new IdentityGroupReducerCombinable()).output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            SingleInputPlanNode reducer = (SingleInputPlanNode)sink.getInput().getSource();
            SingleInputPlanNode partitioner = (SingleInputPlanNode)reducer.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)reducer.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)partitioner.getInput().getShipStrategy());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRangePartitionOperatorPreservesFields() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource data = env.fromCollection(Collections.singleton(new Tuple2((Object)0L, (Object)0L)));
            data.partitionByRange(new int[]{1}).groupBy(new int[]{1}).reduceGroup(new IdentityGroupReducerCombinable()).output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            SingleInputPlanNode reducer = (SingleInputPlanNode)sink.getInput().getSource();
            SingleInputPlanNode partitionNode = (SingleInputPlanNode)reducer.getInput().getSource();
            SingleInputPlanNode partitionIDRemover = (SingleInputPlanNode)partitionNode.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)reducer.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)partitionNode.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)partitionIDRemover.getInput().getShipStrategy());
            SourcePlanNode sourcePlanNode = (SourcePlanNode)op.getDataSources().iterator().next();
            List sourceOutgoingChannels = sourcePlanNode.getOutgoingChannels();
            Assert.assertEquals((long)2L, (long)sourceOutgoingChannels.size());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)((Channel)sourceOutgoingChannels.get(0)).getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)((Channel)sourceOutgoingChannels.get(1)).getShipStrategy());
            Assert.assertEquals((Object)DataExchangeMode.PIPELINED, (Object)((Channel)sourceOutgoingChannels.get(0)).getDataExchangeMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)((Channel)sourceOutgoingChannels.get(1)).getDataExchangeMode());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRangePartitionOperatorPreservesFields2() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource data = env.fromCollection(Collections.singleton(new Tuple2((Object)0L, (Object)0L)));
            PartitionOperator rangePartitioned = data.partitionByRange(new int[]{1});
            rangePartitioned.groupBy(new int[]{1}).reduceGroup(new IdentityGroupReducerCombinable()).output((OutputFormat)new DiscardingOutputFormat());
            data.groupBy(new int[]{0}).aggregate(Aggregations.SUM, 1).map((MapFunction)new MapFunction<Tuple2<Long, Long>, Long>(){

                public Long map(Tuple2<Long, Long> value) throws Exception {
                    return (Long)value.f1;
                }
            }).output((OutputFormat)new DiscardingOutputFormat());
            rangePartitioned.filter((FilterFunction)new FilterFunction<Tuple2<Long, Long>>(){

                public boolean filter(Tuple2<Long, Long> value) throws Exception {
                    return (Long)value.f0 % 2L == 0L;
                }
            }).output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            SingleInputPlanNode reducer = (SingleInputPlanNode)sink.getInput().getSource();
            SingleInputPlanNode partitionNode = (SingleInputPlanNode)reducer.getInput().getSource();
            SingleInputPlanNode partitionIDRemover = (SingleInputPlanNode)partitionNode.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)reducer.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)partitionNode.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_CUSTOM, (Object)partitionIDRemover.getInput().getShipStrategy());
            SourcePlanNode sourcePlanNode = (SourcePlanNode)op.getDataSources().iterator().next();
            List sourceOutgoingChannels = sourcePlanNode.getOutgoingChannels();
            Assert.assertEquals((long)3L, (long)sourceOutgoingChannels.size());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)((Channel)sourceOutgoingChannels.get(0)).getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)((Channel)sourceOutgoingChannels.get(1)).getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)((Channel)sourceOutgoingChannels.get(2)).getShipStrategy());
            Assert.assertEquals((Object)DataExchangeMode.PIPELINED, (Object)((Channel)sourceOutgoingChannels.get(0)).getDataExchangeMode());
            Assert.assertEquals((Object)DataExchangeMode.PIPELINED, (Object)((Channel)sourceOutgoingChannels.get(1)).getDataExchangeMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)((Channel)sourceOutgoingChannels.get(2)).getDataExchangeMode());
            List partitionOutputChannels = partitionNode.getOutgoingChannels();
            Assert.assertEquals((long)2L, (long)partitionOutputChannels.size());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)((Channel)partitionOutputChannels.get(0)).getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)((Channel)partitionOutputChannels.get(1)).getShipStrategy());
            Assert.assertEquals((Object)DataExchangeMode.PIPELINED, (Object)((Channel)partitionOutputChannels.get(0)).getDataExchangeMode());
            Assert.assertEquals((Object)DataExchangeMode.PIPELINED, (Object)((Channel)partitionOutputChannels.get(1)).getDataExchangeMode());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }
}

