/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.security.StramDelegationTokenIdentifier;
import com.datatorrent.stram.security.StramDelegationTokenManager;
import com.google.common.collect.Lists;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrSubstitutor;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LaunchContainerRunnable
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LaunchContainerRunnable.class);
    private static final String JAVA_REMOTE_DEBUG_OPTS = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n";
    private final Map<String, String> containerEnv = new HashMap<String, String>();
    private final LogicalPlan dag;
    private final ByteBuffer tokens;
    private final Container container;
    private final NMClientAsync nmClient;
    private final StreamingContainerAgent sca;
    private static final int MB_TO_B = 0x100000;

    public LaunchContainerRunnable(Container lcontainer, NMClientAsync nmClient, StreamingContainerAgent sca, ByteBuffer tokens) {
        this.container = lcontainer;
        this.nmClient = nmClient;
        this.dag = sca.getContainer().getPlan().getLogicalPlan();
        this.tokens = tokens;
        this.sca = sca;
    }

    private void setClasspath(Map<String, String> env) {
        StringBuilder classPathEnv = new StringBuilder("./*");
        String classpath = this.nmClient.getConfig().get("yarn.application.classpath");
        for (String c : StringUtils.isBlank((String)classpath) ? YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH : classpath.split(",")) {
            if (c.equals("$HADOOP_CLIENT_CONF_DIR")) continue;
            classPathEnv.append(':');
            classPathEnv.append(c.trim());
        }
        classPathEnv.append(":.");
        env.put("CLASSPATH", classPathEnv.toString());
        LOG.info("CLASSPATH: {}", (Object)classPathEnv);
    }

    public static void addFileToLocalResources(String name, FileStatus fileStatus, LocalResourceType type, Map<String, LocalResource> localResources) {
        LocalResource localResource = LocalResource.newInstance((URL)ConverterUtils.getYarnUrlFromPath((Path)fileStatus.getPath()), (LocalResourceType)type, (LocalResourceVisibility)LocalResourceVisibility.APPLICATION, (long)fileStatus.getLen(), (long)fileStatus.getModificationTime());
        localResources.put(name, localResource);
    }

    public static void addFilesToLocalResources(LocalResourceType type, String commaSeparatedFileNames, Map<String, LocalResource> localResources, FileSystem fs) throws IOException {
        String[] files;
        for (String file : files = StringUtils.splitByWholeSeparator((String)commaSeparatedFileNames, (String)",")) {
            Path dst = new Path(file);
            LaunchContainerRunnable.addFileToLocalResources(dst.getName(), fs.getFileStatus(dst), type, localResources);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        LOG.info("Setting up container launch context for containerid={}", (Object)this.container.getId());
        ContainerLaunchContext ctx = (ContainerLaunchContext)Records.newRecord(ContainerLaunchContext.class);
        this.setClasspath(this.containerEnv);
        try {
            this.containerEnv.put("HADOOP_USER_NAME", UserGroupInformation.getLoginUser().getUserName());
        }
        catch (Exception e) {
            LOG.error("Failed to retrieve principal name", (Throwable)e);
        }
        ctx.setEnvironment(this.containerEnv);
        ctx.setTokens(this.tokens);
        HashMap<String, LocalResource> localResources = new HashMap<String, LocalResource>();
        try (FileSystem fs = StramClientUtils.newFileSystemInstance(this.nmClient.getConfig());){
            LaunchContainerRunnable.addFilesToLocalResources(LocalResourceType.FILE, (String)this.dag.getAttributes().get(LogicalPlan.LIBRARY_JARS), localResources, fs);
            String archives = (String)this.dag.getAttributes().get(LogicalPlan.ARCHIVES);
            if (archives != null) {
                LaunchContainerRunnable.addFilesToLocalResources(LocalResourceType.ARCHIVE, archives, localResources, fs);
            }
            ctx.setLocalResources(localResources);
        }
        catch (IOException e) {
            LOG.error("Failed to prepare local resources.", (Throwable)e);
            return;
        }
        List<CharSequence> vargs = this.getChildVMCommand(this.container.getId().toString());
        StringBuilder command = new StringBuilder(1024);
        for (CharSequence str : vargs) {
            command.append(str).append(" ");
        }
        LOG.info("Launching on node: {} command: {}", (Object)this.container.getNodeId(), (Object)command);
        ArrayList<String> commands = new ArrayList<String>();
        commands.add(command.toString());
        ctx.setCommands(commands);
        this.nmClient.startContainerAsync(this.container, ctx);
    }

    public List<CharSequence> getChildVMCommand(String jvmID) {
        ArrayList<String> vargs = new ArrayList<String>(8);
        if (!StringUtils.isBlank((String)System.getenv(ApplicationConstants.Environment.JAVA_HOME.key()))) {
            vargs.add(ApplicationConstants.Environment.JAVA_HOME.$() + "/bin/java");
        } else {
            vargs.add("java");
        }
        String jvmOpts = (String)this.dag.getAttributes().get(LogicalPlan.CONTAINER_JVM_OPTIONS);
        if (jvmOpts == null) {
            if (this.dag.isDebug()) {
                vargs.add(JAVA_REMOTE_DEBUG_OPTS);
            }
        } else {
            HashMap<String, String> params = new HashMap<String, String>();
            params.put("applicationId", Integer.toString(this.container.getId().getApplicationAttemptId().getApplicationId().getId()));
            params.put("containerId", Integer.toString(this.container.getId().getId()));
            StrSubstitutor sub = new StrSubstitutor(params, "%(", ")");
            vargs.add(sub.replace(jvmOpts));
            if (this.dag.isDebug() && !jvmOpts.contains("-agentlib:jdwp=")) {
                vargs.add(JAVA_REMOTE_DEBUG_OPTS);
            }
        }
        ArrayList operatorMetaList = Lists.newArrayList();
        int bufferServerMemory = 0;
        for (PTOperator operator : this.sca.getContainer().getOperators()) {
            bufferServerMemory += operator.getBufferServerMemory();
            operatorMetaList.add(operator.getOperatorMeta());
        }
        Context.ContainerOptConfigurator containerOptConfigurator = (Context.ContainerOptConfigurator)this.dag.getAttributes().get(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR);
        jvmOpts = containerOptConfigurator.getJVMOptions((List)operatorMetaList);
        jvmOpts = this.parseJvmOpts(jvmOpts, (long)bufferServerMemory * 0x100000L);
        LOG.info("Jvm opts {} for container {}", (Object)jvmOpts, (Object)this.container.getId());
        vargs.add(jvmOpts);
        Path childTmpDir = new Path(ApplicationConstants.Environment.PWD.$(), "./tmp");
        vargs.add(String.format("-D%s=%s", StreamingContainer.PROP_APP_PATH, this.dag.assertAppPath()));
        vargs.add("-Djava.io.tmpdir=" + childTmpDir);
        vargs.add(String.format("-D%scid=%s", "dt.", jvmID));
        vargs.add("-Dhadoop.root.logger=" + (this.dag.isDebug() ? "DEBUG" : "INFO") + ",RFA");
        vargs.add("-Dhadoop.log.dir=<LOG_DIR>");
        String loggersLevel = System.getProperty("dt.loggers.level");
        if (loggersLevel != null) {
            vargs.add(String.format("-D%s=%s", "dt.loggers.level", loggersLevel));
        }
        vargs.add(StreamingContainer.class.getName());
        vargs.add("1><LOG_DIR>/stdout");
        vargs.add("2><LOG_DIR>/stderr");
        StringBuilder mergedCommand = new StringBuilder(256);
        for (CharSequence charSequence : vargs) {
            mergedCommand.append(charSequence).append(" ");
        }
        ArrayList<CharSequence> vargsFinal = new ArrayList<CharSequence>(1);
        vargsFinal.add(mergedCommand.toString());
        return vargsFinal;
    }

    private String parseJvmOpts(String jvmOpts, long memory) {
        String xmx = "-Xmx";
        StringBuilder builder = new StringBuilder();
        if (jvmOpts != null && jvmOpts.length() > 1) {
            String[] splits = jvmOpts.split("(\\s+)");
            boolean foundProperty = false;
            for (String split : splits) {
                if (split.startsWith(xmx)) {
                    foundProperty = true;
                    long heapSize = Long.valueOf(split.substring(xmx.length()));
                    builder.append(xmx).append(heapSize += memory).append(" ");
                    continue;
                }
                builder.append(split).append(" ");
            }
            if (!foundProperty) {
                builder.append(xmx).append(memory);
            }
        }
        return builder.toString();
    }

    public static ByteBuffer getTokens(StramDelegationTokenManager delegationTokenManager, InetSocketAddress heartbeatAddress) throws IOException {
        if (UserGroupInformation.isSecurityEnabled()) {
            UserGroupInformation ugi = UserGroupInformation.getLoginUser();
            StramDelegationTokenIdentifier identifier = new StramDelegationTokenIdentifier(new Text(ugi.getUserName()), new Text(""), new Text(""));
            String service = heartbeatAddress.getAddress().getHostAddress() + ":" + heartbeatAddress.getPort();
            Token stramToken = new Token((TokenIdentifier)identifier, (SecretManager)delegationTokenManager);
            stramToken.setService(new Text(service));
            return LaunchContainerRunnable.getTokens(ugi, (Token<StramDelegationTokenIdentifier>)stramToken);
        }
        return null;
    }

    public static ByteBuffer getTokens(UserGroupInformation ugi, Token<StramDelegationTokenIdentifier> delegationToken) {
        try {
            Collection tokens = ugi.getCredentials().getAllTokens();
            Credentials credentials = new Credentials();
            for (Token token : tokens) {
                if (token.getKind().equals((Object)AMRMTokenIdentifier.KIND_NAME)) continue;
                credentials.addToken(token.getService(), token);
                LOG.debug("Passing container token {}", (Object)token);
            }
            credentials.addToken(delegationToken.getService(), delegationToken);
            DataOutputBuffer dataOutput = new DataOutputBuffer();
            credentials.writeTokenStorageToStream((DataOutputStream)dataOutput);
            byte[] tokenBytes = dataOutput.getData();
            ByteBuffer cTokenBuf = ByteBuffer.wrap(tokenBytes);
            return cTokenBuf.duplicate();
        }
        catch (IOException e) {
            throw new RuntimeException("Error generating delegation token", e);
        }
    }
}

