/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkYarnSessionCli {
    /** Logger used by this command line client. */
    private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
    /** Name of the Flink configuration file looked up inside the configuration directory. */
    private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
    /** Name of the Logback configuration file looked up inside the configuration directory. */
    public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
    /** Name of the Log4j configuration file looked up inside the configuration directory. */
    public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
    // NOTE(review): presumably the polling interval of the interactive client in seconds
    // (the interactive loop waits up to 3000 ms per round) -- confirm the unit.
    private static final int CLIENT_POLLING_INTERVALL = 3;
    // Command line option definitions. They are instance fields (not constants) because
    // their short and long names are built in the constructor from caller-supplied prefixes.
    private final Option QUERY;
    private final Option QUEUE;
    private final Option SHIP_PATH;
    private final Option FLINK_JAR;
    private final Option JM_MEMORY;
    private final Option TM_MEMORY;
    private final Option CONTAINER;
    private final Option SLOTS;
    private final Option DETACHED;
    private final Option STREAMING;
    private final Option NAME;
    private final Option DYNAMIC_PROPERTIES;
    /** Handle to the deployed cluster; assigned in {@code run} and {@code null} before deployment. */
    private AbstractFlinkYarnCluster yarnCluster = null;
    /** Set to {@code true} by {@code createFlinkYarnClient} when the detached option is present. */
    private boolean detachedMode = false;

    /**
     * Builds the set of YARN session command line options.
     *
     * <p>Every option name is prepended with the given prefix so that the same option set
     * can be registered under different names by different callers.
     *
     * @param shortPrefix prefix prepended to every short option name (may be empty)
     * @param longPrefix prefix prepended to every long option name (may be empty)
     */
    public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
        QUERY = new Option(shortPrefix + "q", longPrefix + "query", false,
                "Display available YARN resources (memory, cores)");
        QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true,
                "Specify YARN queue.");
        SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true,
                "Ship files in the specified directory (t for transfer)");
        FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true,
                "Path to Flink jar file");
        JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true,
                "Memory for JobManager Container [in MB]");
        TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true,
                "Memory per TaskManager Container [in MB]");
        CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true,
                "Number of YARN container to allocate (=Number of Task Managers)");
        SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true,
                "Number of slots per TaskManager");
        // The dynamic properties option has a short name only.
        DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true,
                "Dynamic properties");
        DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false,
                "Start detached");
        STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false,
                "Start Flink in streaming mode");
        NAME = new Option(shortPrefix + "nm", longPrefix + "name", true,
                "Set a custom name for the application on YARN");
    }

    /**
     * Creates a Flink YARN client and configures it from the parsed command line.
     *
     * <p>The method sets the TaskManager count, the local Flink jar, the Flink configuration,
     * the files to ship (including a logging configuration found in the configuration
     * directory), and the optional queue, memory, slot, dynamic property, detached,
     * streaming and name settings.
     *
     * @param cmd parsed command line holding the YARN session options
     * @return the configured client, or {@code null} if the client class cannot be loaded,
     *         the required container option is missing, or the Flink configuration file
     *         cannot be found
     * @throws NumberFormatException if a numeric option value is not a valid integer
     */
    public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
        AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
        if (flinkYarnClient == null) {
            return null;
        }

        // The number of containers is the only mandatory option.
        if (!cmd.hasOption(this.CONTAINER.getOpt())) {
            LOG.error("Missing required argument " + this.CONTAINER.getOpt());
            this.printUsage();
            return null;
        }
        flinkYarnClient.setTaskManagerCount(Integer.parseInt(cmd.getOptionValue(this.CONTAINER.getOpt())));

        // Flink jar: either given explicitly or derived from the location of the client class.
        Path localJarPath;
        if (cmd.hasOption(this.FLINK_JAR.getOpt())) {
            String userPath = cmd.getOptionValue(this.FLINK_JAR.getOpt());
            if (!userPath.startsWith("file://")) {
                userPath = "file://" + userPath;
            }
            localJarPath = new Path(userPath);
        } else {
            LOG.info("No path for the flink jar passed. Using the location of " + flinkYarnClient.getClass() + " to locate the jar");
            localJarPath = new Path("file://" + flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
        }
        flinkYarnClient.setLocalJarPath(localJarPath);

        // Flink configuration.
        String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
        GlobalConfiguration.loadConfiguration(confDirPath);
        Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
        flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration);
        flinkYarnClient.setConfigurationDirectory(confDirPath);
        File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
        if (!confFile.exists()) {
            LOG.error("Unable to locate configuration file in " + confFile);
            return null;
        }
        flinkYarnClient.setConfigurationFilePath(new Path(confFile.getAbsolutePath()));

        // Files to ship to the cluster.
        List<File> shipFiles = new ArrayList<File>();
        if (cmd.hasOption(this.SHIP_PATH.getOpt())) {
            File shipDir = new File(cmd.getOptionValue(this.SHIP_PATH.getOpt()));
            if (shipDir.isDirectory()) {
                File[] entries = shipDir.listFiles(new FilenameFilter() {

                    @Override
                    public boolean accept(File dir, String name) {
                        return !name.equals(".") && !name.equals("..");
                    }
                });
                // listFiles returns null when the directory cannot be read.
                if (entries != null) {
                    shipFiles.addAll(Arrays.asList(entries));
                } else {
                    LOG.warn("Unable to list the files of the ship directory " + shipDir + ". Ignoring it.");
                }
            } else {
                LOG.warn("Ship directory is not a directory. Ignoring it.");
            }
        }

        // Logging configuration. The directory and the file name must be joined with
        // File.separator; File.pathSeparator (':' or ';') produces a path that never exists.
        if (confDirPath.length() > 0) {
            File logback = new File(confDirPath + File.separator + CONFIG_FILE_LOGBACK_NAME);
            if (logback.exists()) {
                shipFiles.add(logback);
                flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI()));
            }
            File log4j = new File(confDirPath + File.separator + CONFIG_FILE_LOG4J_NAME);
            if (log4j.exists()) {
                shipFiles.add(log4j);
                if (flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
                    LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and " + "Logback configuration files. Please delete or rename one of them.");
                }
                // When both files exist, the Log4j configuration takes precedence.
                flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI()));
            }
        }
        flinkYarnClient.setShipFiles(shipFiles);

        // Optional settings.
        if (cmd.hasOption(this.QUEUE.getOpt())) {
            flinkYarnClient.setQueue(cmd.getOptionValue(this.QUEUE.getOpt()));
        }
        if (cmd.hasOption(this.JM_MEMORY.getOpt())) {
            flinkYarnClient.setJobManagerMemory(Integer.parseInt(cmd.getOptionValue(this.JM_MEMORY.getOpt())));
        }
        if (cmd.hasOption(this.TM_MEMORY.getOpt())) {
            flinkYarnClient.setTaskManagerMemory(Integer.parseInt(cmd.getOptionValue(this.TM_MEMORY.getOpt())));
        }
        if (cmd.hasOption(this.SLOTS.getOpt())) {
            flinkYarnClient.setTaskManagerSlots(Integer.parseInt(cmd.getOptionValue(this.SLOTS.getOpt())));
        }

        // Dynamic properties are handed over as a single "@@"-separated string;
        // the array stays null when the option is absent.
        String[] dynamicProperties = null;
        if (cmd.hasOption(this.DYNAMIC_PROPERTIES.getOpt())) {
            dynamicProperties = cmd.getOptionValues(this.DYNAMIC_PROPERTIES.getOpt());
        }
        String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, "@@");
        flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);

        if (cmd.hasOption(this.DETACHED.getOpt())) {
            this.detachedMode = true;
            flinkYarnClient.setDetachedMode(this.detachedMode);
        }
        if (cmd.hasOption(this.STREAMING.getOpt())) {
            flinkYarnClient.setStreamingMode(true);
        }
        if (cmd.hasOption(this.NAME.getOpt())) {
            flinkYarnClient.setName(cmd.getOptionValue(this.NAME.getOpt()));
        }
        return flinkYarnClient;
    }

    /**
     * Prints the usage help to standard output: first the required option, then the
     * optional ones.
     */
    private void printUsage() {
        System.out.println("Usage:");

        HelpFormatter helpFormatter = new HelpFormatter();
        helpFormatter.setWidth(200);
        helpFormatter.setLeftPadding(5);

        // Required section.
        Options required = new Options();
        required.addOption(CONTAINER);
        helpFormatter.setSyntaxPrefix("   Required");
        helpFormatter.printHelp(" ", required);

        // Optional section.
        Option[] optionalOptions = {
            JM_MEMORY, TM_MEMORY, QUERY, QUEUE, SLOTS, DYNAMIC_PROPERTIES, DETACHED, STREAMING, NAME
        };
        Options optional = new Options();
        for (Option option : optionalOptions) {
            optional.addOption(option);
        }
        helpFormatter.setSyntaxPrefix("   Optional");
        helpFormatter.printHelp(" ", optional);
    }

    /**
     * Loads and instantiates the Flink YARN client by class name.
     *
     * <p>The class is looked up reflectively by its fully qualified name rather than
     * referenced directly.
     *
     * @return a new client instance, or {@code null} if the client class is not on the
     *         classpath (an explanation is printed to standard error in that case)
     */
    public static AbstractFlinkYarnClient getFlinkYarnClient() {
        AbstractFlinkYarnClient yarnClient;
        try {
            // asSubclass yields Class<? extends AbstractFlinkYarnClient>; the wildcard is required.
            Class<? extends AbstractFlinkYarnClient> yarnClientClass = Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
            yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class);
        }
        catch (ClassNotFoundException e) {
            System.err.println("Unable to locate the Flink YARN Client. Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: " + e.getMessage());
            e.printStackTrace(System.err);
            return null;
        }
        return yarnClient;
    }

    /**
     * Writes the given properties to a file and makes that file readable for all users.
     *
     * @param properties properties to store
     * @param propertiesFile target file; it is created or overwritten
     * @throws RuntimeException if the file cannot be written
     */
    private static void writeYarnProperties(Properties properties, File propertiesFile) {
        // try-with-resources closes the stream even when store() fails.
        try (OutputStream out = new FileOutputStream(propertiesFile)) {
            properties.store(out, "Generated YARN properties file");
        }
        catch (IOException e) {
            throw new RuntimeException("Error writing the properties file", e);
        }
        // Readable for everybody, not only the owner.
        propertiesFile.setReadable(true, false);
    }

    /**
     * Runs the interactive command line loop for a deployed YARN session.
     *
     * <p>Each round reports changes in the number of connected TaskManagers, prints new
     * cluster messages, shuts the cluster down if it has failed, and then waits up to the
     * polling interval for a command on standard input. The loop ends when the user enters
     * {@code stop} or {@code quit}, when standard input reaches its end, or when the
     * cluster has been stopped. Any exception ends the loop and is logged as a warning.
     *
     * @param yarnCluster the cluster to monitor
     */
    public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
        final String help = "Available commands:\nhelp - show these commands\nstop - stop the YARN session";
        int numTaskmanagers = 0;
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            do {
                // Report a changed number of TaskManagers.
                FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
                if (status != null && numTaskmanagers != status.getNumberOfTaskManagers()) {
                    System.err.println("Number of connected TaskManagers changed to " + status.getNumberOfTaskManagers() + ". Slots available: " + status.getNumberOfSlots());
                    numTaskmanagers = status.getNumberOfTaskManagers();
                }

                // Forward new messages from the cluster.
                List<String> messages = yarnCluster.getNewMessages();
                if (messages != null && messages.size() > 0) {
                    System.err.println("New messages from the YARN cluster: ");
                    for (String msg : messages) {
                        System.err.println(msg);
                    }
                }

                if (yarnCluster.hasFailed()) {
                    System.err.println("The YARN cluster has failed");
                    yarnCluster.shutdown(true);
                }

                // Poll standard input without blocking, so the status checks above run
                // again once the polling interval has elapsed.
                long startTime = System.currentTimeMillis();
                while (System.currentTimeMillis() - startTime < CLIENT_POLLING_INTERVALL * 1000L && !in.ready()) {
                    Thread.sleep(200L);
                }

                if (in.ready()) {
                    String command = in.readLine();
                    if (command == null) {
                        // End of input: nothing more can be read, treat it like a stop request.
                        return;
                    }
                    switch (command) {
                        case "quit":
                        case "stop":
                            return;
                        case "help":
                            System.err.println(help);
                            break;
                        default:
                            System.err.println("Unknown command '" + command + "'. Showing help: \n" + help);
                            break;
                    }
                }
            } while (!yarnCluster.hasBeenStopped());
            LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
        }
        catch (Exception e) {
            LOG.warn("Exception while running the interactive command line interface", e);
        }
    }

    /**
     * Program entry point: runs the YARN session client without option prefixes and
     * terminates the JVM with the resulting exit code.
     *
     * @param args command line arguments
     */
    public static void main(String[] args) {
        FlinkYarnSessionCli sessionCli = new FlinkYarnSessionCli("", "");
        int exitCode = sessionCli.run(args);
        System.exit(exitCode);
    }

    /**
     * Registers all YARN session options on the given option set.
     *
     * @param options option set the YARN session options are added to
     */
    public void getYARNSessionCLIOptions(Options options) {
        Option[] sessionOptions = {
            FLINK_JAR, JM_MEMORY, TM_MEMORY, CONTAINER, QUEUE, QUERY,
            SHIP_PATH, SLOTS, DYNAMIC_PROPERTIES, DETACHED, STREAMING, NAME
        };
        for (Option sessionOption : sessionOptions) {
            options.addOption(sessionOption);
        }
    }

    /**
     * Parses the arguments and either prints the cluster description (query option) or
     * deploys a YARN session.
     *
     * <p>After a successful deployment the JobManager address, and if known the parallelism
     * and the dynamic properties, are written to a per-user properties file. Unless started
     * in detached mode, the interactive command line then runs until the session ends,
     * after which the cluster is shut down and the properties file is deleted.
     *
     * @param args command line arguments
     * @return {@code 0} on success, {@code 1} if parsing, querying, client creation or
     *         deployment failed
     */
    public int run(String[] args) {
        Options options = new Options();
        this.getYARNSessionCLIOptions(options);

        CommandLine cmd;
        try {
            cmd = new PosixParser().parse(options, args);
        }
        catch (Exception e) {
            System.out.println(e.getMessage());
            this.printUsage();
            return 1;
        }

        // Query mode: only print the cluster description.
        if (cmd.hasOption(this.QUERY.getOpt())) {
            AbstractFlinkYarnClient queryClient = FlinkYarnSessionCli.getFlinkYarnClient();
            if (queryClient == null) {
                // getFlinkYarnClient has already reported why the client is unavailable.
                return 1;
            }
            String description;
            try {
                description = queryClient.getClusterDescription();
            }
            catch (Exception e) {
                System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage());
                e.printStackTrace(System.err);
                return 1;
            }
            System.out.println(description);
            return 0;
        }

        AbstractFlinkYarnClient flinkYarnClient = this.createFlinkYarnClient(cmd);
        if (flinkYarnClient == null) {
            System.err.println("Error while starting the YARN Client. Please check log output!");
            return 1;
        }

        try {
            this.yarnCluster = flinkYarnClient.deploy();
            // A detached client does not connect to the cluster.
            if (!flinkYarnClient.isDetached()) {
                this.yarnCluster.connectToCluster();
            }
        }
        catch (Exception e) {
            System.err.println("Error while deploying YARN cluster: " + e.getMessage());
            e.printStackTrace(System.err);
            return 1;
        }

        String jobManagerAddress = this.yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + this.yarnCluster.getJobManagerAddress().getPort();
        System.out.println("Flink JobManager is now running on " + jobManagerAddress);
        System.out.println("JobManager Web Interface: " + this.yarnCluster.getWebInterfaceURL());

        // Write the session details to a per-user properties file; its directory defaults
        // to the JVM temp directory unless configured otherwise.
        String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
        String currentUser = System.getProperty("user.name");
        String propertiesFileLocation = this.yarnCluster.getFlinkConfiguration().getString("yarn.properties-file.location", defaultPropertiesFileLocation);
        File yarnPropertiesFile = new File(propertiesFileLocation + File.separator + ".yarn-properties-" + currentUser);

        Properties yarnProps = new Properties();
        yarnProps.setProperty("jobManager", jobManagerAddress);
        // -1 means that no slot count has been set on the client.
        if (flinkYarnClient.getTaskManagerSlots() != -1) {
            String parallelism = Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
            yarnProps.setProperty("parallelism", parallelism);
        }
        if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
            yarnProps.setProperty("dynamicPropertiesString", flinkYarnClient.getDynamicPropertiesEncoded());
        }
        FlinkYarnSessionCli.writeYarnProperties(yarnProps, yarnPropertiesFile);

        if (this.detachedMode) {
            LOG.info("The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:\nyarn application -kill " + this.yarnCluster.getApplicationId() + "\n" + "Please also note that the temporary files of the YARN session in {} will not be removed.", (Object)flinkYarnClient.getSessionFilesDir());
        } else {
            FlinkYarnSessionCli.runInteractiveCli(this.yarnCluster);
            if (!this.yarnCluster.hasBeenStopped()) {
                LOG.info("Command Line Interface requested session shutdown");
                this.yarnCluster.shutdown(false);
            }
            try {
                // delete() signals failure through its return value, not an exception.
                if (!yarnPropertiesFile.delete() && yarnPropertiesFile.exists()) {
                    LOG.warn("Unable to delete the JobManager address file " + yarnPropertiesFile);
                }
            }
            catch (Exception e) {
                LOG.warn("Exception while deleting the JobManager address file", e);
            }
        }
        return 0;
    }

    /**
     * Shuts down the YARN cluster started by this client, if one has been deployed.
     */
    public void stop() {
        if (yarnCluster == null) {
            // Nothing was deployed, so there is nothing to shut down.
            return;
        }
        LOG.info("Command line interface is shutting down the yarnCluster");
        yarnCluster.shutdown(false);
    }
}

