/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.contextcheck.ContextChecker;
import org.apache.flink.compiler.costs.CostEstimator;
import org.apache.flink.compiler.costs.DefaultCostEstimator;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side entry point that compiles programs into optimized plans and job graphs
 * and submits them to a job manager.
 *
 * <p>The job manager's RPC address and port are kept in the {@link Configuration} held by
 * this client; that same configuration is handed to the {@link JobClient} on submission.
 */
public class Client {
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);

    /** Configuration key under which the job manager's RPC address is stored. */
    private static final String JOB_MANAGER_ADDRESS_KEY = "jobmanager.rpc.address";

    /** Configuration key under which the job manager's RPC port is stored. */
    private static final String JOB_MANAGER_PORT_KEY = "jobmanager.rpc.port";

    /** Port assumed when the configuration does not define one. */
    private static final int DEFAULT_JOB_MANAGER_PORT = 6123;

    private final Configuration configuration;
    private final PactCompiler compiler;
    private final ClassLoader userCodeClassLoader;
    private boolean printStatusDuringExecution;

    /**
     * Creates a client that talks to the job manager at the given socket address.
     *
     * <p>Note that the passed configuration object is modified: the job manager address
     * and port entries are written into it.
     *
     * @param jobManagerAddress socket address of the job manager's RPC service
     * @param config configuration used (and updated) by this client
     * @param userCodeClassLoader class loader passed to the {@link JobClient} on submission
     * @throws NullPointerException if {@code config} or {@code jobManagerAddress} is null
     */
    public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader) {
        Preconditions.checkNotNull(config, "Configuration is null");
        // Fail with a descriptive message instead of an anonymous NPE on the dereference below.
        Preconditions.checkNotNull(jobManagerAddress, "Job manager address is null");
        this.configuration = config;
        this.configuration.setString(JOB_MANAGER_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
        this.configuration.setInteger(JOB_MANAGER_PORT_KEY, jobManagerAddress.getPort());
        this.userCodeClassLoader = userCodeClassLoader;
        this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
    }

    /**
     * Creates a client whose job manager address and port are taken from the given configuration.
     *
     * @param config configuration that must contain the job manager's RPC address
     * @param userCodeClassLoader class loader passed to the {@link JobClient} on submission
     * @throws NullPointerException if {@code config} is null
     * @throws CompilerException if the configuration has no job manager address, or a negative port
     */
    public Client(Configuration config, ClassLoader userCodeClassLoader) {
        Preconditions.checkNotNull(config, "Configuration is null");
        this.configuration = config;

        String address = config.getString(JOB_MANAGER_ADDRESS_KEY, null);
        if (address == null) {
            throw new CompilerException("Cannot find address to job manager's RPC service in the global configuration.");
        }

        // Validate the port of the configuration this client actually uses, not the
        // static global configuration, so the check matches what is later submitted with.
        int port = config.getInteger(JOB_MANAGER_PORT_KEY, DEFAULT_JOB_MANAGER_PORT);
        if (port < 0) {
            throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
        }

        this.userCodeClassLoader = userCodeClassLoader;
        this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
    }

    /**
     * Controls whether job status is reported to {@code System.out} during execution.
     *
     * @param print {@code true} to report to {@code System.out}, {@code false} for no console stream
     */
    public void setPrintStatusDuringExecution(boolean print) {
        this.printStatusDuringExecution = print;
    }

    /**
     * @return the configured job manager RPC address, or {@code null} if none is set
     */
    public String getJobManagerAddress() {
        return this.configuration.getString(JOB_MANAGER_ADDRESS_KEY, null);
    }

    /**
     * @return the configured job manager RPC port, or {@code -1} if none is set
     */
    public int getJobManagerPort() {
        return this.configuration.getInteger(JOB_MANAGER_PORT_KEY, -1);
    }

    /**
     * Compiles the program and renders its optimized plan as JSON.
     *
     * @param prog the packaged program
     * @param parallelism parallelism to apply; values {@code <= 0} are not applied
     * @return the JSON dump of the optimized plan
     * @throws CompilerException declared; propagated from plan compilation
     * @throws ProgramInvocationException if the program's plan could not be obtained
     */
    public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        OptimizedPlan optimizedPlan = this.getOptimizedPlan(prog, parallelism);
        return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimizedPlan);
    }

    /**
     * Obtains the optimized plan of a packaged program.
     *
     * <p>For programs using the program entry point, the plan is compiled directly. For
     * interactive-mode programs, the program is invoked against a special environment that
     * captures the optimized plan and then aborts the program by throwing
     * {@link ProgramAbortException}. While the program runs, {@code System.out} and
     * {@code System.err} are redirected into buffers; afterwards the original streams are
     * restored and the captured output is printed to them.
     *
     * <p>Side effect: the current thread's context class loader is set to the program's
     * user-code class loader and is not restored.
     *
     * @param prog the packaged program
     * @param parallelism parallelism to apply; values {@code <= 0} are not applied
     * @return the optimized plan
     * @throws CompilerException declared; propagated from plan compilation
     * @throws ProgramInvocationException if the program fails without producing a plan, or
     *         finishes without ever triggering plan creation
     * @throws RuntimeException if the program uses neither supported mode
     */
    public OptimizedPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());

        if (prog.isUsingProgramEntryPoint()) {
            return this.getOptimizedPlan(prog.getPlanWithJars(), parallelism);
        }

        if (!prog.isUsingInteractiveMode()) {
            throw new RuntimeException("The program uses neither the program entry point nor the interactive mode.");
        }

        OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(this.compiler);
        if (parallelism > 0) {
            env.setDegreeOfParallelism(parallelism);
        }
        env.setAsContext();

        // Capture everything the user program prints while it is being "executed" for its plan.
        PrintStream originalOut = System.out;
        PrintStream originalErr = System.err;
        ByteArrayOutputStream capturedOut = new ByteArrayOutputStream();
        System.setOut(new PrintStream(capturedOut));
        ByteArrayOutputStream capturedErr = new ByteArrayOutputStream();
        System.setErr(new PrintStream(capturedErr));

        try {
            ContextEnvironment.enableLocalExecution(false);
            prog.invokeInteractiveModeForExecution();
        }
        catch (ProgramInvocationException e) {
            throw e;
        }
        catch (Throwable t) {
            // The plan-capturing environment aborts the program by throwing once the plan
            // is available, so a throwable with a captured plan is the expected outcome.
            if (env.optimizerPlan != null) {
                return env.optimizerPlan;
            }
            throw new ProgramInvocationException("The program caused an error: ", t);
        }
        finally {
            ContextEnvironment.enableLocalExecution(true);
            System.setOut(originalOut);
            System.setErr(originalErr);
            System.err.println(capturedErr);
            System.out.println(capturedOut);
        }

        // The program returned normally, i.e. it never asked the environment to execute.
        throw new ProgramInvocationException("The program plan could not be fetched - the program aborted pre-maturely.\nSystem.err: " + capturedErr.toString() + '\n' + "System.out: " + capturedOut.toString() + '\n');
    }

    /**
     * Checks and compiles the given plan.
     *
     * <p>The given parallelism is written into the plan only if it is positive and the plan
     * does not already carry a positive default parallelism.
     *
     * @param p the plan to compile; may be modified (default parallelism)
     * @param parallelism parallelism to apply as the plan's default, if applicable
     * @return the optimized plan
     * @throws CompilerException declared; propagated from the compiler
     */
    public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
        if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
            p.setDefaultParallelism(parallelism);
        }
        new ContextChecker().check(p);
        return this.compiler.compile(p);
    }

    /**
     * Compiles the plan contained in the given job.
     *
     * @param prog the job with its plan and jar files
     * @param parallelism parallelism to apply as the plan's default, if applicable
     * @return the optimized plan
     * @throws CompilerException declared; propagated from the compiler
     * @throws ProgramInvocationException declared; propagated from obtaining the plan
     */
    public OptimizedPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
        return this.getOptimizedPlan(prog.getPlan(), parallelism);
    }

    /**
     * Translates an optimized plan into a job graph and attaches all libraries of the program.
     *
     * @param prog the program whose libraries are attached as jars
     * @param optPlan the optimized plan to translate
     * @return the job graph
     * @throws ProgramInvocationException declared; propagated from the program
     */
    public JobGraph getJobGraph(PackagedProgram prog, OptimizedPlan optPlan) throws ProgramInvocationException {
        return this.getJobGraph(optPlan, prog.getAllLibraries());
    }

    /** Translates the optimized plan into a job graph and adds each jar file by its absolute path. */
    private JobGraph getJobGraph(OptimizedPlan optPlan, List<File> jarFiles) {
        JobGraph job = new NepheleJobGraphGenerator().compileJobGraph(optPlan);
        for (File jar : jarFiles) {
            job.addJar(new Path(jar.getAbsolutePath()));
        }
        return job;
    }

    /**
     * Runs a packaged program.
     *
     * <p>Programs using the program entry point are compiled and submitted. Interactive-mode
     * programs are invoked with this client installed as their execution context: with
     * {@code wait == true} the program is invoked on the calling thread, otherwise on a
     * background thread named "Program Runner" whose failures are only logged.
     *
     * <p>Side effect: the current thread's context class loader is set to the program's
     * user-code class loader and is not restored.
     *
     * @param prog the packaged program
     * @param parallelism parallelism passed on to compilation or to the context environment
     * @param wait whether to block until the program has finished
     * @return the execution result for entry-point programs; {@code null} for interactive-mode programs
     * @throws ProgramInvocationException if invoking or submitting the program fails
     * @throws RuntimeException if the program uses neither supported mode
     */
    public JobExecutionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());

        if (prog.isUsingProgramEntryPoint()) {
            return this.run(prog.getPlanWithJars(), parallelism, wait);
        }

        if (!prog.isUsingInteractiveMode()) {
            throw new RuntimeException("The program uses neither the program entry point nor the interactive mode.");
        }

        ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism);
        ContextEnvironment.enableLocalExecution(false);

        if (wait) {
            try {
                prog.invokeInteractiveModeForExecution();
            }
            finally {
                ContextEnvironment.enableLocalExecution(true);
            }
        } else {
            Thread backgroundRunner = new Thread("Program Runner") {

                @Override
                public void run() {
                    try {
                        prog.invokeInteractiveModeForExecution();
                    }
                    catch (Throwable t) {
                        // Nobody is waiting for this thread, so logging is the only way to surface the failure.
                        LOG.error("The program execution failed.", t);
                    }
                    finally {
                        ContextEnvironment.enableLocalExecution(true);
                    }
                }
            };
            backgroundRunner.start();
        }
        return null;
    }

    /**
     * Submits an already optimized plan together with all libraries of the program.
     *
     * @param prog the program whose libraries are attached as jars
     * @param optimizedPlan the optimized plan to run
     * @param wait whether to block until the job has finished
     * @return the execution result, see {@link #run(JobGraph, boolean)}
     * @throws ProgramInvocationException if the submission or execution fails
     */
    public JobExecutionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
        return this.run(optimizedPlan, prog.getAllLibraries(), wait);
    }

    /**
     * Compiles the plan of the given job and submits it with the job's jar files.
     *
     * @param prog the job with its plan and jar files
     * @param parallelism parallelism to apply as the plan's default, if applicable
     * @param wait whether to block until the job has finished
     * @return the execution result, see {@link #run(JobGraph, boolean)}
     * @throws CompilerException declared; propagated from the compiler
     * @throws ProgramInvocationException if the submission or execution fails
     */
    public JobExecutionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
        OptimizedPlan optimizedPlan = this.getOptimizedPlan(prog, parallelism);
        return this.run(optimizedPlan, prog.getJarFiles(), wait);
    }

    /**
     * Translates the optimized plan into a job graph, attaches the libraries and submits it.
     *
     * @param compiledPlan the optimized plan to run
     * @param libraries jar files to attach to the job
     * @param wait whether to block until the job has finished
     * @return the execution result, see {@link #run(JobGraph, boolean)}
     * @throws ProgramInvocationException if the submission or execution fails
     */
    public JobExecutionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
        return this.run(this.getJobGraph(compiledPlan, libraries), wait);
    }

    /**
     * Submits the job graph to the job manager.
     *
     * @param jobGraph the job graph to submit
     * @param wait if {@code true}, blocks until the job has finished and returns its result;
     *        if {@code false}, only submits the job
     * @return the job's result when waiting; otherwise a placeholder result constructed with
     *         {@code -1} and {@code null}
     * @throws ProgramInvocationException if the job client cannot be created, the submission is
     *         rejected, an I/O error occurs, or the job fails or is canceled; the underlying
     *         exception, where there is one, is attached as the cause
     */
    public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
        JobClient client;
        try {
            client = new JobClient(jobGraph, this.configuration, this.userCodeClassLoader);
        }
        catch (IOException e) {
            throw new ProgramInvocationException("Could not open job manager: " + e.getMessage(), e);
        }

        client.setConsoleStreamForReporting(this.printStatusDuringExecution ? System.out : null);

        try {
            if (wait) {
                return client.submitJobAndWait();
            }
            JobSubmissionResult result = client.submitJob();
            if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
                String description = result.getDescription();
                throw new ProgramInvocationException("The job was not successfully submitted to the nephele job manager" + (description == null ? "." : ": " + description));
            }
        }
        catch (IOException e) {
            throw new ProgramInvocationException("Could not submit job to job manager: " + e.getMessage(), e);
        }
        catch (JobExecutionException jex) {
            if (jex.isJobCanceledByUser()) {
                throw new ProgramInvocationException("The program has been canceled", jex);
            }
            throw new ProgramInvocationException("The program execution failed: " + jex.getMessage(), jex);
        }
        return new JobExecutionResult(-1L, null);
    }

    /**
     * Thrown by the plan-capturing environment to abort a user program once its plan has
     * been compiled. It extends {@link Error} rather than {@link Exception}.
     */
    public static final class ProgramAbortException
    extends Error {
        private static final long serialVersionUID = 1L;
    }

    /**
     * Execution environment that, instead of executing a program, compiles its plan, stores
     * the result in {@link #optimizerPlan} and aborts the program via {@link ProgramAbortException}.
     */
    private static final class OptimizerPlanEnvironment
    extends ExecutionEnvironment {
        private final PactCompiler compiler;

        /** The captured optimized plan; {@code null} until the program triggers plan creation. */
        private OptimizedPlan optimizerPlan;

        private OptimizerPlanEnvironment(PactCompiler compiler) {
            this.compiler = compiler;
        }

        /**
         * Compiles and stores the program plan, then aborts the program.
         *
         * @param jobName name used when creating the program plan
         * @return never returns normally
         * @throws ProgramAbortException always, after the plan has been stored
         */
        public JobExecutionResult execute(String jobName) throws Exception {
            JavaPlan plan = this.createProgramPlan(jobName);
            this.optimizerPlan = this.compiler.compile(plan);
            throw new ProgramAbortException();
        }

        /**
         * Compiles and stores the program plan, then aborts the program.
         *
         * @return never returns normally
         * @throws ProgramAbortException always, after the plan has been stored
         */
        public String getExecutionPlan() throws Exception {
            JavaPlan plan = this.createProgramPlan(null, false);
            this.optimizerPlan = this.compiler.compile(plan);
            throw new ProgramAbortException();
        }

        /** Installs a context environment factory that always hands out this environment. */
        private void setAsContext() {
            ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {

                public ExecutionEnvironment createExecutionEnvironment() {
                    return OptimizerPlanEnvironment.this;
                }
            };
            OptimizerPlanEnvironment.initializeContextEnvironment(factory);
        }
    }
}

