/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.java;

import java.io.Serializable;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.junit.Assert;
import org.junit.Test;

public class ReduceCompilationTest
extends CompilerTestBase
implements Serializable {
    @Test
    public void testAllReduceNoCombiner() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            Operator data = env.fromElements((Object[])new Double[]{0.2, 0.3, 0.4, 0.5}).name("source");
            ((ReduceOperator)data.reduce((ReduceFunction)new RichReduceFunction<Double>(){

                public Double reduce(Double value1, Double value2) {
                    return value1 + value2;
                }
            }).name("reducer")).output((OutputFormat)new DiscardingOutputFormat()).name("sink");
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = ReduceCompilationTest.getOptimizerPlanNodeResolver(op);
            SourcePlanNode sourceNode = (SourcePlanNode)resolver.getNode("source");
            SingleInputPlanNode reduceNode = (SingleInputPlanNode)resolver.getNode("reducer");
            SinkPlanNode sinkNode = (SinkPlanNode)resolver.getNode("sink");
            Assert.assertEquals((Object)sourceNode, (Object)reduceNode.getInput().getSource());
            Assert.assertEquals((Object)reduceNode, (Object)sinkNode.getInput().getSource());
            Assert.assertEquals((long)1L, (long)sourceNode.getParallelism());
            Assert.assertEquals((long)1L, (long)reduceNode.getParallelism());
            Assert.assertEquals((long)1L, (long)sinkNode.getParallelism());
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)(e.getClass().getSimpleName() + " in test: " + e.getMessage()));
        }
    }

    @Test
    public void testAllReduceWithCombiner() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            Operator data = env.generateSequence(1L, 8000000L).name("source");
            ((ReduceOperator)data.reduce((ReduceFunction)new RichReduceFunction<Long>(){

                public Long reduce(Long value1, Long value2) {
                    return value1 + value2;
                }
            }).name("reducer")).output((OutputFormat)new DiscardingOutputFormat()).name("sink");
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = ReduceCompilationTest.getOptimizerPlanNodeResolver(op);
            SourcePlanNode sourceNode = (SourcePlanNode)resolver.getNode("source");
            SingleInputPlanNode reduceNode = (SingleInputPlanNode)resolver.getNode("reducer");
            SinkPlanNode sinkNode = (SinkPlanNode)resolver.getNode("sink");
            SingleInputPlanNode combineNode = (SingleInputPlanNode)reduceNode.getInput().getSource();
            Assert.assertEquals((Object)sourceNode, (Object)combineNode.getInput().getSource());
            Assert.assertEquals((Object)reduceNode, (Object)sinkNode.getInput().getSource());
            Assert.assertEquals((Object)DriverStrategy.ALL_REDUCE, (Object)reduceNode.getDriverStrategy());
            Assert.assertEquals((Object)DriverStrategy.ALL_REDUCE, (Object)combineNode.getDriverStrategy());
            Assert.assertEquals((long)8L, (long)sourceNode.getParallelism());
            Assert.assertEquals((long)8L, (long)combineNode.getParallelism());
            Assert.assertEquals((long)1L, (long)reduceNode.getParallelism());
            Assert.assertEquals((long)1L, (long)sinkNode.getParallelism());
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)(e.getClass().getSimpleName() + " in test: " + e.getMessage()));
        }
    }

    @Test
    public void testGroupedReduceWithFieldPositionKey() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            Operator data = ((DataSource)env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class).name("source")).setParallelism(6);
            ((ReduceOperator)data.groupBy(new int[]{1}).reduce((ReduceFunction)new RichReduceFunction<Tuple2<String, Double>>(){

                public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
                    return null;
                }
            }).name("reducer")).output((OutputFormat)new DiscardingOutputFormat()).name("sink");
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = ReduceCompilationTest.getOptimizerPlanNodeResolver(op);
            SourcePlanNode sourceNode = (SourcePlanNode)resolver.getNode("source");
            SingleInputPlanNode reduceNode = (SingleInputPlanNode)resolver.getNode("reducer");
            SinkPlanNode sinkNode = (SinkPlanNode)resolver.getNode("sink");
            SingleInputPlanNode combineNode = (SingleInputPlanNode)reduceNode.getInput().getSource();
            Assert.assertEquals((Object)sourceNode, (Object)combineNode.getInput().getSource());
            Assert.assertEquals((Object)reduceNode, (Object)sinkNode.getInput().getSource());
            Assert.assertEquals((Object)DriverStrategy.SORTED_REDUCE, (Object)reduceNode.getDriverStrategy());
            Assert.assertEquals((Object)DriverStrategy.SORTED_PARTIAL_REDUCE, (Object)combineNode.getDriverStrategy());
            Assert.assertEquals((Object)new FieldList(1), (Object)reduceNode.getKeys(0));
            Assert.assertEquals((Object)new FieldList(1), (Object)combineNode.getKeys(0));
            Assert.assertEquals((Object)new FieldList(1), (Object)reduceNode.getInput().getLocalStrategyKeys());
            Assert.assertEquals((long)6L, (long)sourceNode.getParallelism());
            Assert.assertEquals((long)6L, (long)combineNode.getParallelism());
            Assert.assertEquals((long)8L, (long)reduceNode.getParallelism());
            Assert.assertEquals((long)8L, (long)sinkNode.getParallelism());
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)(e.getClass().getSimpleName() + " in test: " + e.getMessage()));
        }
    }

    @Test
    public void testGroupedReduceWithSelectorFunctionKey() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            Operator data = ((DataSource)env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class).name("source")).setParallelism(6);
            ((ReduceOperator)data.groupBy((KeySelector)new KeySelector<Tuple2<String, Double>, String>(){

                public String getKey(Tuple2<String, Double> value) {
                    return (String)value.f0;
                }
            }).reduce((ReduceFunction)new RichReduceFunction<Tuple2<String, Double>>(){

                public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
                    return null;
                }
            }).name("reducer")).output((OutputFormat)new DiscardingOutputFormat()).name("sink");
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = ReduceCompilationTest.getOptimizerPlanNodeResolver(op);
            SourcePlanNode sourceNode = (SourcePlanNode)resolver.getNode("source");
            SingleInputPlanNode reduceNode = (SingleInputPlanNode)resolver.getNode("reducer");
            SinkPlanNode sinkNode = (SinkPlanNode)resolver.getNode("sink");
            SingleInputPlanNode combineNode = (SingleInputPlanNode)reduceNode.getInput().getSource();
            SingleInputPlanNode keyExtractor = (SingleInputPlanNode)combineNode.getInput().getSource();
            SingleInputPlanNode keyProjector = (SingleInputPlanNode)sinkNode.getInput().getSource();
            Assert.assertEquals((Object)sourceNode, (Object)keyExtractor.getInput().getSource());
            Assert.assertEquals((Object)keyProjector, (Object)sinkNode.getInput().getSource());
            Assert.assertEquals((Object)DriverStrategy.SORTED_REDUCE, (Object)reduceNode.getDriverStrategy());
            Assert.assertEquals((Object)DriverStrategy.SORTED_PARTIAL_REDUCE, (Object)combineNode.getDriverStrategy());
            Assert.assertEquals((Object)new FieldList(0), (Object)reduceNode.getKeys(0));
            Assert.assertEquals((Object)new FieldList(0), (Object)combineNode.getKeys(0));
            Assert.assertEquals((Object)new FieldList(0), (Object)reduceNode.getInput().getLocalStrategyKeys());
            Assert.assertEquals((long)6L, (long)sourceNode.getParallelism());
            Assert.assertEquals((long)6L, (long)keyExtractor.getParallelism());
            Assert.assertEquals((long)6L, (long)combineNode.getParallelism());
            Assert.assertEquals((long)8L, (long)reduceNode.getParallelism());
            Assert.assertEquals((long)8L, (long)keyProjector.getParallelism());
            Assert.assertEquals((long)8L, (long)sinkNode.getParallelism());
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)(e.getClass().getSimpleName() + " in test: " + e.getMessage()));
        }
    }

    @Test
    public void testGroupedReduceWithHint() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            Operator data = ((DataSource)env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class).name("source")).setParallelism(6);
            ((ReduceOperator)data.groupBy((KeySelector)new KeySelector<Tuple2<String, Double>, String>(){

                public String getKey(Tuple2<String, Double> value) {
                    return (String)value.f0;
                }
            }).reduce((ReduceFunction)new RichReduceFunction<Tuple2<String, Double>>(){

                public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
                    return null;
                }
            }).setCombineHint(ReduceOperatorBase.CombineHint.HASH).name("reducer")).output((OutputFormat)new DiscardingOutputFormat()).name("sink");
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = ReduceCompilationTest.getOptimizerPlanNodeResolver(op);
            SourcePlanNode sourceNode = (SourcePlanNode)resolver.getNode("source");
            SingleInputPlanNode reduceNode = (SingleInputPlanNode)resolver.getNode("reducer");
            SinkPlanNode sinkNode = (SinkPlanNode)resolver.getNode("sink");
            SingleInputPlanNode combineNode = (SingleInputPlanNode)reduceNode.getInput().getSource();
            SingleInputPlanNode keyExtractor = (SingleInputPlanNode)combineNode.getInput().getSource();
            SingleInputPlanNode keyProjector = (SingleInputPlanNode)sinkNode.getInput().getSource();
            Assert.assertEquals((Object)sourceNode, (Object)keyExtractor.getInput().getSource());
            Assert.assertEquals((Object)keyProjector, (Object)sinkNode.getInput().getSource());
            Assert.assertEquals((Object)DriverStrategy.SORTED_REDUCE, (Object)reduceNode.getDriverStrategy());
            Assert.assertEquals((Object)DriverStrategy.HASHED_PARTIAL_REDUCE, (Object)combineNode.getDriverStrategy());
            Assert.assertEquals((Object)new FieldList(0), (Object)reduceNode.getKeys(0));
            Assert.assertEquals((Object)new FieldList(0), (Object)combineNode.getKeys(0));
            Assert.assertEquals((Object)new FieldList(0), (Object)reduceNode.getInput().getLocalStrategyKeys());
            Assert.assertEquals((long)6L, (long)sourceNode.getParallelism());
            Assert.assertEquals((long)6L, (long)keyExtractor.getParallelism());
            Assert.assertEquals((long)6L, (long)combineNode.getParallelism());
            Assert.assertEquals((long)8L, (long)reduceNode.getParallelism());
            Assert.assertEquals((long)8L, (long)keyProjector.getParallelism());
            Assert.assertEquals((long)8L, (long)sinkNode.getParallelism());
        }
        catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
            Assert.fail((String)(e.getClass().getSimpleName() + " in test: " + e.getMessage()));
        }
    }

    @Test
    public void testGroupedReduceWithoutCombiner() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        Operator data = ((DataSource)env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class).name("source")).setParallelism(6);
        ((ReduceOperator)data.groupBy(new int[]{0}).reduce((ReduceFunction)new RichReduceFunction<Tuple2<String, Double>>(){

            public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
                return null;
            }
        }).setCombineHint(ReduceOperatorBase.CombineHint.NONE).name("reducer")).output((OutputFormat)new DiscardingOutputFormat()).name("sink");
        Plan p = env.createProgramPlan();
        OptimizedPlan op = this.compileNoStats(p);
        CompilerTestBase.OptimizerPlanNodeResolver resolver = ReduceCompilationTest.getOptimizerPlanNodeResolver(op);
        SourcePlanNode sourceNode = (SourcePlanNode)resolver.getNode("source");
        SingleInputPlanNode reduceNode = (SingleInputPlanNode)resolver.getNode("reducer");
        SinkPlanNode sinkNode = (SinkPlanNode)resolver.getNode("sink");
        Assert.assertEquals((Object)sourceNode, (Object)reduceNode.getInput().getSource());
        Assert.assertEquals((Object)DriverStrategy.SORTED_REDUCE, (Object)reduceNode.getDriverStrategy());
        Assert.assertEquals((Object)new FieldList(0), (Object)reduceNode.getKeys(0));
        Assert.assertEquals((Object)new FieldList(0), (Object)reduceNode.getInput().getLocalStrategyKeys());
        Assert.assertEquals((long)6L, (long)sourceNode.getParallelism());
        Assert.assertEquals((long)8L, (long)reduceNode.getParallelism());
        Assert.assertEquals((long)8L, (long)sinkNode.getParallelism());
    }
}

