/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.java;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

public class DistinctAndGroupingOptimizerTest
extends CompilerTestBase {
    @Test
    public void testDistinctPreservesPartitioningOfDistinctFields() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            Operator data = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L), new Tuple2((Object)1L, (Object)1L)}).map(new IdentityMapper()).setParallelism(4);
            data.distinct(new int[]{0}).groupBy(new int[]{0}).sum(1).output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            SingleInputPlanNode reducer = (SingleInputPlanNode)sink.getInput().getSource();
            SingleInputPlanNode distinctReducer = (SingleInputPlanNode)reducer.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)sink.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)reducer.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)distinctReducer.getInput().getShipStrategy());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testDistinctDestroysPartitioningOfNonDistinctFields() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            Operator data = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L), new Tuple2((Object)1L, (Object)1L)}).map(new IdentityMapper()).setParallelism(4);
            data.distinct(new int[]{1}).groupBy(new int[]{0}).sum(1).output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            SingleInputPlanNode reducer = (SingleInputPlanNode)sink.getInput().getSource();
            SingleInputPlanNode combiner = (SingleInputPlanNode)reducer.getInput().getSource();
            SingleInputPlanNode distinctReducer = (SingleInputPlanNode)combiner.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)sink.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)reducer.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)combiner.getInput().getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)distinctReducer.getInput().getShipStrategy());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }
}

