/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobgraph;

import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.executiongraph.SimpleInitializeOnMasterContext;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.InstantiationUtil;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class JobTaskVertexTest {
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    @Test
    public void testConnectDirectly() {
        JobVertex source = new JobVertex("source");
        JobVertex target = new JobVertex("target");
        target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        Assert.assertTrue((boolean)source.isInputVertex());
        Assert.assertFalse((boolean)source.isOutputVertex());
        Assert.assertFalse((boolean)target.isInputVertex());
        Assert.assertTrue((boolean)target.isOutputVertex());
        Assert.assertEquals((long)1L, (long)source.getNumberOfProducedIntermediateDataSets());
        Assert.assertEquals((long)1L, (long)target.getNumberOfInputs());
        Assert.assertEquals((Object)((JobEdge)target.getInputs().get(0)).getSource(), source.getProducedDataSets().get(0));
        Assert.assertEquals((Object)target, (Object)((IntermediateDataSet)source.getProducedDataSets().get(0)).getConsumer().getTarget());
    }

    @Test
    public void testOutputFormat() {
        try {
            InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
            OperatorID operatorID = new OperatorID();
            Configuration parameters = new Configuration();
            parameters.setString("test_key", "test_value");
            new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader()).addOutputFormat(operatorID, (OutputFormat)new TestingOutputFormat(parameters)).addParameters(operatorID, parameters).write(new TaskConfig(vertex.getConfiguration()));
            TestClassLoader cl = new TestClassLoader();
            try {
                vertex.initializeOnMaster((JobVertex.InitializeOnMasterContext)new SimpleInitializeOnMasterContext((ClassLoader)cl, vertex.getParallelism()));
                Assert.fail((String)"Did not throw expected exception.");
            }
            catch (TestException testException) {
                // empty catch block
            }
            InputOutputFormatVertex copy = (InputOutputFormatVertex)InstantiationUtil.clone((Serializable)vertex);
            ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
            try {
                copy.initializeOnMaster((JobVertex.InitializeOnMasterContext)new SimpleInitializeOnMasterContext((ClassLoader)cl, copy.getParallelism()));
                Assert.fail((String)"Did not throw expected exception.");
            }
            catch (TestException testException) {
                // empty catch block
            }
            Assert.assertEquals((String)"Previous classloader was not restored.", (Object)ctxCl, (Object)Thread.currentThread().getContextClassLoader());
            try {
                copy.finalizeOnMaster((JobVertex.InitializeOnMasterContext)new SimpleInitializeOnMasterContext((ClassLoader)cl, copy.getParallelism()));
                Assert.fail((String)"Did not throw expected exception.");
            }
            catch (TestException testException) {
                // empty catch block
            }
            Assert.assertEquals((String)"Previous classloader was not restored.", (Object)ctxCl, (Object)Thread.currentThread().getContextClassLoader());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testInputFormat() {
        try {
            InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
            OperatorID operatorID = new OperatorID();
            Configuration parameters = new Configuration();
            parameters.setString("test_key", "test_value");
            new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader()).addInputFormat(operatorID, (InputFormat)new TestInputFormat(parameters)).addParameters(operatorID, "test_key", "test_value").write(new TaskConfig(vertex.getConfiguration()));
            TestClassLoader cl = new TestClassLoader();
            vertex.initializeOnMaster((JobVertex.InitializeOnMasterContext)new SimpleInitializeOnMasterContext((ClassLoader)cl, vertex.getParallelism()));
            InputSplit[] splits = vertex.getInputSplitSource().createInputSplits(77);
            Assert.assertNotNull((Object)splits);
            Assert.assertEquals((long)1L, (long)splits.length);
            Assert.assertEquals(TestSplit.class, splits[0].getClass());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testOutputFormatUsesCorrectParallelism() throws Exception {
        InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
        int initialParallelism = 1;
        vertex.setParallelism(initialParallelism);
        OperatorID operatorID = new OperatorID();
        SharedReference globalParallelism = this.sharedObjects.add((Object)new AtomicInteger());
        new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader()).addOutputFormat(operatorID, (OutputFormat)new TestInitializeOutputFormat(globalParallelism)).write(new TaskConfig(vertex.getConfiguration()));
        int executionParallelism = initialParallelism + 3;
        try (TestClassLoader cl = new TestClassLoader();){
            vertex.initializeOnMaster((JobVertex.InitializeOnMasterContext)new SimpleInitializeOnMasterContext((ClassLoader)cl, executionParallelism));
            Assertions.assertThat((int)((AtomicInteger)globalParallelism.get()).get()).isEqualTo(executionParallelism);
        }
    }

    private static class TestClassLoader
    extends URLClassLoader {
        public TestClassLoader() {
            super(new URL[0], Thread.currentThread().getContextClassLoader());
        }
    }

    private static final class TestingOutputFormat
    extends DiscardingOutputFormat<Object>
    implements InitializeOnMaster,
    FinalizeOnMaster {
        private boolean isConfigured = false;
        private final Configuration expectedParameters;

        public TestingOutputFormat(Configuration expectedParameters) {
            this.expectedParameters = expectedParameters;
        }

        public void initializeGlobal(int parallelism) throws IOException {
            if (!this.isConfigured) {
                throw new IllegalStateException("OutputFormat was not configured before initializeGlobal was called.");
            }
            if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
                throw new IllegalStateException("Context ClassLoader was not correctly switched.");
            }
            throw new TestException();
        }

        public void finalizeGlobal(int parallelism) throws IOException {
            if (!this.isConfigured) {
                throw new IllegalStateException("OutputFormat was not configured before finalizeGlobal was called.");
            }
            if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
                throw new IllegalStateException("Context ClassLoader was not correctly switched.");
            }
            throw new TestException();
        }

        public void configure(Configuration parameters) {
            if (this.isConfigured) {
                throw new IllegalStateException("OutputFormat is already configured.");
            }
            if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
                throw new IllegalStateException("Context ClassLoader was not correctly switched.");
            }
            for (String key : this.expectedParameters.keySet()) {
                Assert.assertEquals((Object)this.expectedParameters.getString(key, null), (Object)parameters.getString(key, null));
            }
            this.isConfigured = true;
        }
    }

    private static final class TestInputFormat
    extends GenericInputFormat<Object> {
        private boolean isConfigured = false;
        private final Configuration expectedParameters;

        public TestInputFormat(Configuration expectedParameters) {
            this.expectedParameters = expectedParameters;
        }

        public boolean reachedEnd() {
            return false;
        }

        public Object nextRecord(Object reuse) {
            return null;
        }

        public GenericInputSplit[] createInputSplits(int numSplits) {
            if (!this.isConfigured) {
                throw new IllegalStateException("InputFormat was not configured before createInputSplits was called.");
            }
            return new GenericInputSplit[]{new TestSplit(0, 1)};
        }

        public void configure(Configuration parameters) {
            if (this.isConfigured) {
                throw new IllegalStateException("InputFormat is already configured.");
            }
            if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
                throw new IllegalStateException("Context ClassLoader was not correctly switched.");
            }
            for (String key : this.expectedParameters.keySet()) {
                Assert.assertEquals((Object)this.expectedParameters.getString(key, null), (Object)parameters.getString(key, null));
            }
            this.isConfigured = true;
        }
    }

    private static final class TestSplit
    extends GenericInputSplit {
        public TestSplit(int partitionNumber, int totalNumberOfPartitions) {
            super(partitionNumber, totalNumberOfPartitions);
        }
    }

    private static final class TestException
    extends IOException {
        private TestException() {
        }
    }

    private static final class TestInitializeOutputFormat
    implements OutputFormat<Object>,
    InitializeOnMaster {
        private final SharedReference<AtomicInteger> globalParallelism;

        private TestInitializeOutputFormat(SharedReference<AtomicInteger> globalParallelism) {
            this.globalParallelism = globalParallelism;
        }

        public void configure(Configuration parameters) {
        }

        public void open(int taskNumber, int numTasks) throws IOException {
        }

        public void writeRecord(Object record) throws IOException {
        }

        public void close() throws IOException {
        }

        public void initializeGlobal(int parallelism) throws IOException {
            ((AtomicInteger)this.globalParallelism.get()).set(parallelism);
        }
    }
}

