/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/**
 * Static utility that turns a {@link JobGraph} into an {@link ExecutionGraph}, optionally
 * reusing an already existing execution graph, and wires up snapshot checkpointing when the
 * job graph carries snapshot settings.
 */
public class ExecutionGraphBuilder {

    /** Not meant to be instantiated; the class only offers static methods. */
    private ExecutionGraphBuilder() {
    }

    /**
     * Builds (or completes) the execution graph for the given job graph.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Use {@code prior} if it is non-null, otherwise construct a new {@link ExecutionGraph}.</li>
     *   <li>Copy the schedule mode and the queued-scheduling flag from the job graph.</li>
     *   <li>Generate the JSON plan; if that fails for any reason a warning is logged and the
     *       plan is set to {@code "{}"}.</li>
     *   <li>Validate every job vertex and run its master-side initialization.</li>
     *   <li>Attach the topologically sorted vertices to the execution graph.</li>
     *   <li>If the job graph has snapshot settings, enable snapshot checkpointing.</li>
     * </ol>
     *
     * @param prior an existing execution graph to reuse, or {@code null} to create a new one
     * @param jobGraph the job graph to build from; must not be {@code null}
     * @param jobManagerConfig configuration read for the checkpoint statistics history size
     *        ({@code jobmanager.web.checkpoints.history}, default 10) and the externalized
     *        checkpoints directory ({@code state.checkpoints.dir}, default {@code null})
     * @param futureExecutor passed to the {@link ExecutionGraph} constructor
     * @param ioExecutor passed to the {@link ExecutionGraph} constructor
     * @param slotProvider passed to the {@link ExecutionGraph} constructor
     * @param classLoader passed to the {@link ExecutionGraph} constructor, to each vertex's
     *        master-side initialization, and to the checkpoint store factory method
     * @param recoveryFactory creates the completed-checkpoint store and the checkpoint ID counter
     * @param timeout passed to the {@link ExecutionGraph} constructor
     * @param restartStrategy passed to the {@link ExecutionGraph} constructor
     * @param metrics passed to the {@link ExecutionGraph} constructor and the checkpoint stats tracker
     * @param parallelismForAutoMax parallelism assigned to vertices whose parallelism equals
     *        {@link Integer#MAX_VALUE}
     * @param log logger used for progress and warning messages
     * @return the reused or newly created execution graph
     * @throws JobExecutionException if a vertex has no invokable class, if a vertex fails its
     *         master-side initialization, or if the checkpoint store / ID counter cannot be created
     * @throws JobException if constructing a new execution graph fails with an {@link IOException};
     *         other calls made here may propagate it as well
     * @throws IllegalArgumentException if the snapshot settings reference a vertex ID that the
     *         execution graph does not contain
     */
    public static ExecutionGraph buildGraph(
            @Nullable ExecutionGraph prior,
            JobGraph jobGraph,
            Configuration jobManagerConfig,
            Executor futureExecutor,
            Executor ioExecutor,
            SlotProvider slotProvider,
            ClassLoader classLoader,
            CheckpointRecoveryFactory recoveryFactory,
            Time timeout,
            RestartStrategy restartStrategy,
            MetricGroup metrics,
            int parallelismForAutoMax,
            Logger log) throws JobExecutionException, JobException {

        Preconditions.checkNotNull(jobGraph, "job graph cannot be null");

        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();

        // Reuse the prior graph when one was handed in; only a fresh construction can fail.
        ExecutionGraph graph;
        if (prior != null) {
            graph = prior;
        } else {
            try {
                graph = new ExecutionGraph(
                        futureExecutor,
                        ioExecutor,
                        jobId,
                        jobName,
                        jobGraph.getJobConfiguration(),
                        jobGraph.getSerializedExecutionConfig(),
                        timeout,
                        restartStrategy,
                        jobGraph.getUserJarBlobKeys(),
                        jobGraph.getClasspaths(),
                        slotProvider,
                        classLoader,
                        metrics);
            } catch (IOException e) {
                throw new JobException("Could not create the execution graph.", e);
            }
        }

        // Basic properties taken over from the job graph.
        graph.setScheduleMode(jobGraph.getScheduleMode());
        graph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

        // The JSON plan is best effort: any failure falls back to an empty JSON object.
        try {
            graph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
        } catch (Throwable t) {
            log.warn("Cannot create JSON plan for job", t);
            graph.setJsonPlan("{}");
        }

        final long initStartNanos = System.nanoTime();
        log.info("Running initialization on master for job {} ({}).", jobName, jobId);
        initializeVerticesOnMaster(jobGraph, jobId, classLoader, parallelismForAutoMax);
        final long initDurationMillis = (System.nanoTime() - initStartNanos) / 1000000L;
        log.info("Successfully ran initialization on master in {} ms.", initDurationMillis);

        final List<JobVertex> topologicallySorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        if (log.isDebugEnabled()) {
            log.debug("Adding {} vertices from job graph {} ({}).", topologicallySorted.size(), jobName, jobId);
        }
        graph.attachJobGraph(topologicallySorted);
        if (log.isDebugEnabled()) {
            log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);
        }

