/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class Client {
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    final Optimizer compiler;
    private final ActorSystem actorSystem;
    private final Configuration config;
    private final FiniteDuration timeout;
    private final FiniteDuration lookupTimeout;
    private final int maxSlots;
    private boolean printStatusDuringExecution = true;
    private JobID lastJobID;

    public Client(Configuration config) throws IOException {
        this(config, -1);
    }

    public Client(Configuration config, int maxSlots) throws IOException {
        this.config = Preconditions.checkNotNull(config);
        this.compiler = new Optimizer(new DataStatistics(), (CostEstimator)new DefaultCostEstimator(), config);
        this.maxSlots = maxSlots;
        LOG.info("Starting client actor system");
        try {
            this.actorSystem = JobClient.startJobClientActorSystem((Configuration)config);
        }
        catch (Exception e) {
            throw new IOException("Could start client actor system.", e);
        }
        this.timeout = AkkaUtils.getClientTimeout((Configuration)config);
        this.lookupTimeout = AkkaUtils.getLookupTimeout((Configuration)config);
    }

    public void shutdown() {
        if (!this.actorSystem.isTerminated()) {
            this.actorSystem.shutdown();
            this.actorSystem.awaitTermination();
        }
    }

    public void setPrintStatusDuringExecution(boolean print) {
        this.printStatusDuringExecution = print;
    }

    public boolean getPrintStatusDuringExecution() {
        return this.printStatusDuringExecution;
    }

    public int getMaxSlots() {
        return this.maxSlots;
    }

    public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
        return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan)Client.getOptimizedPlan(compiler, prog, parallelism));
    }

    public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) {
            return Client.getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
        }
        if (prog.isUsingInteractiveMode()) {
            OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
            if (parallelism > 0) {
                env.setParallelism(parallelism);
            }
            return env.getOptimizedPlan(prog);
        }
        throw new RuntimeException("Couldn't determine program mode.");
    }

    public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
        if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
            LOG.debug("Changing plan default parallelism from {} to {}", (Object)p.getDefaultParallelism(), (Object)parallelism);
            p.setDefaultParallelism(parallelism);
        }
        LOG.debug("Set parallelism {}, plan default parallelism {}", (Object)parallelism, (Object)p.getDefaultParallelism());
        return compiler.compile(p);
    }

    public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) {
            return this.runBlocking(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
        }
        if (prog.isUsingInteractiveMode()) {
            LOG.info("Starting program in interactive mode");
            ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(), prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true, prog.getSavepointPath()));
            try {
                prog.invokeInteractiveModeForExecution();
            }
            finally {
                ContextEnvironment.unsetContext();
            }
            return new JobSubmissionResult(this.lastJobID);
        }
        throw new RuntimeException();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) {
            return this.runDetached(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
        }
        if (prog.isUsingInteractiveMode()) {
            LOG.info("Starting program in interactive mode");
            ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(), prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false, prog.getSavepointPath());
            ContextEnvironment.setAsContext(factory);
            try {
                prog.invokeInteractiveModeForExecution();
                JobSubmissionResult jobSubmissionResult = ((DetachedEnvironment)factory.getLastEnvCreated()).finalizeExecute();
                return jobSubmissionResult;
            }
            finally {
                ContextEnvironment.unsetContext();
            }
        }
        throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
    }

    public JobExecutionResult runBlocking(JobWithJars program, int parallelism) throws ProgramInvocationException {
        return this.runBlocking(program, parallelism, null);
    }

    public JobExecutionResult runBlocking(JobWithJars program, int parallelism, String savepointPath) throws CompilerException, ProgramInvocationException {
        ClassLoader classLoader = program.getUserCodeClassLoader();
        if (classLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }
        OptimizedPlan optPlan = Client.getOptimizedPlan(this.compiler, program, parallelism);
        return this.runBlocking((FlinkPlan)optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
    }

    public JobSubmissionResult runDetached(JobWithJars program, int parallelism) throws ProgramInvocationException {
        return this.runDetached(program, parallelism, null);
    }

    public JobSubmissionResult runDetached(JobWithJars program, int parallelism, String savepointPath) throws CompilerException, ProgramInvocationException {
        ClassLoader classLoader = program.getUserCodeClassLoader();
        if (classLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }
        OptimizedPlan optimizedPlan = Client.getOptimizedPlan(this.compiler, program, parallelism);
        return this.runDetached((FlinkPlan)optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
    }

    public JobExecutionResult runBlocking(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
        return this.runBlocking(compiledPlan, libraries, classpaths, classLoader, null);
    }

    public JobExecutionResult runBlocking(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, String savepointPath) throws ProgramInvocationException {
        JobGraph job = this.getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
        return this.runBlocking(job, classLoader);
    }

    public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
        return this.runDetached(compiledPlan, libraries, classpaths, classLoader, null);
    }

    public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, String savepointPath) throws ProgramInvocationException {
        JobGraph job = this.getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
        return this.runDetached(job, classLoader);
    }

    public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        LeaderRetrievalService leaderRetrievalService;
        try {
            leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService((Configuration)this.config);
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
        }
        try {
            this.lastJobID = jobGraph.getJobID();
            return JobClient.submitJobAndWait((ActorSystem)this.actorSystem, (LeaderRetrievalService)leaderRetrievalService, (JobGraph)jobGraph, (FiniteDuration)this.timeout, (boolean)this.printStatusDuringExecution, (ClassLoader)classLoader);
        }
        catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }

    public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        ActorGateway jobManagerGateway;
        try {
            jobManagerGateway = this.getJobManagerGateway();
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
        }
        LOG.info("Checking and uploading JAR files");
        try {
            JobClient.uploadJarFiles((JobGraph)jobGraph, (ActorGateway)jobManagerGateway, (FiniteDuration)this.timeout);
        }
        catch (IOException e) {
            throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
        }
        try {
            this.lastJobID = jobGraph.getJobID();
            JobClient.submitJobDetached((ActorGateway)jobManagerGateway, (JobGraph)jobGraph, (FiniteDuration)this.timeout, (ClassLoader)classLoader);
            return new JobSubmissionResult(jobGraph.getJobID());
        }
        catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }

    public void cancel(JobID jobId) throws Exception {
        Future response;
        ActorGateway jobManagerGateway = this.getJobManagerGateway();
        try {
            response = jobManagerGateway.ask((Object)new JobManagerMessages.CancelJob(jobId), this.timeout);
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
        }
        Object result = Await.result((Awaitable)response, (Duration)this.timeout);
        if (!(result instanceof JobManagerMessages.CancellationSuccess)) {
            if (result instanceof JobManagerMessages.CancellationFailure) {
                Throwable t = ((JobManagerMessages.CancellationFailure)result).cause();
                LOG.info("Job cancellation with ID " + jobId + " failed.", t);
                throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
            }
            throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
        }
        LOG.info("Job cancellation with ID " + jobId + " succeeded.");
    }

    public void stop(JobID jobId) throws Exception {
        Future response;
        ActorGateway jobManagerGateway = this.getJobManagerGateway();
        try {
            response = jobManagerGateway.ask((Object)new JobManagerMessages.StopJob(jobId), this.timeout);
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
        }
        Object result = Await.result((Awaitable)response, (Duration)this.timeout);
        if (!(result instanceof JobManagerMessages.StoppingSuccess)) {
            if (result instanceof JobManagerMessages.StoppingFailure) {
                Throwable t = ((JobManagerMessages.StoppingFailure)result).cause();
                LOG.info("Job stopping with ID " + jobId + " failed.", t);
                throw new Exception("Failed to stop the job because of \n" + t.getMessage());
            }
            throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
        }
        LOG.info("Job stopping with ID " + jobId + " succeeded.");
    }

    public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
        return this.getAccumulators(jobID, ClassLoader.getSystemClassLoader());
    }

    public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
        Future response;
        ActorGateway jobManagerGateway = this.getJobManagerGateway();
        try {
            response = jobManagerGateway.ask((Object)new RequestAccumulatorResults(jobID), this.timeout);
        }
        catch (Exception e) {
            throw new Exception("Failed to query the job manager gateway for accumulators.", e);
        }
        Object result = Await.result((Awaitable)response, (Duration)this.timeout);
        if (result instanceof AccumulatorResultsFound) {
            Map serializedAccumulators = ((AccumulatorResultsFound)result).result();
            return AccumulatorHelper.deserializeAccumulators((Map)serializedAccumulators, (ClassLoader)loader);
        }
        if (result instanceof AccumulatorResultsErroneous) {
            throw ((AccumulatorResultsErroneous)result).cause();
        }
        throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
    }

    public void endSession(JobID jobId) throws Exception {
        if (jobId == null) {
            throw new IllegalArgumentException("The JobID must not be null.");
        }
        this.endSessions(Collections.singletonList(jobId));
    }

    public void endSessions(List<JobID> jobIds) throws Exception {
        if (jobIds == null) {
            throw new IllegalArgumentException("The JobIDs must not be null");
        }
        ActorGateway jobManagerGateway = this.getJobManagerGateway();
        for (JobID jid : jobIds) {
            if (jid == null) continue;
            LOG.info("Telling job manager to end the session {}.", (Object)jid);
            jobManagerGateway.tell((Object)new JobManagerMessages.RemoveCachedJob(jid));
        }
    }

    private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
        return Client.getOptimizedPlan(compiler, prog.getPlan(), parallelism);
    }

    public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
        return this.getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
    }

    public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
        return this.getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
    }

    private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) {
        JobGraph job;
        if (optPlan instanceof StreamingPlan) {
            job = ((StreamingPlan)optPlan).getJobGraph();
            job.setSavepointPath(savepointPath);
        } else {
            JobGraphGenerator gen = new JobGraphGenerator(this.config);
            job = gen.compileJobGraph((OptimizedPlan)optPlan);
        }
        for (URL jar : jarFiles) {
            try {
                job.addJar(new Path(jar.toURI()));
            }
            catch (URISyntaxException e) {
                throw new RuntimeException("URL is invalid. This should not happen.", e);
            }
        }
        job.setClasspaths(classpaths);
        return job;
    }

    private ActorGateway getJobManagerGateway() throws Exception {
        LOG.info("Looking up JobManager");
        LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService((Configuration)this.config);
        return LeaderRetrievalUtils.retrieveLeaderGateway((LeaderRetrievalService)leaderRetrievalService, (ActorSystem)this.actorSystem, (FiniteDuration)this.lookupTimeout);
    }
}

