/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.InfoOptions;
import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.cli.SavepointOptions;
import org.apache.flink.client.cli.StopOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class CliFrontend {
    /** Logger used by every action of this command-line frontend. */
    private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
    // Action names. The values match the action strings that parseParameters switches on.
    private static final String ACTION_RUN = "run";
    private static final String ACTION_INFO = "info";
    private static final String ACTION_LIST = "list";
    private static final String ACTION_CANCEL = "cancel";
    private static final String ACTION_STOP = "stop";
    private static final String ACTION_SAVEPOINT = "savepoint";
    // NOTE(review): presumably fallback locations probed when no configuration directory is
    // configured; their use is outside the visible part of this file - confirm.
    private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
    private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
    // NOTE(review): presumably the registry of available custom command lines; it is populated
    // and queried outside the visible part of this file - confirm.
    private static final List<CustomCommandLine> customCommandLine = new LinkedList<CustomCommandLine>();
    /** Configuration obtained from GlobalConfiguration in the constructor. */
    private final Configuration config;
    /** Timeout for JobManager requests, derived from the configuration via AkkaUtils.getClientTimeout. */
    private final FiniteDuration clientTimeout;

    /**
     * Creates a frontend that uses the configuration directory returned by
     * {@link #getConfigurationDirectoryFromEnv()}.
     *
     * @throws Exception if the delegated constructor fails
     */
    public CliFrontend() throws Exception {
        this(getConfigurationDirectoryFromEnv());
    }

    public CliFrontend(String configDir) throws Exception {
        File configDirectory = new File(configDir);
        LOG.info("Using configuration directory " + configDirectory.getAbsolutePath());
        LOG.info("Trying to load configuration file");
        GlobalConfiguration.loadConfiguration((String)configDirectory.getAbsolutePath());
        System.setProperty("FLINK_CONF_DIR", configDirectory.getAbsolutePath());
        this.config = GlobalConfiguration.getConfiguration();
        try {
            FileSystem.setDefaultScheme((Configuration)this.config);
        }
        catch (IOException e) {
            throw new Exception("Error while setting the default filesystem scheme from configuration.", e);
        }
        this.clientTimeout = AkkaUtils.getClientTimeout((Configuration)this.config);
    }

    /**
     * Returns a copy of the configuration held by this frontend.
     *
     * @return a new {@code Configuration} to which all entries of the internal one were added
     */
    public Configuration getConfiguration() {
        final Configuration snapshot = new Configuration();
        snapshot.addAll(config);
        return snapshot;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Executes the "run" action: parses the arguments, builds the program from its JAR file,
     * obtains a cluster client and executes the program through it.
     *
     * @param args command-line arguments of the action (without the action name)
     * @return 0 if help was printed, otherwise the result of {@code executeProgram}; 1 when
     *         argument handling or any other step fails
     */
    protected int run(String[] args) {
        LOG.info("Running 'run' command.");

        final RunOptions options;
        try {
            options = CliFrontendParser.parseRunCommand(args);
        } catch (CliArgsException e) {
            return handleArgException(e);
        } catch (Throwable t) {
            return handleError(t);
        }

        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForRun();
            return 0;
        }
        if (options.getJarFilePath() == null) {
            return handleArgException(new CliArgsException("The program JAR file was not specified."));
        }

        final PackagedProgram program;
        try {
            LOG.info("Building program from JAR file");
            program = buildProgram(options);
        } catch (FileNotFoundException e) {
            // A missing JAR is reported as a usage problem, not as an internal error.
            return handleArgException(e);
        } catch (Throwable t) {
            return handleError(t);
        }

        ClusterClient client = null;
        try {
            client = createClient(options, program.getMainClassName());
            client.setPrintStatusDuringExecution(options.getStdoutLogging());
            client.setDetached(options.getDetachedMode());
            LOG.debug("Client slots is set to {}", client.getMaxSlots());
            LOG.debug("Savepoint path is set to {}", options.getSavepointPath());

            int userParallelism = options.getParallelism();
            LOG.debug("User parallelism is set to {}", userParallelism);

            // Fall back to the slot count reported by the client when the user gave no parallelism.
            if (client.getMaxSlots() != -1 && userParallelism == -1) {
                logAndSysout("Using the parallelism provided by the remote cluster (" + client.getMaxSlots() + "). " + "To use another parallelism, set it at the ./bin/flink client.");
                userParallelism = client.getMaxSlots();
            }

            return executeProgram(program, client, userParallelism);
        } catch (Throwable t) {
            return handleError(t);
        } finally {
            if (client != null) {
                client.shutdown();
            }
            if (program != null) {
                program.deleteExtractedLibraries();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Executes the "info" action: builds the program from its JAR file and prints its execution
     * plan as JSON followed by the program description.
     *
     * @param args command-line arguments of the action (without the action name)
     * @return 0 on success or when help was printed, 1 if any step fails
     */
    protected int info(String[] args) {
        LOG.info("Running 'info' command.");

        final InfoOptions options;
        try {
            options = CliFrontendParser.parseInfoCommand(args);
        } catch (CliArgsException e) {
            return handleArgException(e);
        } catch (Throwable t) {
            return handleError(t);
        }

        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForInfo();
            return 0;
        }
        if (options.getJarFilePath() == null) {
            return handleArgException(new CliArgsException("The program JAR file was not specified."));
        }

        final PackagedProgram program;
        try {
            LOG.info("Building program from JAR file");
            program = buildProgram(options);
        } catch (Throwable t) {
            return handleError(t);
        }

        try {
            final int parallelism = options.getParallelism();
            LOG.info("Creating program plan dump");
            final Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
            final FlinkPlan plan = ClusterClient.getOptimizedPlan(compiler, program, parallelism);

            // Only the two known plan kinds can be rendered as JSON.
            final String jsonPlan;
            if (plan instanceof OptimizedPlan) {
                jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) plan);
            } else if (plan instanceof StreamingPlan) {
                jsonPlan = ((StreamingPlan) plan).getStreamingPlanAsJSON();
            } else {
                jsonPlan = null;
            }

            if (jsonPlan == null) {
                System.out.println("JSON plan could not be generated.");
            } else {
                System.out.println("----------------------- Execution Plan -----------------------");
                System.out.println(jsonPlan);
                System.out.println("--------------------------------------------------------------");
            }

            final String description = program.getDescription();
            System.out.println();
            System.out.println(description != null ? description : "No description provided.");
            return 0;
        } catch (Throwable t) {
            return handleError(t);
        } finally {
            program.deleteExtractedLibraries();
        }
    }

    /**
     * Executes the "list" action: asks the JobManager for the status of its jobs and prints the
     * running/restarting jobs and/or the scheduled (created) jobs, each sorted by start time.
     *
     * <p>If neither the "running" nor the "scheduled" option is set, both groups are printed.
     *
     * @param args command-line arguments of the action (without the action name)
     * @return 0 on success or when help was printed, 1 if any step fails
     */
    protected int list(String[] args) {
        ListOptions options;
        LOG.info("Running 'list' command.");
        try {
            options = CliFrontendParser.parseListCommand(args);
        }
        catch (CliArgsException e) {
            return this.handleArgException(e);
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForList();
            return 0;
        }
        boolean running = options.getRunning();
        boolean scheduled = options.getScheduled();
        // No filter given: show everything.
        if (!running && !scheduled) {
            running = true;
            scheduled = true;
        }
        try {
            ActorGateway jobManagerGateway = this.getJobManagerGateway(options);
            LOG.info("Connecting to JobManager to retrieve list of jobs");
            Future<Object> response = jobManagerGateway.ask(JobManagerMessages.getRequestRunningJobsStatus(), this.clientTimeout);
            Object result;
            try {
                result = Await.result(response, this.clientTimeout);
            }
            catch (Exception e) {
                throw new Exception("Could not retrieve running jobs from the JobManager.", e);
            }
            if (!(result instanceof JobManagerMessages.RunningJobsStatus)) {
                throw new Exception("RequestRunningJobs requires a response of type RunningJobs. Instead the response is of type " + result.getClass() + ".");
            }
            LOG.info("Successfully retrieved list of jobs");
            List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus)result).getStatusMessages();

            // Split the reported jobs into the groups that were requested.
            List<JobStatusMessage> runningJobs = new ArrayList<JobStatusMessage>();
            List<JobStatusMessage> scheduledJobs = new ArrayList<JobStatusMessage>();
            for (JobStatusMessage job : jobs) {
                if (running && (job.getJobState().equals(JobStatus.RUNNING) || job.getJobState().equals(JobStatus.RESTARTING))) {
                    runningJobs.add(job);
                }
                if (scheduled && job.getJobState().equals(JobStatus.CREATED)) {
                    scheduledJobs.add(job);
                }
            }

            SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
            Comparator<JobStatusMessage> byStartTime = new Comparator<JobStatusMessage>(){

                @Override
                public int compare(JobStatusMessage o1, JobStatusMessage o2) {
                    // Long.compare avoids the truncation of casting a long difference to int,
                    // which mis-orders jobs whose start times are far apart.
                    return Long.compare(o1.getStartTime(), o2.getStartTime());
                }
            };
            if (running) {
                if (runningJobs.isEmpty()) {
                    System.out.println("No running jobs.");
                } else {
                    Collections.sort(runningJobs, byStartTime);
                    System.out.println("------------------ Running/Restarting Jobs -------------------");
                    for (JobStatusMessage job : runningJobs) {
                        System.out.println(df.format(new Date(job.getStartTime())) + " : " + job.getJobId() + " : " + job.getJobName() + " (" + job.getJobState() + ")");
                    }
                    System.out.println("--------------------------------------------------------------");
                }
            }
            if (scheduled) {
                if (scheduledJobs.isEmpty()) {
                    System.out.println("No scheduled jobs.");
                } else {
                    Collections.sort(scheduledJobs, byStartTime);
                    System.out.println("----------------------- Scheduled Jobs -----------------------");
                    for (JobStatusMessage job : scheduledJobs) {
                        System.out.println(df.format(new Date(job.getStartTime())) + " : " + job.getJobId() + " : " + job.getJobName());
                    }
                    System.out.println("--------------------------------------------------------------");
                }
            }
            return 0;
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
    }

    /**
     * Executes the "stop" action: sends a stop request for the job whose ID is the first
     * remaining argument and waits for the JobManager's answer.
     *
     * @param args command-line arguments of the action (without the action name)
     * @return 0 on success or when help was printed, 1 if arguments are invalid or stopping fails
     */
    protected int stop(String[] args) {
        LOG.info("Running 'stop' command.");

        final StopOptions options;
        try {
            options = CliFrontendParser.parseStopCommand(args);
        } catch (CliArgsException e) {
            return handleArgException(e);
        } catch (Throwable t) {
            return handleError(t);
        }

        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForStop();
            return 0;
        }

        final String[] stopArgs = options.getArgs();
        if (stopArgs.length == 0) {
            return handleArgException(new CliArgsException("Missing JobID"));
        }

        final JobID jobId;
        try {
            jobId = new JobID(StringUtils.hexStringToByte(stopArgs[0]));
        } catch (Exception e) {
            return handleError(e);
        }

        try {
            final ActorGateway jobManager = getJobManagerGateway(options);
            final Future response = jobManager.ask(new JobManagerMessages.StopJob(jobId), clientTimeout);
            final Object answer = Await.result((Awaitable) response, clientTimeout);
            if (answer instanceof JobManagerMessages.StoppingFailure) {
                final JobManagerMessages.StoppingFailure failure = (JobManagerMessages.StoppingFailure) answer;
                throw new Exception("Stopping the job with ID " + jobId + " failed.", failure.cause());
            }
            return 0;
        } catch (Throwable t) {
            return handleError(t);
        }
    }

    /**
     * Executes the "cancel" action: sends a cancel request for the job whose ID is the first
     * remaining argument and waits for the JobManager's answer.
     *
     * @param args command-line arguments of the action (without the action name)
     * @return 0 on success or when help was printed, 1 if the job ID is missing or invalid or
     *         cancellation fails
     */
    protected int cancel(String[] args) {
        LOG.info("Running 'cancel' command.");

        final CancelOptions options;
        try {
            options = CliFrontendParser.parseCancelCommand(args);
        } catch (CliArgsException e) {
            return handleArgException(e);
        } catch (Throwable t) {
            return handleError(t);
        }

        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForCancel();
            return 0;
        }

        final String[] cleanedArgs = options.getArgs();
        if (cleanedArgs.length == 0) {
            LOG.error("Missing JobID in the command line arguments.");
            System.out.println("Error: Specify a Job ID to cancel a job.");
            return 1;
        }

        final JobID jobId;
        try {
            jobId = new JobID(StringUtils.hexStringToByte(cleanedArgs[0]));
        } catch (Exception e) {
            LOG.error("Error: The value for the Job ID is not a valid ID.");
            System.out.println("Error: The value for the Job ID is not a valid ID.");
            return 1;
        }

        try {
            final ActorGateway jobManager = getJobManagerGateway(options);
            final Future response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), clientTimeout);
            final Object answer = Await.result((Awaitable) response, clientTimeout);
            if (answer instanceof JobManagerMessages.CancellationFailure) {
                final JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) answer;
                throw new Exception("Canceling the job with ID " + jobId + " failed.", failure.cause());
            }
            return 0;
        } catch (Throwable t) {
            return handleError(t);
        }
    }

    /**
     * Executes the "savepoint" action. With the dispose option set, the savepoint named in the
     * options is disposed; otherwise a savepoint is triggered for the job whose ID is the first
     * remaining argument.
     *
     * @param args command-line arguments of the action (without the action name)
     * @return 0 on success or when help was printed, 1 on invalid arguments or failure
     */
    protected int savepoint(String[] args) {
        LOG.info("Running 'savepoint' command.");

        final SavepointOptions options;
        try {
            options = CliFrontendParser.parseSavepointCommand(args);
        } catch (CliArgsException e) {
            return handleArgException(e);
        } catch (Throwable t) {
            return handleError(t);
        }

        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForSavepoint();
            return 0;
        }

        // Disposal takes the savepoint path from the options and needs no job ID.
        if (options.isDispose()) {
            return disposeSavepoint(options);
        }

        final String[] cleanedArgs = options.getArgs();
        if (cleanedArgs.length == 0) {
            return handleArgException(new IllegalArgumentException("Error: The value for the Job ID is not a valid ID. Specify a Job ID to trigger a savepoint."));
        }

        final JobID jobId;
        try {
            jobId = new JobID(StringUtils.hexStringToByte(cleanedArgs[0]));
        } catch (Exception e) {
            return handleArgException(new IllegalArgumentException("Error: The value for the Job ID is not a valid ID."));
        }

        return triggerSavepoint(options, jobId);
    }

    /**
     * Asks the JobManager to trigger a savepoint for the given job and waits, without a time
     * limit on the await, for the answer.
     *
     * @param options options used to locate the JobManager
     * @param jobId   ID of the job to take the savepoint of
     * @return 0 if the savepoint completed, 1 if the request failed
     */
    private int triggerSavepoint(SavepointOptions options, JobID jobId) {
        try {
            final ActorGateway jobManager = getJobManagerGateway(options);
            logAndSysout("Triggering savepoint for job " + jobId + ".");

            final Future response = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), new FiniteDuration(1L, TimeUnit.HOURS));

            final Object answer;
            try {
                logAndSysout("Waiting for response...");
                answer = Await.result((Awaitable) response, (Duration) FiniteDuration.Inf());
            } catch (Exception e) {
                throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
            }

            if (answer instanceof JobManagerMessages.TriggerSavepointSuccess) {
                final String path = ((JobManagerMessages.TriggerSavepointSuccess) answer).savepointPath();
                logAndSysout("Savepoint completed. Path: " + path);
                logAndSysout("You can resume your program from this savepoint with the run command.");
                return 0;
            }
            if (answer instanceof JobManagerMessages.TriggerSavepointFailure) {
                // Re-throw the reported cause so that it is handled by the catch below.
                throw ((JobManagerMessages.TriggerSavepointFailure) answer).cause();
            }
            throw new IllegalStateException("Unknown JobManager response of type " + answer.getClass());
        } catch (Throwable t) {
            return handleError(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    private int disposeSavepoint(SavepointOptions options) {
        try {
            savepointPath = options.getSavepointPath();
            if (savepointPath == null) {
                throw new IllegalArgumentException("Missing required argument: savepoint path. Usage: bin/flink savepoint -d <savepoint-path>");
            }
            jarFile = options.getJarFilePath();
            jobManager = this.getJobManagerGateway(options);
            blobKeys = null;
            if (jarFile != null) {
                this.logAndSysout("Disposing savepoint '" + savepointPath + "' with JAR " + jarFile + ".");
                libs = null;
                try {
                    libs = PackagedProgram.extractContainedLibraries(new File(jarFile).toURI().toURL());
                    if (libs.isEmpty()) ** GOTO lbl29
                    libPaths = new ArrayList<Path>(libs.size());
                    for (File f : libs) {
                        libPaths.add(new Path(f.toURI()));
                    }
                    this.logAndSysout("Uploading JAR files.");
                    CliFrontend.LOG.debug("JAR files: " + libPaths);
                    blobKeys = BlobClient.uploadJarFiles((ActorGateway)jobManager, (FiniteDuration)this.clientTimeout, libPaths);
                    CliFrontend.LOG.debug("Blob keys: " + blobKeys.toString());
                }
                finally {
                    if (libs != null) {
                        PackagedProgram.deleteExtractedLibraries(libs);
                    }
                }
            } else {
                this.logAndSysout("Disposing savepoint '" + savepointPath + "'.");
            }
lbl29:
            // 3 sources

            msg = new JobManagerMessages.DisposeSavepoint(savepointPath, Option.apply(blobKeys));
            response = jobManager.ask((Object)msg, this.clientTimeout);
            try {
                this.logAndSysout("Waiting for response...");
                result = Await.result((Awaitable)response, (Duration)this.clientTimeout);
            }
            catch (Exception e) {
                throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
            }
            if (result.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
                this.logAndSysout("Savepoint '" + savepointPath + "' disposed.");
                return 0;
            }
            if (result instanceof JobManagerMessages.DisposeSavepointFailure) {
                failure = (JobManagerMessages.DisposeSavepointFailure)result;
                if (failure.cause() instanceof ClassNotFoundException) {
                    throw new ClassNotFoundException("Savepoint disposal failed, because of a missing class. This is most likely caused by a custom state instance, which cannot be disposed without the user code class loader. Please provide the program jar with which you have created the savepoint via -j <JAR> for disposal.", failure.cause().getCause());
                }
                throw failure.cause();
            }
            throw new IllegalStateException("Unknown JobManager response of type " + result.getClass());
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the program through the given client and reports the outcome on standard output.
     * The program's extracted libraries are deleted once the submission call returns or fails.
     *
     * @param program     program to execute
     * @param client      client used to run the program
     * @param parallelism parallelism handed to the client
     * @return 0 if the program was executed or submitted, 1 if the invocation failed
     */
    protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
        logAndSysout("Starting execution of program");

        final JobSubmissionResult result;
        try {
            result = client.run(program, parallelism);
        } catch (ProgramInvocationException e) {
            return handleError(e);
        } finally {
            program.deleteExtractedLibraries();
        }

        // Without an execution result there is only the job ID to report.
        if (!result.isJobExecutionResult()) {
            logAndSysout("Job has been submitted with JobID " + result.getJobID());
            return 0;
        }

        logAndSysout("Program execution finished");
        final JobExecutionResult execResult = result.getJobExecutionResult();
        System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
        System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");

        final Map<String, Object> accumulators = execResult.getAllAccumulatorResults();
        if (!accumulators.isEmpty()) {
            System.out.println("Accumulator Results: ");
            System.out.println(AccumulatorHelper.getResultsFormated(accumulators));
        }
        return 0;
    }

    /**
     * Creates the {@code PackagedProgram} described by the given options.
     *
     * @param options program options providing JAR path, classpaths, arguments, optional entry
     *                point class and savepoint path
     * @return the packaged program with its savepoint path set
     * @throws IllegalArgumentException if no JAR file path is given
     * @throws FileNotFoundException if the JAR path does not exist or is not a regular file
     * @throws ProgramInvocationException declared for failures while constructing the program
     */
    protected PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
        final String[] programArgs = options.getProgramArgs();
        final String jarFilePath = options.getJarFilePath();
        final List<URL> classpaths = options.getClasspaths();

        if (jarFilePath == null) {
            throw new IllegalArgumentException("The program JAR file was not specified.");
        }

        final File jarFile = new File(jarFilePath);
        if (!jarFile.exists()) {
            throw new FileNotFoundException("JAR file does not exist: " + jarFile);
        }
        if (!jarFile.isFile()) {
            throw new FileNotFoundException("JAR file is not a file: " + jarFile);
        }

        // An explicit entry point class selects the four-argument constructor.
        final String entryPointClass = options.getEntryPointClassName();
        final PackagedProgram program;
        if (entryPointClass != null) {
            program = new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
        } else {
            program = new PackagedProgram(jarFile, classpaths, programArgs);
        }

        program.setSavepointPath(options.getSavepointPath());
        return program;
    }

    /**
     * Retrieves a client for an existing cluster using the custom command line that is active
     * for the given options.
     *
     * @param options options whose command line selects and configures the custom command line
     * @return the client retrieved from the active custom command line
     * @throws IllegalConfigurationException if the cluster cannot be retrieved
     */
    protected ClusterClient retrieveClient(CommandLineOptions options) {
        CustomCommandLine customCLI = this.getActiveCustomCommandLine(options.getCommandLine());
        try {
            // Typed as ClusterClient (not Object) so the value can be returned without a cast.
            ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), this.config);
            this.logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
            return client;
        }
        catch (Exception e) {
            LOG.error("Couldn't retrieve {} cluster.", customCLI.getId(), e);
            throw new IllegalConfigurationException("Couldn't retrieve client for cluster", e);
        }
    }

    /**
     * Retrieves a cluster client for the given options and returns its JobManager gateway.
     *
     * @param options options used to retrieve the cluster client
     * @return the JobManager gateway of the retrieved client
     * @throws Exception if obtaining the gateway fails
     */
    protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
        logAndSysout("Retrieving JobManager.");
        final ClusterClient client = retrieveClient(options);
        return client.getJobManagerGateway();
    }

    /**
     * Obtains a cluster client for running a program: first by retrieving an existing cluster
     * and, if the active custom command line does not support that, by creating a new cluster
     * named after the program.
     *
     * @param options     options whose command line selects and configures the custom command line
     * @param programName name appended to the application name of a newly created cluster
     * @return the retrieved or newly created client
     * @throws IllegalConfigurationException if neither retrieving nor creating a cluster is supported
     * @throws Exception if the custom command line fails otherwise
     */
    protected ClusterClient createClient(CommandLineOptions options, String programName) throws Exception {
        // Typed as ClusterClient (not Object) so the value can be used and returned without casts.
        ClusterClient client;
        CustomCommandLine activeCommandLine = this.getActiveCustomCommandLine(options.getCommandLine());
        try {
            client = activeCommandLine.retrieveCluster(options.getCommandLine(), this.config);
            this.logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
        }
        catch (UnsupportedOperationException e) {
            // Retrieval is not supported by this command line: start a cluster instead.
            try {
                String applicationName = "Flink Application: " + programName;
                client = activeCommandLine.createCluster(applicationName, options.getCommandLine(), this.config);
                this.logAndSysout("Cluster started: " + client.getClusterIdentifier());
            }
            catch (UnsupportedOperationException e2) {
                throw new IllegalConfigurationException("The JobManager address is neither provided at the command-line, nor configured in flink-conf.yaml.");
            }
        }
        InetSocketAddress jobManagerAddress = client.getJobManagerAddress();
        this.logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager.");
        this.logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
        return client;
    }

    /**
     * Reports invalid command-line arguments in the log and on standard output, followed by a
     * hint about the help option.
     *
     * <p>An exception without a message is reported with an empty message in both places, so the
     * console never shows the literal text "null".
     *
     * @param e exception describing the argument problem
     * @return always 1
     */
    private int handleArgException(Exception e) {
        String message = e.getMessage() == null ? "" : e.getMessage();
        LOG.error("Invalid command line arguments. " + message);
        System.out.println(message);
        System.out.println();
        System.out.println("Use the help option (-h or --help) to get help on the command.");
        return 1;
    }

    /**
     * Logs the error and prints it to standard error.
     *
     * <p>If the cause is an {@code InvalidProgramException}, only the cause's message and its
     * stack frames up to and including the first frame of a method named {@code main} are
     * printed; otherwise the full stack trace of {@code t} is printed.
     *
     * @param t error to report
     * @return always 1
     */
    private int handleError(Throwable t) {
        LOG.error("Error while running the command.", t);

        System.err.println();
        System.err.println("------------------------------------------------------------");
        System.err.println(" The program finished with the following exception:");
        System.err.println();

        final Throwable cause = t.getCause();
        if (!(cause instanceof InvalidProgramException)) {
            t.printStackTrace();
            return 1;
        }

        System.err.println(cause.getMessage());
        for (StackTraceElement frame : cause.getStackTrace()) {
            System.err.println("\t" + frame.toString());
            if (frame.getMethodName().equals("main")) {
                break;
            }
        }
        return 1;
    }

    /**
     * Writes the message to the log at INFO level and then prints it to standard output.
     *
     * @param message text to log and print
     */
    private void logAndSysout(String message) {
        LOG.info(message);
        System.out.println(message);
    }

    /**
     * Dispatches to the action named by the first argument, passing it the remaining arguments.
     *
     * <p>The "run" action is executed through {@code SecurityUtils.runSecured} when security is
     * enabled. The help and version options are handled here directly. An unknown action prints
     * the list of valid actions.
     *
     * @param args full command-line arguments; the first element is the action
     * @return the action's return code; 0 for help/version; 1 if no or an invalid action is given
     */
    public int parseParameters(String[] args) {
        if (args.length < 1) {
            CliFrontendParser.printHelp();
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        // Everything after the action name belongs to the action itself.
        final String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run": {
                if (SecurityUtils.isSecurityEnabled()) {
                    String message = "Secure Hadoop environment setup detected. Running in secure context.";
                    LOG.info(message);
                    try {
                        return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>(){

                            @Override
                            public Integer run() throws Exception {
                                return CliFrontend.this.run(params);
                            }
                        });
                    }
                    catch (Exception e) {
                        return this.handleError(e);
                    }
                }
                return this.run(params);
            }
            case "list": {
                return this.list(params);
            }
            case "info": {
                return this.info(params);
            }
            case "cancel": {
                return this.cancel(params);
            }
            case "stop": {
                return this.stop(params);
            }
            case "savepoint": {
                return this.savepoint(params);
            }
            case "-h": 
            case "--help": {
                CliFrontendParser.printHelp();
                return 0;
            }
            case "-v": 
            case "--version": {
                String version = EnvironmentInformation.getVersion();
                String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                System.out.print("Version: " + version);
                System.out.println(!commitID.equals("<unknown>") ? ", Commit ID: " + commitID : "");
                return 0;
            }
        }
        System.out.printf("\"%s\" is not a valid action.\n", action);
        System.out.println();
        // The list includes "savepoint", which the switch above handles as well.
        System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
        System.out.println();
        System.out.println("Specify the version option (-v or --version) to print Flink version.");
        System.out.println();
        System.out.println("Specify the help option (-h or --help) to get help on the command.");
        return 1;
    }

    /**
     * Entry point of the command line client.
     *
     * <p>Logs the environment information, runs {@link #parseParameters(String[])} on a
     * fresh {@code CliFrontend} and terminates the JVM with the resulting return code.
     * Any {@code Throwable} raised on the way is logged, printed to standard error, and
     * turned into exit code {@code 31}.
     *
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
        try {
            final CliFrontend frontend = new CliFrontend();
            final int exitCode = frontend.parseParameters(args);
            System.exit(exitCode);
        }
        catch (Throwable fatal) {
            LOG.error("Fatal error while running command line interface.", fatal);
            fatal.printStackTrace();
            System.exit(31);
        }
    }

    /**
     * Determines the configuration directory.
     *
     * <p>Lookup order: the {@code FLINK_CONF_DIR} environment variable, then the
     * {@code FLINK_CONF_DIR} system property, then {@code CONFIG_DIRECTORY_FALLBACK_1},
     * then {@code CONFIG_DIRECTORY_FALLBACK_2}. An explicitly configured location is
     * never replaced by a fallback: if it does not exist, the lookup fails.
     *
     * @return the path of an existing configuration directory
     * @throws RuntimeException if the explicitly configured directory does not exist, or
     *         if nothing was configured and neither fallback directory exists
     */
    public static String getConfigurationDirectoryFromEnv() {
        final String fromEnvironment = System.getenv("FLINK_CONF_DIR");
        final String configured = fromEnvironment == null
                ? System.getProperty("FLINK_CONF_DIR")
                : fromEnvironment;

        // An explicit setting wins, but it must point to something that exists.
        if (configured != null) {
            if (!new File(configured).exists()) {
                throw new RuntimeException("The config directory '" + configured + "', specified in the '" + "FLINK_CONF_DIR" + "' environment variable, does not exist.");
            }
            return configured;
        }

        // Nothing configured: probe the fallback locations in order.
        if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
            return CONFIG_DIRECTORY_FALLBACK_1;
        }
        if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
            return CONFIG_DIRECTORY_FALLBACK_2;
        }
        throw new RuntimeException("The configuration directory was not specified. Please specify the directory containing the configuration file through the 'FLINK_CONF_DIR' environment variable.");
    }

    /**
     * Stores the host string and port of the given address in the configuration under
     * the keys {@code jobmanager.rpc.address} and {@code jobmanager.rpc.port}.
     *
     * @param config  the configuration to write to
     * @param address the address whose host string and port are stored
     */
    public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
        final String host = address.getHostString();
        final int port = address.getPort();
        config.setString("jobmanager.rpc.address", host);
        config.setInteger("jobmanager.rpc.port", port);
    }

    /**
     * Returns the first registered custom command line that reports itself active for
     * the given parsed command line and this frontend's configuration.
     *
     * @param commandLine the parsed command line to test each candidate against
     * @return the first active custom command line, in registration order
     * @throws IllegalStateException if none of the registered command lines is active
     */
    public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
        for (CustomCommandLine candidate : customCommandLine) {
            if (candidate.isActive(commandLine, this.config)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No command-line ran.");
    }

    /**
     * Exposes the registered custom command lines.
     *
     * @return an unmodifiable view of the registered custom command lines
     */
    public static List<CustomCommandLine> getCustomCommandLineList() {
        final List<CustomCommandLine> readOnlyView = Collections.unmodifiableList(customCommandLine);
        return readOnlyView;
    }

    /**
     * Best-effort registration of a custom command line that is looked up reflectively.
     *
     * <p>The class is loaded by name, instantiated through the public constructor whose
     * parameter types are the exact runtime classes of {@code params}, and appended to
     * the list of registered custom command lines. If the class cannot be loaded,
     * linked, or instantiated, or is not a {@code CustomCommandLine}, a warning is
     * logged and nothing is registered.
     *
     * @param className fully qualified name of the custom command line class
     * @param params    constructor arguments; none of them may be {@code null}
     * @throws NullPointerException if one of {@code params} is {@code null}
     */
    private static void loadCustomCommandLine(String className, Object ... params) {
        try {
            Class<? extends CustomCommandLine> customCliClass = Class.forName(className).asSubclass(CustomCommandLine.class);

            // The constructor is resolved from the runtime classes of the arguments.
            Class<?>[] types = new Class<?>[params.length];
            for (int i = 0; i < params.length; ++i) {
                Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
                types[i] = params[i].getClass();
            }

            Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
            customCommandLine.add(constructor.newInstance(params));
        }
        catch (ReflectiveOperationException | ClassCastException | LinkageError e) {
            // ClassCastException (class is not a CustomCommandLine) and LinkageError
            // (e.g. NoClassDefFoundError for a missing dependency of the class) are
            // handled like the reflective failures. This method is called from the
            // static initializer, where an escaping exception would abort class
            // initialization.
            LOG.warn("Unable to locate custom CLI class {}. Flink is not compiled with support for this class.", className, e);
        }
    }

    static {
        CliFrontend.loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
        customCommandLine.add(new DefaultCLI());
    }
}

