/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.ProgrammedSlotProvider;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;

public class ExecutionGraphDeploymentTest {
    @Test
    public void testBuildDeploymentDescriptor() {
        try {
            JobID jobId = new JobID();
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertexID jid3 = new JobVertexID();
            JobVertexID jid4 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            JobVertex v3 = new JobVertex("v3", jid3);
            JobVertex v4 = new JobVertex("v4", jid4);
            v1.setParallelism(10);
            v2.setParallelism(10);
            v3.setParallelism(10);
            v4.setParallelism(10);
            v1.setInvokableClass(BatchTask.class);
            v2.setInvokableClass(BatchTask.class);
            v3.setInvokableClass(BatchTask.class);
            v4.setInvokableClass(BatchTask.class);
            v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            ExecutionGraph eg = new ExecutionGraph(TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), jobId, "some job", new Configuration(), new SerializedValue((Object)new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new NoRestartStrategy(), (SlotProvider)new Scheduler((Executor)TestingUtils.defaultExecutionContext()));
            List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
            eg.attachJobGraph(ordered);
            ExecutionJobVertex ejv = (ExecutionJobVertex)eg.getAllVertices().get(jid2);
            ExecutionVertex vertex = ejv.getTaskVertices()[3];
            ExecutionGraphTestUtils.SimpleActorGateway instanceGateway = new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext());
            Instance instance = ExecutionGraphTestUtils.getInstance((TaskManagerGateway)new ActorTaskManagerGateway((ActorGateway)instanceGateway));
            SimpleSlot slot = instance.allocateSimpleSlot(jobId);
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)vertex.getExecutionState());
            vertex.deployToSlot(slot);
            Assert.assertEquals((Object)ExecutionState.DEPLOYING, (Object)vertex.getExecutionState());
            TaskDeploymentDescriptor descr = instanceGateway.lastTDD;
            Assert.assertNotNull((Object)descr);
            JobInformation jobInformation = (JobInformation)descr.getSerializedJobInformation().deserializeValue(this.getClass().getClassLoader());
            TaskInformation taskInformation = (TaskInformation)descr.getSerializedTaskInformation().deserializeValue(this.getClass().getClassLoader());
            Assert.assertEquals((Object)jobId, (Object)jobInformation.getJobId());
            Assert.assertEquals((Object)jid2, (Object)taskInformation.getJobVertexId());
            Assert.assertEquals((long)3L, (long)descr.getSubtaskIndex());
            Assert.assertEquals((long)10L, (long)taskInformation.getNumberOfSubtasks());
            Assert.assertEquals((Object)BatchTask.class.getName(), (Object)taskInformation.getInvokableClassName());
            Assert.assertEquals((Object)"v2", (Object)taskInformation.getTaskName());
            Collection producedPartitions = descr.getProducedPartitions();
            Collection consumedPartitions = descr.getInputGates();
            Assert.assertEquals((long)2L, (long)producedPartitions.size());
            Assert.assertEquals((long)1L, (long)consumedPartitions.size());
            Iterator iteratorProducedPartitions = producedPartitions.iterator();
            Iterator iteratorConsumedPartitions = consumedPartitions.iterator();
            Assert.assertEquals((long)10L, (long)((ResultPartitionDeploymentDescriptor)iteratorProducedPartitions.next()).getNumberOfSubpartitions());
            Assert.assertEquals((long)10L, (long)((ResultPartitionDeploymentDescriptor)iteratorProducedPartitions.next()).getNumberOfSubpartitions());
            Assert.assertEquals((long)10L, (long)((InputGateDeploymentDescriptor)iteratorConsumedPartitions.next()).getInputChannelDeploymentDescriptors().length);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRegistrationOfExecutionsFinishing() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            Map executions = (Map)this.setupExecution((JobVertex)v1, (int)7650, (JobVertex)v2, (int)2350).f1;
            for (Execution e : executions.values()) {
                e.markFinished();
            }
            Assert.assertEquals((long)0L, (long)executions.size());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRegistrationOfExecutionsFailing() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            Map executions = (Map)this.setupExecution((JobVertex)v1, (int)7, (JobVertex)v2, (int)6).f1;
            for (Execution e : executions.values()) {
                e.markFailed(null);
            }
            Assert.assertEquals((long)0L, (long)executions.size());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testRegistrationOfExecutionsFailedExternally() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            Map executions = (Map)this.setupExecution((JobVertex)v1, (int)7, (JobVertex)v2, (int)6).f1;
            for (Execution e : executions.values()) {
                e.fail(null);
            }
            Assert.assertEquals((long)0L, (long)executions.size());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testAccumulatorsAndMetricsForwarding() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphAndExecutions = this.setupExecution(v1, 1, v2, 1);
        ExecutionGraph graph = (ExecutionGraph)graphAndExecutions.f0;
        Execution execution1 = (Execution)((Map)graphAndExecutions.f1).values().iterator().next();
        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
        HashMap<String, IntCounter> accumulators = new HashMap<String, IntCounter>();
        accumulators.put("acc", new IntCounter(4));
        AccumulatorSnapshot accumulatorSnapshot = new AccumulatorSnapshot(graph.getJobID(), execution1.getAttemptId(), accumulators);
        TaskExecutionState state = new TaskExecutionState(graph.getJobID(), execution1.getAttemptId(), ExecutionState.CANCELED, null, accumulatorSnapshot, ioMetrics);
        graph.updateState(state);
        Assert.assertEquals((Object)ioMetrics, (Object)execution1.getIOMetrics());
        Assert.assertNotNull((Object)execution1.getUserAccumulators());
        Assert.assertEquals((Object)4, (Object)((Accumulator)execution1.getUserAccumulators().get("acc")).getLocalValue());
        Execution execution2 = (Execution)((Map)graphAndExecutions.f1).values().iterator().next();
        IOMetrics ioMetrics2 = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
        HashMap<String, IntCounter> accumulators2 = new HashMap<String, IntCounter>();
        accumulators2.put("acc", new IntCounter(8));
        AccumulatorSnapshot accumulatorSnapshot2 = new AccumulatorSnapshot(graph.getJobID(), execution2.getAttemptId(), accumulators2);
        TaskExecutionState state2 = new TaskExecutionState(graph.getJobID(), execution2.getAttemptId(), ExecutionState.FAILED, null, accumulatorSnapshot2, ioMetrics2);
        graph.updateState(state2);
        Assert.assertEquals((Object)ioMetrics2, (Object)execution2.getIOMetrics());
        Assert.assertNotNull((Object)execution2.getUserAccumulators());
        Assert.assertEquals((Object)8, (Object)((Accumulator)execution2.getUserAccumulators().get("acc")).getLocalValue());
    }

    @Test
    public void testAccumulatorsAndMetricsStorage() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        Map executions = (Map)this.setupExecution((JobVertex)v1, (int)1, (JobVertex)v2, (int)1).f1;
        IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0, 0.0, 0.0, 0.0, 0.0);
        Map accumulators = Collections.emptyMap();
        Execution execution1 = (Execution)executions.values().iterator().next();
        execution1.cancel();
        execution1.cancelingComplete(accumulators, ioMetrics);
        Assert.assertEquals((Object)ioMetrics, (Object)execution1.getIOMetrics());
        Assert.assertEquals(accumulators, (Object)execution1.getUserAccumulators());
        Execution execution2 = (Execution)executions.values().iterator().next();
        execution2.markFailed(new Throwable(), accumulators, ioMetrics);
        Assert.assertEquals((Object)ioMetrics, (Object)execution2.getIOMetrics());
        Assert.assertEquals(accumulators, (Object)execution2.getUserAccumulators());
    }

    @Test
    public void testRegistrationOfExecutionsCanceled() {
        try {
            JobVertexID jid1 = new JobVertexID();
            JobVertexID jid2 = new JobVertexID();
            JobVertex v1 = new JobVertex("v1", jid1);
            JobVertex v2 = new JobVertex("v2", jid2);
            Map executions = (Map)this.setupExecution((JobVertex)v1, (int)19, (JobVertex)v2, (int)37).f1;
            for (Execution e : executions.values()) {
                e.cancel();
                e.cancelingComplete();
            }
            Assert.assertEquals((long)0L, (long)executions.size());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testNoResourceAvailableFailure() throws Exception {
        JobID jobId = new JobID();
        JobVertex v1 = new JobVertex("source");
        JobVertex v2 = new JobVertex("sink");
        int dop1 = 1;
        int dop2 = 1;
        v1.setParallelism(dop1);
        v2.setParallelism(dop2);
        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        Scheduler scheduler = new Scheduler((Executor)TestingUtils.directExecutionContext());
        for (int i = 0; i < dop1; ++i) {
            scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance((TaskManagerGateway)new ActorTaskManagerGateway((ActorGateway)new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()))));
        }
        ExecutionGraph eg = new ExecutionGraph((ScheduledExecutorService)new DirectScheduledExecutorService(), (Executor)TestingUtils.defaultExecutor(), jobId, "failing test job", new Configuration(), new SerializedValue((Object)new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new NoRestartStrategy(), (SlotProvider)scheduler);
        eg.setQueuedSchedulingAllowed(false);
        List<JobVertex> ordered = Arrays.asList(v1, v2);
        eg.attachJobGraph(ordered);
        Assert.assertEquals((long)dop1, (long)scheduler.getNumberOfAvailableSlots());
        eg.scheduleForExecution();
        ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING));
        eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.FINISHED, null));
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)eg.getState());
    }

    @Test
    public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
        Configuration jobManagerConfig = new Configuration();
        ExecutionGraph eg = this.createExecutionGraph(jobManagerConfig);
        Assert.assertEquals((long)((Integer)CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()).intValue(), (long)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    @Test
    public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
        int maxNumberOfCheckpointsToRetain = 10;
        Configuration jobManagerConfig = new Configuration();
        jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, 10);
        ExecutionGraph eg = this.createExecutionGraph(jobManagerConfig);
        Assert.assertEquals((long)10L, (long)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
        JobID jobId = new JobID();
        v1.setParallelism(dop1);
        v2.setParallelism(dop2);
        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);
        Scheduler scheduler = new Scheduler((Executor)TestingUtils.defaultExecutionContext());
        for (int i = 0; i < dop1 + dop2; ++i) {
            scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance((TaskManagerGateway)new ActorTaskManagerGateway((ActorGateway)new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()))));
        }
        ExecutionGraph eg = new ExecutionGraph((ScheduledExecutorService)new DirectScheduledExecutorService(), (Executor)TestingUtils.defaultExecutor(), jobId, "some job", new Configuration(), new SerializedValue((Object)new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new NoRestartStrategy(), (SlotProvider)scheduler);
        eg.setQueuedSchedulingAllowed(false);
        List<JobVertex> ordered = Arrays.asList(v1, v2);
        eg.attachJobGraph(ordered);
        Assert.assertEquals((long)(dop1 + dop2), (long)scheduler.getNumberOfAvailableSlots());
        eg.scheduleForExecution();
        Map executions = eg.getRegisteredExecutions();
        Assert.assertEquals((long)(dop1 + dop2), (long)executions.size());
        return new Tuple2((Object)eg, (Object)executions);
    }

    @Test
    public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
        int negativeMaxNumberOfCheckpointsToRetain = -10;
        Configuration jobManagerConfig = new Configuration();
        jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, -10);
        ExecutionGraph eg = this.createExecutionGraph(jobManagerConfig);
        Assert.assertNotEquals((long)-10L, (long)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
        Assert.assertEquals((long)((Integer)CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()).intValue(), (long)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
        ScheduledExecutorService executor = TestingUtils.defaultExecutor();
        JobID jobId = new JobID();
        JobGraph jobGraph = new JobGraph(jobId, "test");
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 100L, 600000L, 0L, 1, ExternalizedCheckpointSettings.none(), null, false));
        return ExecutionGraphBuilder.buildGraph(null, (JobGraph)jobGraph, (Configuration)configuration, (ScheduledExecutorService)executor, (Executor)executor, (SlotProvider)new ProgrammedSlotProvider(1), (ClassLoader)this.getClass().getClassLoader(), (CheckpointRecoveryFactory)new StandaloneCheckpointRecoveryFactory(), (Time)Time.seconds((long)10L), (RestartStrategy)new NoRestartStrategy(), (MetricGroup)new UnregisteredMetricsGroup(), (int)1, (Logger)LoggerFactory.getLogger(this.getClass()));
    }

    public static class FailingFinalizeJobVertex
    extends JobVertex {
        public FailingFinalizeJobVertex(String name, JobVertexID id) {
            super(name, id);
        }

        public void finalizeOnMaster(ClassLoader cl) throws Exception {
            throw new Exception();
        }
    }
}

