/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class IterationsCompilerTest
extends CompilerTestBase {
    /**
     * Compiles a delta iteration in which the data set used as solution-set delta and next
     * workset is produced by a mapper that receives the workset as a broadcast set.
     *
     * <p>The test passes when the optimizer, the JSON plan dumper and the job graph generator
     * all process the plan without throwing.
     *
     * @throws Exception if any stage rejects the plan; it is propagated unchanged so the test
     *     runner reports the original exception and stack trace
     */
    @Test
    public void testSolutionSetDeltaDependsOnBroadcastVariable() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, Long>> source =
                env.generateSequence(1L, 1000L).map(new DuplicateValueScalar<Long>());
        DataSet<Tuple2<Long, Long>> invariantInput =
                env.generateSequence(1L, 1000L).map(new DuplicateValueScalar<Long>());

        // The iteration starts here: 'source' is both the initial solution set and workset.
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter =
                source.iterateDelta(source, 1000, 1);

        DataSet<Tuple2<Long, Long>> result = invariantInput
                .map(new IdentityMapper<Tuple2<Long, Long>>())
                .withBroadcastSet(iter.getWorkset(), "bc data")
                .join(iter.getSolutionSet())
                .where(0)
                .equalTo(1)
                .projectFirst(1)
                .projectSecond(1);

        iter.closeWith(result.map(new IdentityMapper<Tuple2<Long, Long>>()), result)
                .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        OptimizedPlan plan = compileNoStats(env.createProgramPlan());

        // The return values are irrelevant; only the absence of an exception is checked.
        new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(plan);
        new JobGraphGenerator().compileJobGraph(plan);
    }

    /**
     * Feeds the result of a bulk iteration through a mapper into a delta iteration and checks
     * the channels entering the resulting workset iteration.
     *
     * <p>Asserted properties: the single sink is fed by a {@link WorksetIterationPlanNode}; its
     * first input is hash partitioned; neither input has a temp mode; both inputs use the
     * {@code BATCH} data exchange mode. Finally a job graph is generated from the plan.
     *
     * @throws Exception if compilation or job graph generation fails; propagated unchanged so
     *     the test runner reports the original exception and stack trace
     */
    @Test
    public void testTwoIterationsWithMapperInbetween() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSet<Tuple2<Long, Long>> verticesWithInitialId =
                env.fromElements(new Tuple2<Long, Long>(1L, 2L));
        DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));

        DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
        DataSet<Tuple2<Long, Long>> mappedBulk = bulkResult.map(new DummyMap());
        DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
        depResult.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        OptimizedPlan op = compileNoStats(env.createProgramPlan());

        Assert.assertEquals(1, op.getDataSinks().size());
        Channel sinkInput = op.getDataSinks().iterator().next().getInput();
        Assert.assertTrue(sinkInput.getSource() instanceof WorksetIterationPlanNode);
        WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) sinkInput.getSource();

        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy());

        Assert.assertEquals(TempMode.NONE, wipn.getInput1().getTempMode());
        Assert.assertEquals(TempMode.NONE, wipn.getInput2().getTempMode());

        Assert.assertEquals(DataExchangeMode.BATCH, wipn.getInput1().getDataExchangeMode());
        Assert.assertEquals(DataExchangeMode.BATCH, wipn.getInput2().getDataExchangeMode());

        new JobGraphGenerator().compileJobGraph(op);
    }

    /**
     * Chains a bulk iteration directly into a delta iteration and checks the channels between
     * them.
     *
     * <p>Asserted properties: the single sink is fed by a {@link WorksetIterationPlanNode}; its
     * first input comes from a {@link BulkIterationPlanNode} via a {@code FORWARD} channel;
     * every input of the bulk iteration's step-function root is hash partitioned; both inputs
     * of the workset iteration use {@code BATCH} exchange and no temp mode. Finally a job graph
     * is generated from the plan.
     *
     * @throws Exception if compilation or job graph generation fails; propagated unchanged so
     *     the test runner reports the original exception and stack trace
     */
    @Test
    public void testTwoIterationsDirectlyChained() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSet<Tuple2<Long, Long>> verticesWithInitialId =
                env.fromElements(new Tuple2<Long, Long>(1L, 2L));
        DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));

        DataSet<Tuple2<Long, Long>> bulkResult = doBulkIteration(verticesWithInitialId, edges);
        DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
        depResult.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        OptimizedPlan op = compileNoStats(env.createProgramPlan());

        Assert.assertEquals(1, op.getDataSinks().size());
        Channel sinkInput = op.getDataSinks().iterator().next().getInput();
        Assert.assertTrue(sinkInput.getSource() instanceof WorksetIterationPlanNode);
        WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) sinkInput.getSource();
        BulkIterationPlanNode bipn = (BulkIterationPlanNode) wipn.getInput1().getSource();

        Assert.assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());
        for (Channel stepInput : bipn.getRootOfStepFunction().getInputs()) {
            Assert.assertEquals(ShipStrategyType.PARTITION_HASH, stepInput.getShipStrategy());
        }

        Assert.assertEquals(DataExchangeMode.BATCH, wipn.getInput1().getDataExchangeMode());
        Assert.assertEquals(DataExchangeMode.BATCH, wipn.getInput2().getDataExchangeMode());

        Assert.assertEquals(TempMode.NONE, wipn.getInput1().getTempMode());
        Assert.assertEquals(TempMode.NONE, wipn.getInput2().getTempMode());

        new JobGraphGenerator().compileJobGraph(op);
    }

    /**
     * Chains two delta iterations directly and checks the channels entering the second one.
     *
     * <p>Asserted properties: the single sink is fed by a {@link WorksetIterationPlanNode}; its
     * first input is a {@code FORWARD} channel; both inputs use {@code BATCH} exchange and no
     * temp mode. Finally a job graph is generated from the plan.
     *
     * @throws Exception if compilation or job graph generation fails; propagated unchanged so
     *     the test runner reports the original exception and stack trace
     */
    @Test
    public void testTwoWorksetIterationsDirectlyChained() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSet<Tuple2<Long, Long>> verticesWithInitialId =
                env.fromElements(new Tuple2<Long, Long>(1L, 2L));
        DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));

        DataSet<Tuple2<Long, Long>> firstResult = doDeltaIteration(verticesWithInitialId, edges);
        DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
        secondResult.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        OptimizedPlan op = compileNoStats(env.createProgramPlan());

        Assert.assertEquals(1, op.getDataSinks().size());
        Channel sinkInput = op.getDataSinks().iterator().next().getInput();
        Assert.assertTrue(sinkInput.getSource() instanceof WorksetIterationPlanNode);
        WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) sinkInput.getSource();

        Assert.assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy());

        Assert.assertEquals(DataExchangeMode.BATCH, wipn.getInput1().getDataExchangeMode());
        Assert.assertEquals(DataExchangeMode.BATCH, wipn.getInput2().getDataExchangeMode());

        Assert.assertEquals(TempMode.NONE, wipn.getInput1().getTempMode());
        Assert.assertEquals(TempMode.NONE, wipn.getInput2().getTempMode());

        new JobGraphGenerator().compileJobGraph(op);
    }

    /**
     * Compiles the bulk iteration built by {@code doBulkIteration} and checks where the
     * optimizer places the partitioning.
     *
     * <p>Asserted properties: the single sink is fed by a {@link BulkIterationPlanNode}; every
     * channel leaving the partial solution is {@code FORWARD}; every input of the step-function
     * root is hash partitioned; the iteration's own input is hash partitioned. Finally a job
     * graph is generated from the plan.
     *
     * @throws Exception if compilation or job graph generation fails; propagated unchanged so
     *     the test runner reports the original exception and stack trace
     */
    @Test
    public void testIterationPushingWorkOut() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSet<Tuple2<Long, Long>> input1 =
                env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
        DataSet<Tuple2<Long, Long>> input2 =
                env.readCsvFile("/some/file/path").types(Long.class, Long.class);

        doBulkIteration(input1, input2)
                .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        OptimizedPlan op = compileNoStats(env.createProgramPlan());

        Assert.assertEquals(1, op.getDataSinks().size());
        Channel sinkInput = op.getDataSinks().iterator().next().getInput();
        Assert.assertTrue(sinkInput.getSource() instanceof BulkIterationPlanNode);
        BulkIterationPlanNode bipn = (BulkIterationPlanNode) sinkInput.getSource();

        // Inside the iteration the partial solution is consumed without re-shipping ...
        for (Channel outgoing : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
            Assert.assertEquals(ShipStrategyType.FORWARD, outgoing.getShipStrategy());
        }
        // ... while the step-function root and the iteration input are hash partitioned.
        for (Channel stepInput : bipn.getRootOfStepFunction().getInputs()) {
            Assert.assertEquals(ShipStrategyType.PARTITION_HASH, stepInput.getShipStrategy());
        }
        Assert.assertEquals(ShipStrategyType.PARTITION_HASH, bipn.getInput().getShipStrategy());

        new JobGraphGenerator().compileJobGraph(op);
    }

    /**
     * Compiles the bulk iteration built by {@code doSimpleBulkIteration} and checks where the
     * optimizer places the partitioning.
     *
     * <p>Asserted properties: the single sink is fed by a {@link BulkIterationPlanNode}; every
     * channel leaving the partial solution is hash partitioned; the iteration's own input is a
     * {@code FORWARD} channel. Finally a job graph is generated from the plan.
     *
     * @throws Exception if compilation or job graph generation fails; propagated unchanged so
     *     the test runner reports the original exception and stack trace
     */
    @Test
    public void testIterationNotPushingWorkOut() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSet<Tuple2<Long, Long>> input1 =
                env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
        DataSet<Tuple2<Long, Long>> input2 =
                env.readCsvFile("/some/file/path").types(Long.class, Long.class);

        doSimpleBulkIteration(input1, input2)
                .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

        OptimizedPlan op = compileNoStats(env.createProgramPlan());

        Assert.assertEquals(1, op.getDataSinks().size());
        Channel sinkInput = op.getDataSinks().iterator().next().getInput();
        Assert.assertTrue(sinkInput.getSource() instanceof BulkIterationPlanNode);
        BulkIterationPlanNode bipn = (BulkIterationPlanNode) sinkInput.getSource();

        // Here the partitioning is expected inside the iteration, not on its input.
        for (Channel outgoing : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
            Assert.assertEquals(ShipStrategyType.PARTITION_HASH, outgoing.getShipStrategy());
        }
        Assert.assertEquals(ShipStrategyType.FORWARD, bipn.getInput().getShipStrategy());

        new JobGraphGenerator().compileJobGraph(op);
    }

    /**
     * Compiles a plan in which the initial workset of a delta iteration is also joined with
     * the iteration's result.
     *
     * <p>The test makes no assertions on the plan; it passes when the optimizer compiles the
     * program without throwing.
     *
     * @throws Exception if compilation fails; propagated unchanged so the test runner reports
     *     the original exception and stack trace
     */
    @Test
    public void testWorksetIterationPipelineBreakerPlacement() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSet<Tuple2<Long, Long>> initialWorkset =
                env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
        DataSet<Tuple2<Long, Long>> initialSolutionSet =
                env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

        // Trivial step function: the mapped workset is both the delta and the next workset.
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
        DataSet<Tuple2<Long, Long>> next =
                iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
        DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);

        // The initial workset is consumed a second time, outside the iteration.
        initialWorkset
                .join(result, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                .where(0)
                .equalTo(0)
                .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());

        compileNoStats(env.createProgramPlan());
    }

    /**
     * Compiles a bulk iteration whose partial solution is consumed by three filters, one of
     * whose branches additionally receives a join of the other two as a broadcast set.
     *
     * <p>The test makes no assertions on the plan; it passes when the optimizer and the job
     * graph generator process the program without throwing.
     *
     * @throws Exception if compilation or job graph generation fails; propagated unchanged so
     *     the test runner reports the original exception and stack trace
     */
    @Test
    public void testResetPartialSolution() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Declared as DataSet (not DataSource) because the variables are reassigned below to
        // the results of filter/map operators.
        DataSet<Long> width = env.generateSequence(1L, 10L);
        DataSet<Long> update = env.generateSequence(1L, 10L);
        DataSet<Long> lastGradient = env.generateSequence(1L, 10L);

        DataSet<Long> init = width.union(update).union(lastGradient);

        IterativeDataSet<Long> iteration = init.iterate(10);

        width = iteration.filter(new IdFilter<Long>());
        update = iteration.filter(new IdFilter<Long>());
        lastGradient = iteration.filter(new IdFilter<Long>());

        DataSet<Long> gradient = width.map(new IdentityMapper<Long>());
        DataSet<Long> term = gradient
                .join(lastGradient)
                .where(new IdentityKeyExtractor<Long>())
                .equalTo(new IdentityKeyExtractor<Long>())
                .with(new JoinFunction<Long, Long, Long>() {
                    @Override
                    public Long join(Long first, Long second) {
                        return null;
                    }
                });

        update = update
                .map(new RichMapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return null;
                    }
                })
                .withBroadcastSet(term, "some-name");

        DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient));
        result.output(new DiscardingOutputFormat<Long>());

        OptimizedPlan op = compileNoStats(env.createProgramPlan());
        new JobGraphGenerator().compileJobGraph(op);
    }

    /**
     * Compiles a bulk iteration whose initial partial solution is the output of a
     * {@code distinct()} and whose step function co-groups the partial solution with a second
     * input on field 0.
     *
     * <p>The test makes no assertions on the plan; it passes when the optimizer and the job
     * graph generator process the program without throwing.
     *
     * @throws Exception if compilation or job graph generation fails
     */
    @Test
    public void testBulkIterationWithPartialSolutionProperties() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple1<Long>> input1 = env.generateSequence(1L, 10L)
                .map(new MapFunction<Long, Tuple1<Long>>() {
                    @Override
                    public Tuple1<Long> map(Long value) throws Exception {
                        return new Tuple1<Long>(value);
                    }
                });

        DataSet<Tuple1<Long>> input2 = env.generateSequence(1L, 10L)
                .map(new MapFunction<Long, Tuple1<Long>>() {
                    @Override
                    public Tuple1<Long> map(Long value) throws Exception {
                        return new Tuple1<Long>(value);
                    }
                });

        DataSet<Tuple1<Long>> distinctInput = input1.distinct();

        IterativeDataSet<Tuple1<Long>> iteration = distinctInput.iterate(10);

        DataSet<Tuple1<Long>> iterationStep = iteration
                .coGroup(input2)
                .where(0)
                .equalTo(0)
                .with(new CoGroupFunction<Tuple1<Long>, Tuple1<Long>, Tuple1<Long>>() {
                    @Override
                    public void coGroup(
                            Iterable<Tuple1<Long>> first,
                            Iterable<Tuple1<Long>> second,
                            Collector<Tuple1<Long>> out) throws Exception {
                        // Emit at most one record per group: the first one of the first input.
                        Iterator<Tuple1<Long>> it = first.iterator();
                        if (it.hasNext()) {
                            out.collect(it.next());
                        }
                    }
                });

        DataSet<Tuple1<Long>> iterationResult = iteration.closeWith(iterationStep);
        iterationResult.output(new DiscardingOutputFormat<Tuple1<Long>>());

        OptimizedPlan op = compileNoStats(env.createProgramPlan());
        new JobGraphGenerator().compileJobGraph(op);
    }

    /**
     * Builds a bulk iteration of 20 supersteps over {@code vertices}.
     *
     * <p>Step function: join the partial solution with {@code edges} on field 0 using
     * {@link Join222}, group by field 0 and take the minimum of field 1, join that with the
     * partial solution on field 0 again, and flat-map the pairs with {@link FlatMapJoin}.
     *
     * @param vertices initial partial solution
     * @param edges data set joined with the partial solution inside the iteration
     * @return the data set produced by closing the iteration
     */
    public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
        IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(20);

        DataSet<Tuple2<Long, Long>> changes = iteration
                .join(edges).where(0).equalTo(0).with(new Join222())
                .groupBy(0).aggregate(Aggregations.MIN, 1)
                .join(iteration).where(0).equalTo(0)
                .flatMap(new FlatMapJoin());

        return iteration.closeWith(changes);
    }

    /**
     * Builds a bulk iteration of 20 supersteps over {@code vertices} whose step function only
     * joins the partial solution with {@code edges} on field 0 and flat-maps the joined pairs
     * with {@link FlatMapJoin}.
     *
     * @param vertices initial partial solution
     * @param edges data set joined with the partial solution inside the iteration
     * @return the data set produced by closing the iteration
     */
    public static DataSet<Tuple2<Long, Long>> doSimpleBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
        IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(20);

        DataSet<Tuple2<Long, Long>> changes = iteration
                .join(edges).where(0).equalTo(0)
                .flatMap(new FlatMapJoin());

        return iteration.closeWith(changes);
    }

    /**
     * Builds a delta iteration (at most 100 supersteps, solution set keyed on field 0) that
     * uses {@code vertices} as both the initial solution set and the initial workset.
     *
     * <p>Step function: join the workset with {@code edges} and project the edges' field 1,
     * group-reduce with {@link Reduce101}, join with {@code edges} again, join with the
     * solution set using {@link Join222}, take the per-key minimum of field 1, join with the
     * solution set once more and flat-map with {@link FlatMapJoin}. The flat-map output is
     * used as both the solution-set delta and the next workset.
     *
     * @param vertices initial solution set and initial workset
     * @param edges data set joined inside the iteration
     * @return the data set produced by closing the iteration
     */
    public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration =
                vertices.iterateDelta(vertices, 100, 0);

        DataSet<Tuple1<Long>> candidates = depIteration.getWorkset()
                .join(edges).where(0).equalTo(0)
                .projectSecond(1);

        DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());

        DataSet<Tuple2<Long, Long>> candidatesDependencies = grouped
                .join(edges).where(0).equalTo(1)
                .projectSecond(0, 1);

        DataSet<Tuple2<Long, Long>> verticesWithNewComponents = candidatesDependencies
                .join(depIteration.getSolutionSet()).where(0).equalTo(0)
                .with(new Join222())
                .groupBy(0).aggregate(Aggregations.MIN, 1);

        DataSet<Tuple2<Long, Long>> updatedComponentId = verticesWithNewComponents
                .join(depIteration.getSolutionSet()).where(0).equalTo(0)
                .flatMap(new FlatMapJoin());

        return depIteration.closeWith(updatedComponentId, updatedComponentId);
    }

    public static final class IdFilter<T>
    implements FilterFunction<T> {
        public boolean filter(T value) {
            return true;
        }
    }

    public static final class DuplicateValueScalar<T>
    extends RichMapFunction<T, Tuple2<T, T>> {
        public Tuple2<T, T> map(T value) {
            return new Tuple2(value, value);
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0"})
    public static final class DuplicateValue
    extends RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
        public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
            return new Tuple2(value.f0, value.f0);
        }
    }

    @FunctionAnnotation.ForwardedFields(value={"0"})
    public static final class Reduce101
    extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
        public void reduce(Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {
        }
    }

    public static final class DummyMap
    extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
            return value;
        }
    }

    public static final class FlatMapJoin
    extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
        public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {
        }
    }

    public static final class Join222
    extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
            return null;
        }
    }
}