        final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
        if (snapshotSettings != null) {
            enableCheckpointing(graph, snapshotSettings, jobId, jobManagerConfig, classLoader, recoveryFactory, metrics);
        }

        return graph;
    }

    /**
     * Validates each vertex of the job graph and runs its master-side initialization.
     *
     * <p>A vertex whose invokable class name is {@code null} or empty is rejected. A vertex whose
     * parallelism equals {@link Integer#MAX_VALUE} gets {@code parallelismForAutoMax} assigned
     * before it is initialized.
     *
     * @throws JobExecutionException if a vertex has no invokable class, or if its initialization
     *         throws anything (the original throwable is kept as the cause)
     */
    private static void initializeVerticesOnMaster(
            JobGraph jobGraph,
            JobID jobId,
            ClassLoader classLoader,
            int parallelismForAutoMax) throws JobExecutionException, JobException {

        for (JobVertex jobVertex : jobGraph.getVertices()) {
            final String invokableClassName = jobVertex.getInvokableClassName();
            if (invokableClassName == null || invokableClassName.isEmpty()) {
                throw new JobSubmissionException(
                        jobId,
                        "The vertex " + jobVertex.getID() + " (" + jobVertex.getName() + ") has no invokable class.");
            }

            // NOTE(review): Integer.MAX_VALUE appears to act as an "auto max" parallelism marker -
            // confirm against the constant the original source used.
            if (jobVertex.getParallelism() == Integer.MAX_VALUE) {
                jobVertex.setParallelism(parallelismForAutoMax);
            }

            try {
                jobVertex.initializeOnMaster(classLoader);
            } catch (Throwable t) {
                throw new JobExecutionException(
                        jobId,
                        "Cannot initialize task '" + jobVertex.getName() + "': " + t.getMessage(),
                        t);
            }
        }
    }

    /**
     * Resolves the vertices named in the snapshot settings, creates the checkpoint store, ID
     * counter and stats tracker, and enables snapshot checkpointing on the execution graph.
     *
     * @throws JobExecutionException if the checkpoint store or the checkpoint ID counter cannot
     *         be created
     * @throws IllegalArgumentException if the settings reference a vertex the graph does not contain
     */
    private static void enableCheckpointing(
            ExecutionGraph graph,
            JobSnapshottingSettings settings,
            JobID jobId,
            Configuration jobManagerConfig,
            ClassLoader classLoader,
            CheckpointRecoveryFactory recoveryFactory,
            MetricGroup metrics) throws JobExecutionException, JobException {

        final List<ExecutionJobVertex> toTrigger = idToVertex(settings.getVerticesToTrigger(), graph);
        final List<ExecutionJobVertex> toAcknowledge = idToVertex(settings.getVerticesToAcknowledge(), graph);
        final List<ExecutionJobVertex> toConfirm = idToVertex(settings.getVerticesToConfirm(), graph);

        final CompletedCheckpointStore checkpointStore;
        final CheckpointIDCounter idCounter;
        try {
            checkpointStore = recoveryFactory.createCheckpointStore(jobId, classLoader);
            idCounter = recoveryFactory.createCheckpointIDCounter(jobId);
        } catch (Exception e) {
            throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
        }

        final int statsHistorySize = jobManagerConfig.getInteger("jobmanager.web.checkpoints.history", 10);
        final CheckpointStatsTracker statsTracker =
                new CheckpointStatsTracker(statsHistorySize, toAcknowledge, settings, metrics);

        final String externalizedDir = jobManagerConfig.getString("state.checkpoints.dir", null);

        graph.enableSnapshotCheckpointing(
                settings.getCheckpointInterval(),
                settings.getCheckpointTimeout(),
                settings.getMinPauseBetweenCheckpoints(),
                settings.getMaxConcurrentCheckpoints(),
                settings.getExternalizedCheckpointSettings(),
                toTrigger,
                toAcknowledge,
                toConfirm,
                idCounter,
                checkpointStore,
                externalizedDir,
                statsTracker);
    }

    /**
     * Maps job vertex IDs to the corresponding {@link ExecutionJobVertex} instances of the given
     * execution graph, preserving the order of the input list.
     *
     * @param jobVertices the vertex IDs to look up
     * @param executionGraph the graph in which the IDs are looked up
     * @return a new list with one execution job vertex per input ID
     * @throws IllegalArgumentException if the graph returns {@code null} for any of the IDs
     */
    private static List<ExecutionJobVertex> idToVertex(List<JobVertexID> jobVertices, ExecutionGraph executionGraph) throws IllegalArgumentException {
        final List<ExecutionJobVertex> resolved = new ArrayList<ExecutionJobVertex>(jobVertices.size());
        for (JobVertexID vertexId : jobVertices) {
            final ExecutionJobVertex executionVertex = executionGraph.getJobVertex(vertexId);
            if (executionVertex == null) {
                throw new IllegalArgumentException("The snapshot checkpointing settings refer to non-existent vertex " + vertexId);
            }
            resolved.add(executionVertex);
        }
        return resolved;
    }
}

