/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

public class AdditionalOperatorsTest
extends CompilerTestBase {
    @Test
    public void testCrossWithSmall() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource set1 = env.generateSequence(0L, 1L);
        DataSource set2 = env.generateSequence(0L, 1L);
        ((CrossOperator)set1.crossWithTiny((DataSet)set2).name("Cross")).output((OutputFormat)new DiscardingOutputFormat());
        try {
            Plan plan = env.createProgramPlan();
            OptimizedPlan oPlan = this.compileWithStats(plan);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = new CompilerTestBase.OptimizerPlanNodeResolver(oPlan);
            DualInputPlanNode crossPlanNode = (DualInputPlanNode)resolver.getNode("Cross");
            Channel in1 = crossPlanNode.getInput1();
            Channel in2 = crossPlanNode.getInput2();
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)in1.getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.BROADCAST, (Object)in2.getShipStrategy());
        }
        catch (CompilerException ce) {
            ce.printStackTrace();
            Assert.fail((String)"The Flink optimizer is unable to compile this plan correctly.");
        }
    }

    @Test
    public void testCrossWithLarge() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource set1 = env.generateSequence(0L, 1L);
        DataSource set2 = env.generateSequence(0L, 1L);
        ((CrossOperator)set1.crossWithHuge((DataSet)set2).name("Cross")).output((OutputFormat)new DiscardingOutputFormat());
        try {
            Plan plan = env.createProgramPlan();
            OptimizedPlan oPlan = this.compileNoStats(plan);
            CompilerTestBase.OptimizerPlanNodeResolver resolver = new CompilerTestBase.OptimizerPlanNodeResolver(oPlan);
            DualInputPlanNode crossPlanNode = (DualInputPlanNode)resolver.getNode("Cross");
            Channel in1 = crossPlanNode.getInput1();
            Channel in2 = crossPlanNode.getInput2();
            Assert.assertEquals((Object)ShipStrategyType.BROADCAST, (Object)in1.getShipStrategy());
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)in2.getShipStrategy());
        }
        catch (CompilerException ce) {
            ce.printStackTrace();
            Assert.fail((String)"The pact compiler is unable to compile this plan correctly.");
        }
    }
}

