package org.apache.flink.runtime.security.token.hadoop;

import java.io.IOException;
import java.time.Clock;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.security.token.DelegationTokenProvider;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.class */
public class HadoopFSDelegationTokenProvider implements DelegationTokenProvider {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class);
    private Configuration flinkConfiguration;
    private org.apache.hadoop.conf.Configuration hadoopConfiguration;
    private KerberosLoginProvider kerberosLoginProvider;
    private Optional<Long> tokenRenewalInterval;

    @Override // org.apache.flink.core.security.token.DelegationTokenProvider
    public String serviceName() {
        return "hadoopfs";
    }

    @Override // org.apache.flink.core.security.token.DelegationTokenProvider
    public void init(Configuration configuration) throws Exception {
        this.flinkConfiguration = configuration;
        try {
            this.hadoopConfiguration = HadoopUtils.getHadoopConfiguration(configuration);
            this.kerberosLoginProvider = new KerberosLoginProvider(configuration);
        } catch (NoClassDefFoundError e) {
            LOG.info("Hadoop FS is not available (not packaged with this application): {} : \"{}\".", e.getClass().getSimpleName(), e.getMessage());
        }
    }

    @Override // org.apache.flink.core.security.token.DelegationTokenProvider
    public boolean delegationTokensRequired() throws Exception {
        if (this.hadoopConfiguration != null) {
            return HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser()) && this.kerberosLoginProvider.isLoginPossible(false);
        }
        LOG.debug("Hadoop FS is not available (not packaged with this application), hence no tokens will be acquired.");
        return false;
    }

    @Override // org.apache.flink.core.security.token.DelegationTokenProvider
    public DelegationTokenProvider.ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
        return (DelegationTokenProvider.ObtainedDelegationTokens) this.kerberosLoginProvider.doLoginAndReturnUGI().doAs(() -> {
            Credentials credentials = new Credentials();
            Clock systemDefaultZone = Clock.systemDefaultZone();
            Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess();
            obtainDelegationTokens(getRenewer(), fileSystemsToAccess, credentials);
            if (this.tokenRenewalInterval == null) {
                this.tokenRenewalInterval = getTokenRenewalInterval(systemDefaultZone, fileSystemsToAccess);
            }
            return new DelegationTokenProvider.ObtainedDelegationTokens(HadoopDelegationTokenConverter.serialize(credentials), this.tokenRenewalInterval.flatMap(l -> {
                return getTokenRenewalDate(systemDefaultZone, credentials, l.longValue());
            }));
        });
    }

    @VisibleForTesting
    @Nullable
    String getRenewer() {
        return this.flinkConfiguration.getString(String.format("security.kerberos.token.provider.%s.renewer", serviceName()), (String) null);
    }

    private Set<FileSystem> getFileSystemsToAccess() throws IOException {
        HashSet hashSet = new HashSet();
        FileSystem fileSystem = FileSystem.get(this.hadoopConfiguration);
        LOG.debug("Adding Hadoop default filesystem to file systems to access {}", fileSystem);
        hashSet.add(fileSystem);
        LOG.debug("Hadoop default filesystem added to file systems to access successfully");
        ConfigUtils.decodeListFromConfig(this.flinkConfiguration, SecurityOptions.KERBEROS_HADOOP_FILESYSTEMS_TO_ACCESS, Path::new).forEach(path -> {
            try {
                LOG.debug("Adding path's filesystem to file systems to access {}", path);
                hashSet.add(path.getFileSystem(this.hadoopConfiguration));
                LOG.debug("Path's filesystem added to file systems to access successfully");
            } catch (IOException e) {
                LOG.error("Failed to get filesystem for {}", path, e);
                throw new FlinkRuntimeException(e);
            }
        });
        if (this.flinkConfiguration.getString(DeploymentOptions.TARGET).toLowerCase().contains("yarn")) {
            LOG.debug("Running on YARN, trying to add staging directory to file systems to access");
            String string = this.flinkConfiguration.getString("yarn.staging-directory", "");
            if (StringUtils.isBlank(string)) {
                LOG.debug("Staging directory is not set or empty so not added to file systems to access");
            } else {
                LOG.debug("Adding staging directory to file systems to access {}", string);
                hashSet.add(new Path(string).getFileSystem(this.hadoopConfiguration));
                LOG.debug("Staging directory added to file systems to access successfully");
            }
        }
        return hashSet;
    }

    protected void obtainDelegationTokens(@Nullable String str, Set<FileSystem> set, Credentials credentials) {
        set.forEach(fileSystem -> {
            try {
                LOG.debug("Obtaining delegation token for {} with renewer {}", fileSystem, str);
                fileSystem.addDelegationTokens(str, credentials);
                LOG.debug("Delegation obtained successfully");
            } catch (Exception e) {
                LOG.error("Failed to obtain delegation token for {}", fileSystem, e);
                throw new FlinkRuntimeException(e);
            }
        });
    }

    @VisibleForTesting
    Optional<Long> getTokenRenewalInterval(Clock clock, Set<FileSystem> set) throws IOException {
        String userName = UserGroupInformation.getCurrentUser().getUserName();
        Credentials credentials = new Credentials();
        obtainDelegationTokens(userName, set, credentials);
        Optional<Long> min = credentials.getAllTokens().stream().filter(token -> {
            try {
                return token.decodeIdentifier() instanceof AbstractDelegationTokenIdentifier;
            } catch (IOException e) {
                throw new FlinkRuntimeException(e);
            }
        }).map(token2 -> {
            try {
                long renew = token2.renew(this.hadoopConfiguration);
                AbstractDelegationTokenIdentifier abstractDelegationTokenIdentifier = (AbstractDelegationTokenIdentifier) token2.decodeIdentifier();
                String text = token2.getKind().toString();
                long issueDate = renew - getIssueDate(clock, text, abstractDelegationTokenIdentifier);
                LOG.debug("Renewal interval is {} for token {}", Long.valueOf(issueDate), text);
                return Long.valueOf(issueDate);
            } catch (Exception e) {
                throw new FlinkRuntimeException(e);
            }
        }).min((v0, v1) -> {
            return Long.compare(v0, v1);
        });
        LOG.debug("Global renewal interval is {}", min);
        return min;
    }

    @VisibleForTesting
    Optional<Long> getTokenRenewalDate(Clock clock, Credentials credentials, long j) {
        if (j < 0) {
            LOG.debug("Negative renewal interval so no renewal date is calculated");
            return Optional.empty();
        }
        Optional<Long> min = credentials.getAllTokens().stream().filter(token -> {
            try {
                return token.decodeIdentifier() instanceof AbstractDelegationTokenIdentifier;
            } catch (IOException e) {
                throw new FlinkRuntimeException(e);
            }
        }).map(token2 -> {
            try {
                AbstractDelegationTokenIdentifier abstractDelegationTokenIdentifier = (AbstractDelegationTokenIdentifier) token2.decodeIdentifier();
                String text = token2.getKind().toString();
                long issueDate = getIssueDate(clock, text, abstractDelegationTokenIdentifier) + j;
                LOG.debug("Renewal date is {} for token {}", Long.valueOf(issueDate), text);
                return Long.valueOf(issueDate);
            } catch (Exception e) {
                throw new FlinkRuntimeException(e);
            }
        }).min((v0, v1) -> {
            return Long.compare(v0, v1);
        });
        LOG.debug("Global renewal date is {}", min);
        return min;
    }

    @VisibleForTesting
    long getIssueDate(Clock clock, String str, AbstractDelegationTokenIdentifier abstractDelegationTokenIdentifier) {
        long millis = clock.millis();
        long issueDate = abstractDelegationTokenIdentifier.getIssueDate();
        if (issueDate > millis) {
            LOG.warn("Token {} has set up issue date later than current time. (provided: {} / current timestamp: {}) Please make sure clocks are in sync between machines. If the issue is not a clock mismatch, consult token implementor to check whether issue date is valid.", new Object[]{str, Long.valueOf(issueDate), Long.valueOf(millis)});
            return issueDate;
        }
        if (issueDate > 0) {
            return issueDate;
        }
        LOG.warn("Token {} has not set up issue date properly. (provided: {}) Using current timestamp ({}) as issue date instead. Consult token implementor to fix the behavior.", new Object[]{str, Long.valueOf(issueDate), Long.valueOf(millis)});
        return millis;
    }
}
