package org.apache.flink.yarn;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.security.token.DelegationTokenConverter;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.flink.yarn.configuration.YarnResourceManagerDriverConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/Utils.class */
public final class Utils {
    public static final String KRB5_FILE_NAME = "krb5.conf";
    public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";

    @VisibleForTesting
    static final String YARN_RM_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler";

    @VisibleForTesting
    static final String YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler";

    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_MB_KEY = "yarn.resource-types.memory-mb.increment-allocation";

    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY = "yarn.scheduler.increment-allocation-mb";
    private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB = 1024;

    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY = "yarn.resource-types.vcores.increment-allocation";

    @VisibleForTesting
    static final String YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY = "yarn.scheduler.increment-allocation-vcores";
    private static final int DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES = 1;
    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
    private static final String[] FLINK_CONFIG_PREFIXES = {"flink.yarn."};

    public static void setupYarnClassPath(Configuration configuration, Map<String, String> map) {
        addToEnvironment(map, ApplicationConstants.Environment.CLASSPATH.name(), map.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
        String[] strings = configuration.getStrings("yarn.application.classpath", YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
        int length = strings.length;
        for (int i = 0; i < length; i += DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES) {
            addToEnvironment(map, ApplicationConstants.Environment.CLASSPATH.name(), strings[i].trim());
        }
    }

    public static void deleteApplicationFiles(String str) {
        if (StringUtils.isNullOrWhitespaceOnly(str)) {
            LOG.debug("No yarn application files directory set. Therefore, cannot clean up the data.");
            return;
        }
        Path path = new Path(str);
        try {
            if (!path.getFileSystem().delete(path, true)) {
                LOG.error("Deleting yarn application files under {} was unsuccessful.", str);
            }
        } catch (IOException e) {
            LOG.error("Could not properly delete yarn application files directory {}.", str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static LocalResource registerLocalResource(org.apache.hadoop.fs.Path path, long j, long j2, LocalResourceVisibility localResourceVisibility, LocalResourceType localResourceType) {
        LocalResource localResource = (LocalResource) Records.newRecord(LocalResource.class);
        localResource.setResource(ConverterUtils.getYarnUrlFromURI(path.toUri()));
        localResource.setSize(j);
        localResource.setTimestamp(j2);
        localResource.setType(localResourceType);
        localResource.setVisibility(localResourceVisibility);
        return localResource;
    }

    private static LocalResource registerLocalResource(FileSystem fileSystem, org.apache.hadoop.fs.Path path, LocalResourceType localResourceType) throws IOException {
        FileStatus fileStatus = fileSystem.getFileStatus(path);
        return registerLocalResource(path, fileStatus.getLen(), fileStatus.getModificationTime(), LocalResourceVisibility.APPLICATION, localResourceType);
    }

    public static void setTokensFor(ContainerLaunchContext containerLaunchContext, List<org.apache.hadoop.fs.Path> list, Configuration configuration, boolean z) throws IOException {
        Credentials credentials = new Credentials();
        if (z) {
            LOG.info("Obtaining delegation tokens for HDFS and HBase.");
            TokenCache.obtainTokensForNamenodes(credentials, (org.apache.hadoop.fs.Path[]) list.toArray(new org.apache.hadoop.fs.Path[0]), configuration);
            obtainTokenForHBase(credentials, configuration);
        } else {
            LOG.info("Delegation token retrieval for HDFS and HBase is disabled.");
        }
        for (Token token : UserGroupInformation.getCurrentUser().getCredentials().getAllTokens()) {
            LOG.info("Adding user token " + token.getService() + " with " + token);
            credentials.addToken(token.getService(), token);
        }
        containerLaunchContext.setTokens(ByteBuffer.wrap(DelegationTokenConverter.serialize(credentials)));
    }

    private static void obtainTokenForHBase(Credentials credentials, Configuration configuration) throws IOException {
        Token token;
        if (UserGroupInformation.isSecurityEnabled()) {
            LOG.info("Attempting to obtain Kerberos security token for HBase");
            try {
                Class.forName("org.apache.hadoop.hbase.HBaseConfiguration").getMethod("addHbaseResources", Configuration.class).invoke(null, configuration);
                LOG.info("HBase security setting: {}", configuration.get("hbase.security.authentication"));
                if (!"kerberos".equals(configuration.get("hbase.security.authentication"))) {
                    LOG.info("HBase has not been configured to use Kerberos.");
                    return;
                }
                try {
                    LOG.info("Obtaining Kerberos security token for HBase");
                    token = (Token) Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil").getMethod("obtainToken", Configuration.class).invoke(null, configuration);
                } catch (NoSuchMethodException e) {
                    Closeable closeable = (Closeable) Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory").getMethod("createConnection", Configuration.class).invoke(null, configuration);
                    token = (Token) Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil").getMethod("obtainToken", Class.forName("org.apache.hadoop.hbase.client.Connection")).invoke(null, closeable);
                    if (null != closeable) {
                        closeable.close();
                    }
                }
                if (token == null) {
                    LOG.error("No Kerberos security token for HBase available");
                } else {
                    credentials.addToken(token.getService(), token);
                    LOG.info("Added HBase Kerberos security token to credentials.");
                }
            } catch (ClassNotFoundException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e2) {
                LOG.info("HBase is not available (not packaged with this application): {} : \"{}\".", e2.getClass().getSimpleName(), e2.getMessage());
            }
        }
    }

    public static void addToEnvironment(Map<String, String> map, String str, String str2) {
        String str3 = map.get(str);
        map.put(StringInterner.weakIntern(str), StringInterner.weakIntern(str3 == null ? str2 : str3 + File.pathSeparator + str2));
    }

    public static String resolveKeytabPath(String str, String str2) {
        String str3 = null;
        if (str2 != null) {
            File file = new File(str2);
            if (file.exists()) {
                str3 = file.getAbsolutePath();
                LOG.info("Resolved keytab path: {}", str3);
            } else {
                File file2 = new File(str, str2);
                if (file2.exists()) {
                    str3 = file2.getAbsolutePath();
                    LOG.info("Resolved keytab path: {}", str3);
                } else {
                    LOG.warn("Could not resolve keytab path with: {}", str2);
                    str3 = null;
                }
            }
        }
        return str3;
    }

    private Utils() {
        throw new RuntimeException();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ContainerLaunchContext createTaskExecutorContext(org.apache.flink.configuration.Configuration configuration, YarnConfiguration yarnConfiguration, YarnResourceManagerDriverConfiguration yarnResourceManagerDriverConfiguration, ContaineredTaskManagerParameters containeredTaskManagerParameters, String str, String str2, Class<?> cls, Logger logger) throws Exception {
        String str3 = (String) Preconditions.checkNotNull(yarnResourceManagerDriverConfiguration.getFlinkDistJar(), "Environment variable %s not set", new Object[]{YarnConfigKeys.FLINK_DIST_JAR});
        String str4 = (String) Preconditions.checkNotNull(yarnResourceManagerDriverConfiguration.getClientShipFiles(), "Environment variable %s not set", new Object[]{YarnConfigKeys.ENV_CLIENT_SHIP_FILES});
        String remoteKeytabPath = yarnResourceManagerDriverConfiguration.getRemoteKeytabPath();
        String localKeytabPath = yarnResourceManagerDriverConfiguration.getLocalKeytabPath();
        String keytabPrinciple = yarnResourceManagerDriverConfiguration.getKeytabPrinciple();
        String yarnSiteXMLPath = yarnResourceManagerDriverConfiguration.getYarnSiteXMLPath();
        String krb5Path = yarnResourceManagerDriverConfiguration.getKrb5Path();
        if (logger.isDebugEnabled()) {
            logger.debug("TM:remote keytab path obtained {}", remoteKeytabPath);
            logger.debug("TM:local keytab path obtained {}", localKeytabPath);
            logger.debug("TM:keytab principal obtained {}", keytabPrinciple);
            logger.debug("TM:remote yarn conf path obtained {}", yarnSiteXMLPath);
            logger.debug("TM:remote krb5 path obtained {}", krb5Path);
        }
        String str5 = (String) Preconditions.checkNotNull(yarnResourceManagerDriverConfiguration.getFlinkClasspath(), "Environment variable %s not set", new Object[]{YarnConfigKeys.ENV_FLINK_CLASSPATH});
        LocalResource localResource = null;
        if (remoteKeytabPath != null) {
            logger.info("TM:Adding keytab {} to the container local resource bucket", remoteKeytabPath);
            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(remoteKeytabPath);
            localResource = registerLocalResource(path.getFileSystem(yarnConfiguration), path, LocalResourceType.FILE);
        }
        LocalResource localResource2 = null;
        if (yarnSiteXMLPath != null) {
            logger.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", yarnSiteXMLPath);
            org.apache.hadoop.fs.Path path2 = new org.apache.hadoop.fs.Path(yarnSiteXMLPath);
            localResource2 = registerLocalResource(path2.getFileSystem(yarnConfiguration), path2, LocalResourceType.FILE);
        }
        LocalResource localResource3 = null;
        boolean z = false;
        if (krb5Path != null) {
            logger.info("Adding remoteKrb5Path {} to the container local resource bucket", krb5Path);
            org.apache.hadoop.fs.Path path3 = new org.apache.hadoop.fs.Path(krb5Path);
            localResource3 = registerLocalResource(path3.getFileSystem(yarnConfiguration), path3, LocalResourceType.FILE);
            z = DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES;
        }
        HashMap hashMap = new HashMap();
        YarnLocalResourceDescriptor fromString = YarnLocalResourceDescriptor.fromString(str3);
        hashMap.put(fromString.getResourceKey(), fromString.toLocalResource());
        if (localResource2 != null) {
            hashMap.put(YARN_SITE_FILE_NAME, localResource2);
        }
        if (localResource3 != null) {
            hashMap.put(KRB5_FILE_NAME, localResource3);
        }
        if (localResource != null) {
            hashMap.put(localKeytabPath, localResource);
        }
        decodeYarnLocalResourceDescriptorListFromString(str4).forEach(yarnLocalResourceDescriptor -> {
        });
        logger.info("Creating container launch context for TaskManagers");
        String taskManagerShellCommand = BootstrapTools.getTaskManagerShellCommand(configuration, containeredTaskManagerParameters, ".", "<LOG_DIR>", new File(str2, YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME).exists(), new File(str2, YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME).exists(), z, cls, str);
        if (logger.isDebugEnabled()) {
            logger.debug("Starting TaskManagers with command: " + taskManagerShellCommand);
        } else {
            logger.info("Starting TaskManagers");
        }
        ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
        containerLaunchContext.setCommands(Collections.singletonList(taskManagerShellCommand));
        containerLaunchContext.setLocalResources(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.putAll(containeredTaskManagerParameters.taskManagerEnv());
        hashMap2.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, str5);
        setupYarnClassPath(yarnConfiguration, hashMap2);
        hashMap2.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
        if (remoteKeytabPath != null && localKeytabPath != null && keytabPrinciple != null) {
            hashMap2.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remoteKeytabPath);
            hashMap2.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localKeytabPath);
            hashMap2.put(YarnConfigKeys.KEYTAB_PRINCIPAL, keytabPrinciple);
        } else if (localKeytabPath != null && keytabPrinciple != null) {
            hashMap2.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localKeytabPath);
            hashMap2.put(YarnConfigKeys.KEYTAB_PRINCIPAL, keytabPrinciple);
        }
        containerLaunchContext.setEnvironment(hashMap2);
        String str6 = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        if (str6 != null) {
            logger.debug("Adding security tokens to TaskExecutor's container launch context.");
            try {
                DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
                Throwable th = null;
                try {
                    Credentials readTokenStorageFile = Credentials.readTokenStorageFile(new File(str6), HadoopUtils.getHadoopConfiguration(configuration));
                    Credentials credentials = new Credentials();
                    for (Token token : readTokenStorageFile.getAllTokens()) {
                        if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
                            credentials.addToken(token.getService(), token);
                        }
                    }
                    credentials.writeTokenStorageToStream(dataOutputBuffer);
                    containerLaunchContext.setTokens(ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength()));
                    if (dataOutputBuffer != null) {
                        if (0 != 0) {
                            try {
                                dataOutputBuffer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataOutputBuffer.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                logger.error("Failed to add Hadoop's security tokens.", th3);
            }
        } else {
            logger.info("Could not set security tokens because Hadoop's token file location is unknown.");
        }
        return containerLaunchContext;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isRemotePath(String str) throws IOException {
        return new Path(str).getFileSystem().isDistributedFS();
    }

    private static List<YarnLocalResourceDescriptor> decodeYarnLocalResourceDescriptorListFromString(String str) throws Exception {
        ArrayList arrayList = new ArrayList();
        String[] split = str.split(YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR);
        int length = split.length;
        for (int i = 0; i < length; i += DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES) {
            String str2 = split[i];
            if (!str2.isEmpty()) {
                arrayList.add(YarnLocalResourceDescriptor.fromString(str2));
            }
        }
        return arrayList;
    }

    @VisibleForTesting
    static Resource getUnitResource(YarnConfiguration yarnConfiguration) {
        int parseInt;
        int parseInt2;
        String str = yarnConfiguration.get("yarn.resourcemanager.scheduler.class");
        if (Objects.equals(str, YARN_RM_FAIR_SCHEDULER_CLAZZ) || Objects.equals(str, YARN_RM_SLS_FAIR_SCHEDULER_CLAZZ)) {
            String str2 = yarnConfiguration.get(YARN_RM_INCREMENT_ALLOCATION_MB_KEY);
            String str3 = yarnConfiguration.get(YARN_RM_INCREMENT_ALLOCATION_VCORES_KEY);
            parseInt = str2 != null ? Integer.parseInt(str2) : yarnConfiguration.getInt(YARN_RM_INCREMENT_ALLOCATION_MB_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB);
            parseInt2 = str3 != null ? Integer.parseInt(str3) : yarnConfiguration.getInt(YARN_RM_INCREMENT_ALLOCATION_VCORES_LEGACY_KEY, DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES);
        } else {
            parseInt = yarnConfiguration.getInt("yarn.scheduler.minimum-allocation-mb", DEFAULT_YARN_RM_INCREMENT_ALLOCATION_MB);
            parseInt2 = yarnConfiguration.getInt("yarn.scheduler.minimum-allocation-vcores", DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES);
        }
        return Resource.newInstance(parseInt, parseInt2);
    }

    public static List<org.apache.hadoop.fs.Path> getQualifiedRemoteProvidedLibDirs(org.apache.flink.configuration.Configuration configuration, YarnConfiguration yarnConfiguration) throws IOException {
        return getRemoteSharedLibPaths(configuration, str -> {
            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(str);
            return path.getFileSystem(yarnConfiguration).makeQualified(path);
        });
    }

    private static List<org.apache.hadoop.fs.Path> getRemoteSharedLibPaths(org.apache.flink.configuration.Configuration configuration, FunctionWithException<String, org.apache.hadoop.fs.Path, IOException> functionWithException) throws IOException {
        List<org.apache.hadoop.fs.Path> decodeListFromConfig = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.PROVIDED_LIB_DIRS, functionWithException);
        for (org.apache.hadoop.fs.Path path : decodeListFromConfig) {
            if (!isRemotePath(path.toString())) {
                throw new IllegalArgumentException("The \"" + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\" should only contain dirs accessible from all worker nodes, while the \"" + path + "\" is local.");
            }
        }
        return decodeListFromConfig;
    }

    public static boolean isUsrLibDirectory(FileSystem fileSystem, org.apache.hadoop.fs.Path path) throws IOException {
        FileStatus fileStatus = fileSystem.getFileStatus(path);
        return fileStatus.isDirectory() && "usrlib".equals(fileStatus.getPath().getName());
    }

    public static Optional<org.apache.hadoop.fs.Path> getQualifiedRemoteProvidedUsrLib(org.apache.flink.configuration.Configuration configuration, YarnConfiguration yarnConfiguration) throws IOException, IllegalArgumentException {
        String string = configuration.getString(YarnConfigOptions.PROVIDED_USRLIB_DIR);
        if (string == null) {
            return Optional.empty();
        }
        org.apache.hadoop.fs.Path makeQualified = FileSystem.get(yarnConfiguration).makeQualified(new org.apache.hadoop.fs.Path(string));
        Preconditions.checkArgument(isRemotePath(makeQualified.toString()), "The \"%s\" must point to a remote dir which is accessible from all worker nodes.", new Object[]{YarnConfigOptions.PROVIDED_USRLIB_DIR.key()});
        Preconditions.checkArgument(isUsrLibDirectory(FileSystem.get(yarnConfiguration), makeQualified), "The \"%s\" should be named with \"%s\".", new Object[]{YarnConfigOptions.PROVIDED_USRLIB_DIR.key(), "usrlib"});
        return Optional.of(makeQualified);
    }

    public static YarnConfiguration getYarnAndHadoopConfiguration(org.apache.flink.configuration.Configuration configuration) {
        YarnConfiguration yarnConfiguration = getYarnConfiguration(configuration);
        yarnConfiguration.addResource(HadoopUtils.getHadoopConfiguration(configuration));
        return yarnConfiguration;
    }

    public static YarnConfiguration getYarnConfiguration(org.apache.flink.configuration.Configuration configuration) {
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        for (String str : configuration.keySet()) {
            String[] strArr = FLINK_CONFIG_PREFIXES;
            int length = strArr.length;
            for (int i = 0; i < length; i += DEFAULT_YARN_RM_INCREMENT_ALLOCATION_VCORES) {
                if (str.startsWith(strArr[i])) {
                    String substring = str.substring("flink.".length());
                    String string = configuration.getString(str, (String) null);
                    yarnConfiguration.set(substring, string);
                    LOG.debug("Adding Flink config entry for {} as {}={} to Yarn config", new Object[]{str, substring, string});
                }
            }
        }
        return yarnConfiguration;
    }
}
