/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.plantranslate;

import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.BlockingShuffleOutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.util.AbstractID;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobGraphGeneratorTest {
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();

    @Test
    public void testResourcesForChainedOperators() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder((double)0.1, (int)100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder((double)0.2, (int)200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder((double)0.3, (int)300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder((double)0.4, (int)400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder((double)0.5, (int)500).build();
        ResourceSpec resource6 = ResourceSpec.newBuilder((double)0.6, (int)600).build();
        ResourceSpec resource7 = ResourceSpec.newBuilder((double)0.7, (int)700).build();
        Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        opMethod.setAccessible(true);
        Method sinkMethod = DataSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        sinkMethod.setAccessible(true);
        MapFunction<Long, Long> mapFunction = new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return value;
            }
        };
        FilterFunction<Long> filterFunction = new FilterFunction<Long>(){

            public boolean filter(Long value) throws Exception {
                return false;
            }
        };
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource input = env.fromElements((Object[])new Long[]{1L, 2L, 3L});
        opMethod.invoke((Object)input, resource1);
        MapOperator map1 = input.map((MapFunction)mapFunction);
        opMethod.invoke((Object)map1, resource2);
        FilterOperator filter1 = map1.filter((FilterFunction)filterFunction);
        opMethod.invoke((Object)filter1, resource3);
        IterativeDataSet startOfIteration = filter1.iterate(10);
        opMethod.invoke((Object)startOfIteration, resource4);
        MapOperator map2 = startOfIteration.map((MapFunction)mapFunction);
        opMethod.invoke((Object)map2, resource5);
        FilterOperator feedback = map2.filter((FilterFunction)filterFunction);
        opMethod.invoke((Object)feedback, resource6);
        DataSink sink = startOfIteration.closeWith((DataSet)feedback).output((OutputFormat)new DiscardingOutputFormat());
        sinkMethod.invoke((Object)sink, resource7);
        JobGraph jobGraph = JobGraphGeneratorTest.compileJob(env);
        JobVertex sourceMapFilterVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex iterationHeadVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        JobVertex feedbackVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(2);
        JobVertex sinkVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(3);
        JobVertex iterationSyncVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(4);
        Assert.assertTrue((boolean)sourceMapFilterVertex.getMinResources().equals((Object)resource1.merge(resource2).merge(resource3)));
        Assert.assertTrue((boolean)iterationHeadVertex.getPreferredResources().equals((Object)resource4));
        Assert.assertTrue((boolean)feedbackVertex.getMinResources().equals((Object)resource5.merge(resource6)));
        Assert.assertTrue((boolean)sinkVertex.getPreferredResources().equals((Object)resource7));
        Assert.assertTrue((boolean)iterationSyncVertex.getMinResources().equals((Object)resource4));
    }

    @Test
    public void testResourcesForDeltaIteration() throws Exception {
        ResourceSpec resource1 = ResourceSpec.newBuilder((double)0.1, (int)100).build();
        ResourceSpec resource2 = ResourceSpec.newBuilder((double)0.2, (int)200).build();
        ResourceSpec resource3 = ResourceSpec.newBuilder((double)0.3, (int)300).build();
        ResourceSpec resource4 = ResourceSpec.newBuilder((double)0.4, (int)400).build();
        ResourceSpec resource5 = ResourceSpec.newBuilder((double)0.5, (int)500).build();
        ResourceSpec resource6 = ResourceSpec.newBuilder((double)0.6, (int)600).build();
        Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
        opMethod.setAccessible(true);
        Method deltaMethod = DeltaIteration.class.getDeclaredMethod("setResources", ResourceSpec.class);
        deltaMethod.setAccessible(true);
        Method sinkMethod = DataSink.class.getDeclaredMethod("setResources", ResourceSpec.class);
        sinkMethod.setAccessible(true);
        MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> mapFunction = new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>(){

            public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
                return value;
            }
        };
        FilterFunction<Tuple2<Long, Long>> filterFunction = new FilterFunction<Tuple2<Long, Long>>(){

            public boolean filter(Tuple2<Long, Long> value) throws Exception {
                return false;
            }
        };
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource input = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)1L, (Object)2L)});
        opMethod.invoke((Object)input, resource1);
        MapOperator map = input.map((MapFunction)mapFunction);
        opMethod.invoke((Object)map, resource2);
        DeltaIteration iteration = map.iterateDelta((DataSet)map, 100, new int[]{0}).registerAggregator("test", (Aggregator)new LongSumAggregator());
        deltaMethod.invoke((Object)iteration, resource3);
        MapOperator delta = iteration.getWorkset().map((MapFunction)mapFunction);
        opMethod.invoke((Object)delta, resource4);
        FilterOperator feedback = delta.filter((FilterFunction)filterFunction);
        opMethod.invoke((Object)feedback, resource5);
        DataSink sink = iteration.closeWith((DataSet)delta, (DataSet)feedback).output((OutputFormat)new DiscardingOutputFormat());
        sinkMethod.invoke((Object)sink, resource6);
        JobGraph jobGraph = JobGraphGeneratorTest.compileJob(env);
        JobVertex sourceMapVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex iterationHeadVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        JobVertex deltaVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(2);
        JobVertex iterationTailVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(3);
        JobVertex feedbackVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(4);
        JobVertex sinkVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(5);
        JobVertex iterationSyncVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(6);
        Assert.assertTrue((boolean)sourceMapVertex.getMinResources().equals((Object)resource1.merge(resource2)));
        Assert.assertTrue((boolean)iterationHeadVertex.getPreferredResources().equals((Object)resource3));
        Assert.assertTrue((boolean)deltaVertex.getMinResources().equals((Object)resource4));
        Assert.assertTrue((boolean)iterationTailVertex.getPreferredResources().equals((Object)ResourceSpec.DEFAULT));
        Assert.assertTrue((boolean)feedbackVertex.getMinResources().equals((Object)resource5));
        Assert.assertTrue((boolean)sinkVertex.getPreferredResources().equals((Object)resource6));
        Assert.assertTrue((boolean)iterationSyncVertex.getMinResources().equals((Object)resource3));
    }

    @Test
    public void testArtifactCompression() throws IOException {
        java.nio.file.Path plainFile1 = this.tmp.newFile("plainFile1").toPath();
        java.nio.file.Path plainFile2 = this.tmp.newFile("plainFile2").toPath();
        java.nio.file.Path directory1 = this.tmp.newFolder("directory1").toPath();
        Files.createDirectory(directory1.resolve("containedFile1"), new FileAttribute[0]);
        java.nio.file.Path directory2 = this.tmp.newFolder("directory2").toPath();
        Files.createDirectory(directory2.resolve("containedFile2"), new FileAttribute[0]);
        String executableFileName = "executableFile";
        String nonExecutableFileName = "nonExecutableFile";
        String executableDirName = "executableDir";
        String nonExecutableDirName = "nonExecutableDIr";
        HashMap<String, DistributedCache.DistributedCacheEntry> originalArtifacts = new HashMap<String, DistributedCache.DistributedCacheEntry>();
        originalArtifacts.put("executableFile", new DistributedCache.DistributedCacheEntry(plainFile1.toString(), Boolean.valueOf(true)));
        originalArtifacts.put("nonExecutableFile", new DistributedCache.DistributedCacheEntry(plainFile2.toString(), Boolean.valueOf(false)));
        originalArtifacts.put("executableDir", new DistributedCache.DistributedCacheEntry(directory1.toString(), Boolean.valueOf(true)));
        originalArtifacts.put("nonExecutableDIr", new DistributedCache.DistributedCacheEntry(directory2.toString(), Boolean.valueOf(false)));
        Map submittedArtifacts = JobGraphUtils.prepareUserArtifactEntries(originalArtifacts, (JobID)new JobID());
        DistributedCache.DistributedCacheEntry executableFileEntry = (DistributedCache.DistributedCacheEntry)submittedArtifacts.get("executableFile");
        JobGraphGeneratorTest.assertState(executableFileEntry, true, false);
        DistributedCache.DistributedCacheEntry nonExecutableFileEntry = (DistributedCache.DistributedCacheEntry)submittedArtifacts.get("nonExecutableFile");
        JobGraphGeneratorTest.assertState(nonExecutableFileEntry, false, false);
        DistributedCache.DistributedCacheEntry executableDirEntry = (DistributedCache.DistributedCacheEntry)submittedArtifacts.get("executableDir");
        JobGraphGeneratorTest.assertState(executableDirEntry, true, true);
        DistributedCache.DistributedCacheEntry nonExecutableDirEntry = (DistributedCache.DistributedCacheEntry)submittedArtifacts.get("nonExecutableDIr");
        JobGraphGeneratorTest.assertState(nonExecutableDirEntry, false, true);
    }

    @Test
    public void testGeneratedJobsAreBatchJobType() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements((Object[])new String[]{"test"}).output((OutputFormat)new DiscardingOutputFormat());
        JobGraph graph = JobGraphGeneratorTest.compileJob(env);
        Assert.assertThat((Object)graph.getJobType(), (Matcher)CoreMatchers.is((Object)JobType.BATCH));
    }

    @Test
    public void testGeneratingJobGraphWithUnconsumedResultPartition() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Operator input = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)1L, (Object)2L)}).setParallelism(1);
        Operator ds = input.map(new IdentityMapper()).setParallelism(3);
        AbstractID intermediateDataSetID = new AbstractID();
        ds.output((OutputFormat)BlockingShuffleOutputFormat.createOutputFormat((AbstractID)intermediateDataSetID)).setParallelism(1);
        ds.output((OutputFormat)new DiscardingOutputFormat()).setParallelism(1);
        JobGraph jobGraph = JobGraphGeneratorTest.compileJob(env);
        Assert.assertEquals((long)3L, (long)jobGraph.getVerticesSortedTopologicallyFromSources().size());
        JobVertex mapVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assert.assertThat((Object)mapVertex, (Matcher)Matchers.instanceOf(JobVertex.class));
        Assert.assertEquals((long)2L, (long)mapVertex.getProducedDataSets().size());
        Assert.assertTrue((boolean)mapVertex.getProducedDataSets().stream().anyMatch(dataSet -> dataSet.getId().equals((Object)new IntermediateDataSetID(intermediateDataSetID)) && dataSet.getResultType() == ResultPartitionType.BLOCKING_PERSISTENT));
    }

    private static void assertState(DistributedCache.DistributedCacheEntry entry, boolean isExecutable, boolean isZipped) throws IOException {
        Assert.assertNotNull((Object)entry);
        Assert.assertEquals((Object)isExecutable, (Object)entry.isExecutable);
        Assert.assertEquals((Object)isZipped, (Object)entry.isZipped);
        Path filePath = new Path(entry.filePath);
        Assert.assertTrue((boolean)filePath.getFileSystem().exists(filePath));
        Assert.assertFalse((boolean)filePath.getFileSystem().getFileStatus(filePath).isDir());
    }

    private static JobGraph compileJob(ExecutionEnvironment env) {
        Plan plan = env.createProgramPlan();
        Optimizer pc = new Optimizer(new Configuration());
        OptimizedPlan op = pc.compile(plan);
        JobGraphGenerator jgg = new JobGraphGenerator();
        return jgg.compileJobGraph(op);
    }
}

