/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

public class CoGroupSolutionSetFirstTest
extends CompilerTestBase {
    @Test
    public void testCoGroupSolutionSet() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource raw = env.readCsvFile(IN_FILE).types(Integer.class);
        DeltaIteration iteration = raw.iterateDelta((DataSet)raw, 1000, new int[]{0});
        MapOperator test = iteration.getWorkset().map((MapFunction)new SimpleMap());
        CoGroupOperator delta = iteration.getSolutionSet().coGroup((DataSet)test).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new SimpleCGroup());
        MapOperator feedback = iteration.getWorkset().map((MapFunction)new SimpleMap());
        DataSet result = iteration.closeWith((DataSet)delta, (DataSet)feedback);
        result.output((OutputFormat)new DiscardingOutputFormat());
        Plan plan = env.createProgramPlan();
        OptimizedPlan oPlan = null;
        try {
            oPlan = this.compileNoStats(plan);
        }
        catch (CompilerException e) {
            Assert.fail((String)e.getMessage());
        }
        oPlan.accept((Visitor)new Visitor<PlanNode>(){

            public boolean preVisit(PlanNode visitable) {
                if (visitable instanceof WorksetIterationPlanNode) {
                    PlanNode deltaNode = ((WorksetIterationPlanNode)visitable).getSolutionSetDeltaPlanNode();
                    DualInputPlanNode dpn = (DualInputPlanNode)((Channel)deltaNode.getInputs().iterator().next()).getSource();
                    Channel in1 = dpn.getInput1();
                    Channel in2 = dpn.getInput2();
                    Assert.assertTrue((in1.getLocalProperties().getOrdering() == null ? 1 : 0) != 0);
                    Assert.assertTrue((in2.getLocalProperties().getOrdering() != null ? 1 : 0) != 0);
                    Assert.assertTrue((boolean)in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(Integer.valueOf(0)));
                    Assert.assertTrue((in1.getShipStrategy() == ShipStrategyType.FORWARD ? 1 : 0) != 0);
                    Assert.assertTrue((in2.getShipStrategy() == ShipStrategyType.PARTITION_HASH ? 1 : 0) != 0);
                    return false;
                }
                return true;
            }

            public void postVisit(PlanNode visitable) {
            }
        });
    }

    public static class SimpleMap
    extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
        public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
            return null;
        }
    }

    public static class SimpleCGroup
    extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
        public void coGroup(Iterable<Tuple1<Integer>> first, Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {
        }
    }
}

