/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.dataexchange;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.junit.Assert;
import org.junit.Test;

public class DataExchangeModeForwardTest
extends CompilerTestBase {
    @Test
    public void testPipelinedForced() {
        this.verifySimpleForwardPlan(ExecutionMode.PIPELINED_FORCED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
    }

    @Test
    public void testPipelined() {
        this.verifySimpleForwardPlan(ExecutionMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
    }

    @Test
    public void testBatch() {
        this.verifySimpleForwardPlan(ExecutionMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED);
    }

    @Test
    public void testBatchForced() {
        this.verifySimpleForwardPlan(ExecutionMode.BATCH_FORCED, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.BATCH, DataExchangeMode.PIPELINED, DataExchangeMode.BATCH, DataExchangeMode.BATCH);
    }

    private void verifySimpleForwardPlan(ExecutionMode execMode, DataExchangeMode toMap, DataExchangeMode toFilter, DataExchangeMode toKeyExtractor, DataExchangeMode toCombiner, DataExchangeMode toReduce, DataExchangeMode toSink) {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setExecutionMode(execMode);
            DataSource dataSet = env.readTextFile("/never/accessed");
            dataSet.map((MapFunction)new MapFunction<String, Integer>(){

                public Integer map(String value) {
                    return 0;
                }
            }).filter((FilterFunction)new FilterFunction<Integer>(){

                public boolean filter(Integer value) {
                    return false;
                }
            }).groupBy(new IdentityKeyExtractor()).reduceGroup(new Top1GroupReducer()).output((OutputFormat)new DiscardingOutputFormat());
            OptimizedPlan optPlan = this.compileNoStats(env.createProgramPlan());
            SinkPlanNode sinkNode = (SinkPlanNode)optPlan.getDataSinks().iterator().next();
            SingleInputPlanNode reduceNode = (SingleInputPlanNode)sinkNode.getPredecessor();
            SingleInputPlanNode combineNode = (SingleInputPlanNode)reduceNode.getPredecessor();
            SingleInputPlanNode keyExtractorNode = (SingleInputPlanNode)combineNode.getPredecessor();
            SingleInputPlanNode filterNode = (SingleInputPlanNode)keyExtractorNode.getPredecessor();
            SingleInputPlanNode mapNode = (SingleInputPlanNode)filterNode.getPredecessor();
            Assert.assertEquals((Object)toMap, (Object)mapNode.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)toFilter, (Object)filterNode.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)toKeyExtractor, (Object)keyExtractorNode.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)toCombiner, (Object)combineNode.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)toReduce, (Object)reduceNode.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)toSink, (Object)sinkNode.getInput().getDataExchangeMode());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }
}

