/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client;

import akka.actor.ActorSystem;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.InfoOptions;
import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class CliFrontend {
    public static final String ACTION_RUN = "run";
    public static final String ACTION_INFO = "info";
    private static final String ACTION_LIST = "list";
    private static final String ACTION_CANCEL = "cancel";
    private static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR";
    private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
    private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
    public static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
    public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
    public static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
    public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
    public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
    public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
    private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
    private final File configDirectory;
    private final Configuration config;
    private final FiniteDuration askTimeout;
    private final FiniteDuration lookupTimeout;
    private ActorSystem actorSystem;
    private AbstractFlinkYarnCluster yarnCluster;
    static boolean webFrontend = false;
    private FlinkPlan optimizedPlan;
    private PackagedProgram packagedProgram;

    public CliFrontend() throws Exception {
        this(CliFrontend.getConfigurationDirectoryFromEnv());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CliFrontend(String configDir) throws Exception {
        this.configDirectory = new File(configDir);
        LOG.info("Using configuration directory " + this.configDirectory.getAbsolutePath());
        LOG.info("Trying to load configuration file");
        GlobalConfiguration.loadConfiguration((String)this.configDirectory.getAbsolutePath());
        this.config = GlobalConfiguration.getConfiguration();
        String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
        String currentUser = System.getProperty("user.name");
        String propertiesFileLocation = this.config.getString("yarn.properties-file.location", defaultPropertiesFileLocation);
        File propertiesFile = new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
        if (propertiesFile.exists()) {
            String address;
            this.logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
            Properties yarnProperties = new Properties();
            try (FileInputStream is = new FileInputStream(propertiesFile);){
                yarnProperties.load(is);
            }
            catch (IOException e) {
                throw new Exception("Cannot read the YARN properties file", e);
            }
            String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
            if (propParallelism != null) {
                try {
                    int parallelism = Integer.parseInt(propParallelism);
                    this.config.setInteger("parallelism.default", parallelism);
                    this.logAndSysout("YARN properties set default parallelism to " + parallelism);
                }
                catch (NumberFormatException e) {
                    throw new Exception("Error while parsing the YARN properties: Property parallelism is not an integer.");
                }
            }
            if ((address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY)) != null) {
                InetSocketAddress jobManagerAddress;
                try {
                    jobManagerAddress = CliFrontend.parseHostPortAddress(address);
                    this.writeJobManagerAddressToConfig(jobManagerAddress);
                }
                catch (Exception e) {
                    throw new Exception("YARN properties contain an invalid entry for JobManager address.", e);
                }
                this.logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
            }
            String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
            List<Tuple2<String, String>> dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
            for (Tuple2<String, String> dynamicProperty : dynamicProperties) {
                this.config.setString((String)dynamicProperty.f0, (String)dynamicProperty.f1);
            }
        }
        this.askTimeout = AkkaUtils.getTimeout((Configuration)this.config);
        this.lookupTimeout = AkkaUtils.getLookupTimeout((Configuration)this.config);
    }

    public Configuration getConfiguration() {
        Configuration copiedConfiguration = new Configuration();
        copiedConfiguration.addAll(this.config);
        return copiedConfiguration;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    protected int run(String[] args) {
        PackagedProgram program;
        RunOptions options;
        LOG.info("Running 'run' command.");
        try {
            options = CliFrontendParser.parseRunCommand(args);
        }
        catch (CliArgsException e) {
            return this.handleArgException(e);
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForRun();
            return 0;
        }
        if (options.getJarFilePath() == null) {
            return this.handleArgException(new CliArgsException("The program JAR file was not specified."));
        }
        try {
            LOG.info("Building program from JAR file");
            program = this.buildProgram(options);
        }
        catch (FileNotFoundException e) {
            return this.handleArgException(e);
        }
        catch (ProgramInvocationException e) {
            return this.handleError(e);
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
        int exitCode = 1;
        int userParallelism = options.getParallelism();
        LOG.debug("User parallelism is set to {}", (Object)userParallelism);
        Client client = this.getClient(options, program.getMainClassName(), userParallelism);
        client.setPrintStatusDuringExecution(options.getStdoutLogging());
        LOG.debug("Client slots is set to {}", (Object)client.getMaxSlots());
        if (client.getMaxSlots() != -1 && userParallelism == -1) {
            this.logAndSysout("Using the parallelism provided by the remote cluster (" + client.getMaxSlots() + "). " + "To use another parallelism, set it at the ./bin/flink client.");
            userParallelism = client.getMaxSlots();
        }
        if (this.yarnCluster != null && this.yarnCluster.isDetached()) {
            this.logAndSysout("The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:\nyarn application -kill " + this.yarnCluster.getApplicationId() + "\n" + "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
            exitCode = this.executeProgramDetached(program, client, userParallelism);
        } else {
            exitCode = this.executeProgramBlocking(program, client, userParallelism);
        }
        if (this.yarnCluster != null && !this.yarnCluster.isDetached()) {
            List msgs = this.yarnCluster.getNewMessages();
            if (msgs != null && msgs.size() > 1) {
                this.logAndSysout("The following messages were created by the YARN cluster while running the Job:");
                for (String msg : msgs) {
                    this.logAndSysout(msg);
                }
            }
            if (this.yarnCluster.hasFailed()) {
                this.logAndSysout("YARN cluster is in failed state!");
                this.logAndSysout("YARN Diagnostics: " + this.yarnCluster.getDiagnostics());
            }
        }
        int n = exitCode;
        client.shutdown();
        if (this.yarnCluster != null && !this.yarnCluster.isDetached()) {
            this.logAndSysout("Shutting down YARN cluster");
            this.yarnCluster.shutdown(exitCode != 0);
        }
        if (program != null) {
            program.deleteExtractedLibraries();
        }
        return n;
        {
            catch (Throwable throwable) {
                try {
                    try {
                        client.shutdown();
                        throw throwable;
                    }
                    catch (Throwable t) {
                        int n2 = this.handleError(t);
                        if (this.yarnCluster != null && !this.yarnCluster.isDetached()) {
                            this.logAndSysout("Shutting down YARN cluster");
                            this.yarnCluster.shutdown(exitCode != 0);
                        }
                        if (program != null) {
                            program.deleteExtractedLibraries();
                        }
                        return n2;
                    }
                }
                catch (Throwable throwable2) {
                    if (this.yarnCluster != null && !this.yarnCluster.isDetached()) {
                        this.logAndSysout("Shutting down YARN cluster");
                        this.yarnCluster.shutdown(exitCode != 0);
                    }
                    if (program != null) {
                        program.deleteExtractedLibraries();
                    }
                    throw throwable2;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected int info(String[] args) {
        PackagedProgram program;
        InfoOptions options;
        LOG.info("Running 'info' command.");
        try {
            options = CliFrontendParser.parseInfoCommand(args);
        }
        catch (CliArgsException e) {
            return this.handleArgException(e);
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForInfo();
            return 0;
        }
        if (options.getJarFilePath() == null) {
            return this.handleArgException(new CliArgsException("The program JAR file was not specified."));
        }
        try {
            LOG.info("Building program from JAR file");
            program = this.buildProgram(options);
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
        try {
            int parallelism = options.getParallelism();
            LOG.info("Creating program plan dump");
            Optimizer compiler = new Optimizer(new DataStatistics(), (CostEstimator)new DefaultCostEstimator(), this.config);
            FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);
            if (webFrontend) {
                this.optimizedPlan = flinkPlan;
                this.packagedProgram = program;
            } else {
                String jsonPlan = null;
                if (flinkPlan instanceof OptimizedPlan) {
                    jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan)flinkPlan);
                } else if (flinkPlan instanceof StreamingPlan) {
                    jsonPlan = ((StreamingPlan)flinkPlan).getStreamingPlanAsJSON();
                }
                if (jsonPlan != null) {
                    System.out.println("----------------------- Execution Plan -----------------------");
                    System.out.println(jsonPlan);
                    System.out.println("--------------------------------------------------------------");
                } else {
                    System.out.println("JSON plan could not be generated.");
                }
                String description = program.getDescription();
                if (description != null) {
                    System.out.println();
                    System.out.println(description);
                } else {
                    System.out.println();
                    System.out.println("No description provided.");
                }
            }
            int n = 0;
            return n;
        }
        catch (Throwable t) {
            int n = this.handleError(t);
            return n;
        }
        finally {
            program.deleteExtractedLibraries();
        }
    }

    protected int list(String[] args) {
        ListOptions options;
        LOG.info("Running 'list' command.");
        try {
            options = CliFrontendParser.parseListCommand(args);
        }
        catch (CliArgsException e) {
            return this.handleArgException(e);
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForList();
            return 0;
        }
        boolean running = options.getRunning();
        boolean scheduled = options.getScheduled();
        if (!running && !scheduled) {
            running = true;
            scheduled = true;
        }
        try {
            Object result;
            ActorGateway jobManagerGateway = this.getJobManagerGateway(options);
            LOG.info("Connecting to JobManager to retrieve list of jobs");
            Future response = jobManagerGateway.ask(JobManagerMessages.getRequestRunningJobsStatus(), this.askTimeout);
            try {
                result = Await.result((Awaitable)response, (Duration)this.askTimeout);
            }
            catch (Exception e) {
                throw new Exception("Could not retrieve running jobs from the JobManager.", e);
            }
            if (result instanceof JobManagerMessages.RunningJobsStatus) {
                LOG.info("Successfully retrieved list of jobs");
                List jobs = ((JobManagerMessages.RunningJobsStatus)result).getStatusMessages();
                ArrayList<JobStatusMessage> runningJobs = null;
                ArrayList<JobStatusMessage> scheduledJobs = null;
                if (running) {
                    runningJobs = new ArrayList<JobStatusMessage>();
                }
                if (scheduled) {
                    scheduledJobs = new ArrayList<JobStatusMessage>();
                }
                for (JobStatusMessage rj : jobs) {
                    if (running && rj.getJobState().equals((Object)JobStatus.RUNNING)) {
                        runningJobs.add(rj);
                    }
                    if (!scheduled || !rj.getJobState().equals((Object)JobStatus.CREATED)) continue;
                    scheduledJobs.add(rj);
                }
                SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
                Comparator<JobStatusMessage> njec = new Comparator<JobStatusMessage>(){

                    @Override
                    public int compare(JobStatusMessage o1, JobStatusMessage o2) {
                        return (int)(o1.getStartTime() - o2.getStartTime());
                    }
                };
                if (running) {
                    if (runningJobs.size() == 0) {
                        System.out.println("No running jobs.");
                    } else {
                        Collections.sort(runningJobs, njec);
                        System.out.println("------------------------ Running Jobs ------------------------");
                        for (JobStatusMessage rj : runningJobs) {
                            System.out.println(df.format(new Date(rj.getStartTime())) + " : " + rj.getJobId() + " : " + rj.getJobName());
                        }
                        System.out.println("--------------------------------------------------------------");
                    }
                }
                if (scheduled) {
                    if (scheduledJobs.size() == 0) {
                        System.out.println("No scheduled jobs.");
                    } else {
                        Collections.sort(scheduledJobs, njec);
                        System.out.println("----------------------- Scheduled Jobs -----------------------");
                        for (JobStatusMessage rj : scheduledJobs) {
                            System.out.println(df.format(new Date(rj.getStartTime())) + " : " + rj.getJobId() + " : " + rj.getJobName());
                        }
                        System.out.println("--------------------------------------------------------------");
                    }
                }
                return 0;
            }
            throw new Exception("ReqeustRunningJobs requires a response of type RunningJobs. Instead the response is of type " + result.getClass() + ".");
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
    }

    protected int cancel(String[] args) {
        JobID jobId;
        CancelOptions options;
        LOG.info("Running 'cancel' command.");
        try {
            options = CliFrontendParser.parseCancelCommand(args);
        }
        catch (CliArgsException e) {
            return this.handleArgException(e);
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
        if (options.isPrintHelp()) {
            CliFrontendParser.printHelpForCancel();
            return 0;
        }
        String[] cleanedArgs = options.getArgs();
        if (cleanedArgs.length > 0) {
            String jobIdString = cleanedArgs[0];
            try {
                jobId = new JobID(StringUtils.hexStringToByte((String)jobIdString));
            }
            catch (Exception e) {
                LOG.error("Error: The value for the Job ID is not a valid ID.");
                System.out.println("Error: The value for the Job ID is not a valid ID.");
                return 1;
            }
        } else {
            LOG.error("Missing JobID in the command line arguments.");
            System.out.println("Error: Specify a Job ID to cancel a job.");
            return 1;
        }
        try {
            ActorGateway jobManager = this.getJobManagerGateway(options);
            Future response = jobManager.ask((Object)new JobManagerMessages.CancelJob(jobId), this.askTimeout);
            try {
                Await.result((Awaitable)response, (Duration)this.askTimeout);
                return 0;
            }
            catch (Exception e) {
                throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
            }
        }
        catch (Throwable t) {
            return this.handleError(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
        JobSubmissionResult result;
        LOG.info("Starting execution of program");
        try {
            result = client.runDetached(program, parallelism);
        }
        catch (ProgramInvocationException e) {
            int n = this.handleError(e);
            return n;
        }
        finally {
            program.deleteExtractedLibraries();
        }
        if (this.yarnCluster != null && this.yarnCluster.isDetached()) {
            this.yarnCluster.stopAfterJob(result.getJobID());
            this.yarnCluster.disconnect();
        }
        if (!webFrontend) {
            System.out.println("Job has been submitted with JobID " + result.getJobID());
        }
        return 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
        JobSubmissionResult result;
        LOG.info("Starting execution of program");
        try {
            result = client.runBlocking(program, parallelism);
        }
        catch (ProgramInvocationException e) {
            int n = this.handleError(e);
            return n;
        }
        finally {
            program.deleteExtractedLibraries();
        }
        LOG.info("Program execution finished");
        if (result instanceof JobExecutionResult && !webFrontend) {
            JobExecutionResult execResult = (JobExecutionResult)result;
            System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
            System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
            Map accumulatorsResult = execResult.getAllAccumulatorResults();
            if (accumulatorsResult.size() > 0) {
                System.out.println("Accumulator Results: ");
                System.out.println(AccumulatorHelper.getResultsFormated((Map)accumulatorsResult));
            }
        }
        return 0;
    }

    protected PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
        String[] programArgs = options.getProgramArgs();
        String jarFilePath = options.getJarFilePath();
        List<URL> classpaths = options.getClasspaths();
        if (jarFilePath == null) {
            throw new IllegalArgumentException("The program JAR file was not specified.");
        }
        File jarFile = new File(jarFilePath);
        if (!jarFile.exists()) {
            throw new FileNotFoundException("JAR file does not exist: " + jarFile);
        }
        if (!jarFile.isFile()) {
            throw new FileNotFoundException("JAR file is not a file: " + jarFile);
        }
        String entryPointClass = options.getEntryPointClassName();
        return entryPointClass == null ? new PackagedProgram(jarFile, classpaths, programArgs) : new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
    }

    protected void writeJobManagerAddressToConfig(InetSocketAddress address) {
        this.config.setString("jobmanager.rpc.address", address.getHostName());
        this.config.setInteger("jobmanager.rpc.port", address.getPort());
    }

    protected void updateConfig(CommandLineOptions options) {
        if (options.getJobManagerAddress() != null) {
            InetSocketAddress jobManagerAddress = CliFrontend.parseHostPortAddress(options.getJobManagerAddress());
            this.writeJobManagerAddressToConfig(jobManagerAddress);
        }
    }

    protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
        this.updateConfig(options);
        if (this.actorSystem == null) {
            LOG.info("Starting actor system to communicate with JobManager");
            try {
                scala.Tuple2 systemEndpoint = new scala.Tuple2((Object)"", (Object)0);
                this.actorSystem = AkkaUtils.createActorSystem((Configuration)this.config, (Option)new Some((Object)systemEndpoint));
            }
            catch (Exception e) {
                throw new IOException("Could not start actor system to communicate with JobManager", e);
            }
            LOG.info("Actor system successfully started");
        }
        LOG.info("Trying to lookup the JobManager gateway");
        LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService((Configuration)this.config);
        return LeaderRetrievalUtils.retrieveLeaderGateway((LeaderRetrievalService)lrs, (ActorSystem)this.actorSystem, (FiniteDuration)this.lookupTimeout);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected Client getClient(CommandLineOptions options, String programName, int userParallelism) throws Exception {
        int maxSlots = -1;
        if (!YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
            if (options.getJobManagerAddress() == null) return new Client(this.config, maxSlots);
            InetSocketAddress jobManagerAddress = CliFrontend.parseHostPortAddress(options.getJobManagerAddress());
            this.writeJobManagerAddressToConfig(jobManagerAddress);
            return new Client(this.config, maxSlots);
        }
        this.logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
        CommandLine commandLine = options.getCommandLine();
        AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
        if (flinkYarnClient == null) {
            throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
        }
        flinkYarnClient.setName("Flink Application: " + programName);
        int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
        if (yarnTmSlots == -1) {
            yarnTmSlots = 1;
        }
        maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
        if (userParallelism != -1) {
            int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
            this.logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + "but the user requested a parallelism of " + userParallelism + " on YARN. " + "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + "will get " + slotsPerTM + " slots.");
            flinkYarnClient.setTaskManagerSlots(slotsPerTM);
        }
        try {
            this.yarnCluster = flinkYarnClient.deploy();
            this.yarnCluster.connectToCluster();
        }
        catch (Exception e) {
            throw new RuntimeException("Error deploying the YARN cluster", e);
        }
        InetSocketAddress jobManagerAddress = this.yarnCluster.getJobManagerAddress();
        this.writeJobManagerAddressToConfig(jobManagerAddress);
        this.logAndSysout("YARN cluster started");
        this.logAndSysout("JobManager web interface address " + this.yarnCluster.getWebInterfaceURL());
        this.logAndSysout("Waiting until all TaskManagers have connected");
        while (true) {
            FlinkYarnClusterStatus status;
            if ((status = this.yarnCluster.getClusterStatus()) != null) {
                if (status.getNumberOfTaskManagers() >= flinkYarnClient.getTaskManagerCount()) {
                    this.logAndSysout("All TaskManagers are connected");
                    return new Client(this.config, maxSlots);
                }
                this.logAndSysout("TaskManager status (" + status.getNumberOfTaskManagers() + "/" + flinkYarnClient.getTaskManagerCount() + ")");
            } else {
                this.logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
            }
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException e) {
                LOG.error("Interrupted while waiting for TaskManagers");
                System.err.println("Thread is interrupted");
                Thread.currentThread().interrupt();
                continue;
            }
            break;
        }
    }

    private int handleArgException(Exception e) {
        if (webFrontend) {
            throw new RuntimeException(e);
        }
        LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage()));
        System.out.println(e.getMessage());
        System.out.println();
        System.out.println("Use the help option (-h or --help) to get help on the command.");
        return 1;
    }

    private int handleError(Throwable t) {
        if (webFrontend) {
            throw new RuntimeException(t);
        }
        LOG.error("Error while running the command.", t);
        t.printStackTrace();
        System.err.println();
        System.err.println("The exception above occurred while trying to run your command.");
        return 1;
    }

    private void logAndSysout(String message) {
        LOG.info(message);
        if (!webFrontend) {
            System.out.println(message);
        }
    }

    public int parseParameters(String[] args) {
        if (args.length < 1) {
            CliFrontendParser.printHelp();
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        final String[] params = Arrays.copyOfRange(args, 1, args.length);
        if (action.equals(ACTION_RUN)) {
            if (SecurityUtils.isSecurityEnabled()) {
                String message = "Secure Hadoop environment setup detected. Running in secure context.";
                LOG.info(message);
                if (!webFrontend) {
                    System.out.println(message);
                }
                try {
                    return (Integer)SecurityUtils.runSecured((SecurityUtils.FlinkSecuredRunner)new SecurityUtils.FlinkSecuredRunner<Integer>(){

                        public Integer run() throws Exception {
                            return CliFrontend.this.run(params);
                        }
                    });
                }
                catch (Exception e) {
                    this.handleError(e);
                }
            }
            return this.run(params);
        }
        if (action.equals(ACTION_LIST)) {
            return this.list(params);
        }
        if (action.equals(ACTION_INFO)) {
            return this.info(params);
        }
        if (action.equals(ACTION_CANCEL)) {
            return this.cancel(params);
        }
        if (action.equals("-h") || action.equals("--help")) {
            CliFrontendParser.printHelp();
            return 0;
        }
        System.out.printf("\"%s\" is not a valid action.\n", action);
        System.out.println();
        System.out.println("Valid actions are \"run\", \"list\", \"info\", or \"cancel\".");
        System.out.println();
        System.out.println("Specify the help option (-h or --help) to get help on the command.");
        return 1;
    }

    public FlinkPlan getFlinkPlan() {
        return this.optimizedPlan;
    }

    public PackagedProgram getPackagedProgram() {
        return this.packagedProgram;
    }

    public void shutdown() {
        ActorSystem sys = this.actorSystem;
        if (sys != null) {
            this.actorSystem = null;
            sys.shutdown();
        }
    }

    public static void main(String[] args) {
        EnvironmentInformation.logEnvironmentInfo((Logger)LOG, (String)"Command Line Client", (String[])args);
        EnvironmentInformation.checkJavaVersion();
        try {
            CliFrontend cli = new CliFrontend();
            int retCode = cli.parseParameters(args);
            System.exit(retCode);
        }
        catch (Throwable t) {
            LOG.error("Fatal error while running command line interface.", t);
            t.printStackTrace();
            System.exit(31);
        }
    }

    private static InetSocketAddress parseHostPortAddress(String hostAndPort) {
        URI uri;
        try {
            uri = new URI("my://" + hostAndPort);
        }
        catch (URISyntaxException e) {
            throw new RuntimeException("Malformed address " + hostAndPort, e);
        }
        String host = uri.getHost();
        int port = uri.getPort();
        if (host == null || port == -1) {
            throw new RuntimeException("Address is missing hostname or port " + hostAndPort);
        }
        return new InetSocketAddress(host, port);
    }

    public static String getConfigurationDirectoryFromEnv() {
        String location = System.getenv(ENV_CONFIG_DIRECTORY);
        if (location != null) {
            if (new File(location).exists()) {
                return location;
            }
            throw new RuntimeException("The config directory '" + location + "', specified in the '" + ENV_CONFIG_DIRECTORY + "' environment variable, does not exist.");
        }
        if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
            location = CONFIG_DIRECTORY_FALLBACK_1;
        } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
            location = CONFIG_DIRECTORY_FALLBACK_2;
        } else {
            throw new RuntimeException("The configuration directory was not specified. Please specify the directory containing the configuration file through the 'FLINK_CONF_DIR' environment variable.");
        }
        return location;
    }

    public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
        ArrayList<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
        if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
            String[] propertyLines;
            for (String propLine : propertyLines = dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR)) {
                String[] kv;
                if (propLine == null || (kv = propLine.split("=")).length < 2 || kv[0] == null || kv[1] == null || kv[0].length() <= 0) continue;
                ret.add((Tuple2<String, String>)new Tuple2((Object)kv[0], (Object)kv[1]));
            }
        }
        return ret;
    }
}

