package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.security.StramDelegationTokenIdentifier;
import com.datatorrent.stram.security.StramDelegationTokenManager;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrSubstitutor;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.DTLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/LaunchContainerRunnable.class */
public class LaunchContainerRunnable implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LaunchContainerRunnable.class);
    private static final String JAVA_REMOTE_DEBUG_OPTS = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n";
    private final Map<String, String> containerEnv = new HashMap();
    private final LogicalPlan dag;
    private final ByteBuffer tokens;
    private final Container container;
    private final NMClientAsync nmClient;
    private final StreamingContainerAgent sca;
    private static final int MB_TO_B = 1048576;

    public LaunchContainerRunnable(Container container, NMClientAsync nMClientAsync, StreamingContainerAgent streamingContainerAgent, ByteBuffer byteBuffer) {
        this.container = container;
        this.nmClient = nMClientAsync;
        this.dag = streamingContainerAgent.getContainer().getPlan().getLogicalPlan();
        this.tokens = byteBuffer;
        this.sca = streamingContainerAgent;
    }

    private void setClasspath(Map<String, String> map) {
        StringBuilder sb = new StringBuilder("./*");
        String str = this.nmClient.getConfig().get("yarn.application.classpath");
        for (String str2 : StringUtils.isBlank(str) ? YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH : str.split(",")) {
            if (!str2.equals("$HADOOP_CLIENT_CONF_DIR")) {
                sb.append(':');
                sb.append(str2.trim());
            }
        }
        sb.append(":.");
        map.put("CLASSPATH", sb.toString());
        LOG.info("CLASSPATH: {}", sb);
    }

    public static void addFileToLocalResources(String str, FileStatus fileStatus, LocalResourceType localResourceType, Map<String, LocalResource> map) {
        map.put(str, LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(fileStatus.getPath()), localResourceType, LocalResourceVisibility.APPLICATION, fileStatus.getLen(), fileStatus.getModificationTime()));
    }

    public static void addFilesToLocalResources(LocalResourceType localResourceType, String str, Map<String, LocalResource> map, FileSystem fileSystem) throws IOException {
        for (String str2 : StringUtils.splitByWholeSeparator(str, ",")) {
            Path path = new Path(str2);
            addFileToLocalResources(path.getName(), fileSystem.getFileStatus(path), localResourceType, map);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Setting up container launch context for containerid={}", this.container.getId());
        ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
        setClasspath(this.containerEnv);
        try {
            this.containerEnv.put("HADOOP_USER_NAME", UserGroupInformation.getLoginUser().getUserName());
        } catch (Exception e) {
            LOG.error("Failed to retrieve principal name", e);
        }
        containerLaunchContext.setEnvironment(this.containerEnv);
        containerLaunchContext.setTokens(this.tokens);
        HashMap hashMap = new HashMap();
        try {
            FileSystem newFileSystemInstance = StramClientUtils.newFileSystemInstance(this.nmClient.getConfig());
            Throwable th = null;
            try {
                try {
                    addFilesToLocalResources(LocalResourceType.FILE, (String) this.dag.getAttributes().get(Context.DAGContext.LIBRARY_JARS), hashMap, newFileSystemInstance);
                    String str = (String) this.dag.getAttributes().get(LogicalPlan.ARCHIVES);
                    if (str != null) {
                        addFilesToLocalResources(LocalResourceType.ARCHIVE, str, hashMap, newFileSystemInstance);
                    }
                    containerLaunchContext.setLocalResources(hashMap);
                    if (newFileSystemInstance != null) {
                        if (0 != 0) {
                            try {
                                newFileSystemInstance.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            newFileSystemInstance.close();
                        }
                    }
                    List<CharSequence> childVMCommand = getChildVMCommand(this.container.getId().toString());
                    StringBuilder sb = new StringBuilder(1024);
                    Iterator<CharSequence> it = childVMCommand.iterator();
                    while (it.hasNext()) {
                        sb.append(it.next()).append(" ");
                    }
                    LOG.info("Launching on node: {} command: {}", this.container.getNodeId(), sb);
                    ArrayList arrayList = new ArrayList();
                    arrayList.add(sb.toString());
                    containerLaunchContext.setCommands(arrayList);
                    this.nmClient.startContainerAsync(this.container, containerLaunchContext);
                } finally {
                }
            } finally {
            }
        } catch (IOException e2) {
            LOG.error("Failed to prepare local resources.", e2);
        }
    }

    public List<CharSequence> getChildVMCommand(String str) {
        ArrayList arrayList = new ArrayList(8);
        if (StringUtils.isBlank(System.getenv(ApplicationConstants.Environment.JAVA_HOME.key()))) {
            arrayList.add("java");
        } else {
            arrayList.add(ApplicationConstants.Environment.JAVA_HOME.$() + "/bin/java");
        }
        String str2 = (String) this.dag.getAttributes().get(LogicalPlan.CONTAINER_JVM_OPTIONS);
        if (str2 != null) {
            HashMap hashMap = new HashMap();
            hashMap.put("applicationId", Integer.toString(this.container.getId().getApplicationAttemptId().getApplicationId().getId()));
            hashMap.put("containerId", Integer.toString(this.container.getId().getId()));
            arrayList.add(new StrSubstitutor(hashMap, "%(", ")").replace(str2));
            if (this.dag.isDebug() && !str2.contains("-agentlib:jdwp=")) {
                arrayList.add(JAVA_REMOTE_DEBUG_OPTS);
            }
        } else if (this.dag.isDebug()) {
            arrayList.add(JAVA_REMOTE_DEBUG_OPTS);
        }
        ArrayList newArrayList = Lists.newArrayList();
        int i = 0;
        for (PTOperator pTOperator : this.sca.getContainer().getOperators()) {
            i += pTOperator.getBufferServerMemory();
            newArrayList.add(pTOperator.getOperatorMeta());
        }
        String parseJvmOpts = parseJvmOpts(((Context.ContainerOptConfigurator) this.dag.getAttributes().get(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR)).getJVMOptions(newArrayList), i * 1048576);
        LOG.info("Jvm opts {} for container {}", parseJvmOpts, this.container.getId());
        arrayList.add(parseJvmOpts);
        Path path = new Path(ApplicationConstants.Environment.PWD.$(), "./tmp");
        arrayList.add(String.format("-D%s=%s", StreamingContainer.PROP_APP_PATH, this.dag.assertAppPath()));
        arrayList.add("-Djava.io.tmpdir=" + path);
        arrayList.add(String.format("-D%scid=%s", "dt.", str));
        arrayList.add("-Dhadoop.root.logger=" + (this.dag.isDebug() ? "DEBUG" : "INFO") + ",RFA");
        arrayList.add("-Dhadoop.log.dir=<LOG_DIR>");
        String property = System.getProperty(DTLoggerFactory.DT_LOGGERS_LEVEL);
        if (property != null) {
            arrayList.add(String.format("-D%s=%s", DTLoggerFactory.DT_LOGGERS_LEVEL, property));
        }
        arrayList.add(StreamingContainer.class.getName());
        arrayList.add("1><LOG_DIR>/stdout");
        arrayList.add("2><LOG_DIR>/stderr");
        StringBuilder sb = new StringBuilder(256);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            sb.append((CharSequence) it.next()).append(" ");
        }
        ArrayList arrayList2 = new ArrayList(1);
        arrayList2.add(sb.toString());
        return arrayList2;
    }

    private String parseJvmOpts(String str, long j) {
        StringBuilder sb = new StringBuilder();
        if (str != null && str.length() > 1) {
            boolean z = false;
            for (String str2 : str.split("(\\s+)")) {
                if (str2.startsWith("-Xmx")) {
                    z = true;
                    sb.append("-Xmx").append(Long.valueOf(str2.substring("-Xmx".length())).longValue() + j).append(" ");
                } else {
                    sb.append(str2).append(" ");
                }
            }
            if (!z) {
                sb.append("-Xmx").append(j);
            }
        }
        return sb.toString();
    }

    public static ByteBuffer getTokens(StramDelegationTokenManager stramDelegationTokenManager, InetSocketAddress inetSocketAddress) throws IOException {
        if (!UserGroupInformation.isSecurityEnabled()) {
            return null;
        }
        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        StramDelegationTokenIdentifier stramDelegationTokenIdentifier = new StramDelegationTokenIdentifier(new Text(loginUser.getUserName()), new Text(""), new Text(""));
        String str = inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort();
        Token token = new Token(stramDelegationTokenIdentifier, stramDelegationTokenManager);
        token.setService(new Text(str));
        return getTokens(loginUser, (Token<StramDelegationTokenIdentifier>) token);
    }

    public static ByteBuffer getTokens(UserGroupInformation userGroupInformation, Token<StramDelegationTokenIdentifier> token) {
        try {
            Collection<Token> allTokens = userGroupInformation.getCredentials().getAllTokens();
            Credentials credentials = new Credentials();
            for (Token token2 : allTokens) {
                if (!token2.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                    credentials.addToken(token2.getService(), token2);
                    LOG.debug("Passing container token {}", token2);
                }
            }
            credentials.addToken(token.getService(), token);
            DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
            credentials.writeTokenStorageToStream(dataOutputBuffer);
            return ByteBuffer.wrap(dataOutputBuffer.getData()).duplicate();
        } catch (IOException e) {
            throw new RuntimeException("Error generating delegation token", e);
        }
    }
}
