package org.apache.atlas.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.atlas.ApplicationProperties;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/utils/KafkaUtils.class */
/**
 * Thin wrapper around a Kafka {@link AdminClient} configured from the {@code atlas.kafka.*}
 * subset of an Atlas {@link Configuration}.
 *
 * <p>Besides topic administration helpers, the class offers
 * {@link #setKafkaJAASProperties(Configuration, Properties)}, which builds the Kafka
 * {@code sasl.jaas.config} value from the {@code atlas.jaas.*} properties.
 *
 * <p>Instances own their {@link AdminClient}; call {@link #close()} (or use try-with-resources)
 * to release it.
 */
public class KafkaUtils implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
    private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
    private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
    private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
    private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
    private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
    private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
    private static final String JAAS_PRINCIPAL_PROP = "principal";
    private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
    private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
    private static final String IMPORT_INTERNAL_TOPICS = "atlas.kafka.bridge.enable.internal.topics.import";
    public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
    public static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";

    /** Kafka client properties: the {@code atlas.kafka} subset plus any generated JAAS entry. */
    protected final Properties kafkaConfiguration;
    /** Admin client created from {@link #kafkaConfiguration}; released by {@link #close()}. */
    protected final AdminClient adminClient;
    /** Whether {@link #listAllTopics()} asks Kafka to include internal topics. */
    protected final boolean importInternalTopics;

    /**
     * Builds the Kafka client properties from {@code configuration} and creates the admin client.
     *
     * @param configuration Atlas configuration holding the {@code atlas.kafka.*} and
     *                      {@code atlas.jaas.*} properties
     */
    public KafkaUtils(Configuration configuration) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> KafkaUtils() ");
        }
        this.kafkaConfiguration = ApplicationProperties.getSubsetAsProperties(configuration, ATLAS_KAFKA_PROPERTY_PREFIX);
        setKafkaJAASProperties(configuration, this.kafkaConfiguration);
        // Read every plain configuration value before creating the AdminClient: if one of these
        // reads throws, no client has been created yet and nothing is left unclosed.
        this.importInternalTopics = configuration.getBoolean(IMPORT_INTERNAL_TOPICS, false);
        this.adminClient = AdminClient.create(this.kafkaConfiguration);
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== KafkaUtils() ");
        }
    }

    /**
     * Creates the given topics and blocks until every creation request has completed.
     *
     * @param list names of the topics to create
     * @param i    number of partitions for each topic
     * @param i2   replication factor for each topic (narrowed to {@code short})
     * @throws ExecutionException   if any topic creation fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void createTopics(List<String> list, int i, int i2) throws TopicExistsException, ExecutionException, InterruptedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> createTopics() ");
        }
        List<NewTopic> newTopics = list.stream()
                .map(topicName -> new NewTopic(topicName, i, (short) i2))
                .collect(Collectors.toList());
        // Wait on each per-topic future so that a failure surfaces to the caller.
        for (KafkaFuture<Void> creation : this.adminClient.createTopics(newTopics).values().values()) {
            creation.get();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== createTopics() ");
        }
    }

    /**
     * Lists topic names known to the cluster; internal topics are requested only when
     * {@link #importInternalTopics} is {@code true}.
     *
     * @return a new mutable list of topic names
     * @throws ExecutionException   if the listing request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public List<String> listAllTopics() throws ExecutionException, InterruptedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> KafkaUtils.listAllTopics() ");
        }
        ListTopicsOptions options = new ListTopicsOptions().listInternal(this.importInternalTopics);
        List<String> topicNames = new ArrayList<>(this.adminClient.listTopics(options).names().get());
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== KafkaUtils.listAllTopics() ");
        }
        return topicNames;
    }

    /**
     * Returns the number of partitions of a topic.
     *
     * @param str topic name
     * @return the partition count, or {@code null} when no partition list was obtained
     * @throws ExecutionException   if describing the topic fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Integer getPartitionCount(String str) throws ExecutionException, InterruptedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> KafkaUtils.getPartitionCount({})", str);
        }
        List<TopicPartitionInfo> partitions = getPartitionList(str);
        Integer partitionCount = (partitions != null) ? Integer.valueOf(partitions.size()) : null;
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", str, partitionCount);
        }
        return partitionCount;
    }

    /**
     * Returns the replication factor of a topic, computed as the largest replica count across
     * its partitions.
     *
     * @param str topic name
     * @return the replication factor, or {@code null} when no partition list was obtained or the
     *         list is empty
     * @throws ExecutionException   if describing the topic fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Integer getReplicationFactor(String str) throws ExecutionException, InterruptedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> KafkaUtils.getReplicationFactor({})", str);
        }
        Integer replicationFactor = null;
        List<TopicPartitionInfo> partitions = getPartitionList(str);
        // An empty list has no maximum; report "unknown" (null) instead of letting
        // OptionalInt.getAsInt() throw NoSuchElementException.
        if (partitions != null && !partitions.isEmpty()) {
            replicationFactor = Integer.valueOf(partitions.stream()
                    .mapToInt(partition -> partition.replicas().size())
                    .max()
                    .getAsInt());
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== KafkaUtils.getReplicationFactor returning for topic {} with replicationFactor {}", str, replicationFactor);
        }
        return replicationFactor;
    }

    /**
     * Describes a single topic and returns its partitions.
     *
     * @param str topic name
     * @return the partition list, or {@code null} when the describe call yielded no result
     */
    private List<TopicPartitionInfo> getPartitionList(String str) throws ExecutionException, InterruptedException {
        List<TopicPartitionInfo> partitions = null;
        DescribeTopicsResult description = this.adminClient.describeTopics(Collections.singleton(str));
        if (description != null) {
            // Only one topic is requested, so at most one future is expected here.
            for (KafkaFuture<TopicDescription> pending : description.values().values()) {
                partitions = pending.get().partitions();
            }
        }
        return partitions;
    }

    /** Closes the underlying {@link AdminClient}, if one was created. */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> KafkaUtils.close()");
        }
        if (this.adminClient != null) {
            this.adminClient.close();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== KafkaUtils.close()");
        }
    }

    /**
     * Populates {@code sasl.jaas.config} in {@code properties} from the {@code atlas.jaas.*}
     * entries of {@code configuration}.
     *
     * <p>Nothing is written when {@code properties} already contains {@code sasl.jaas.config},
     * when there are no {@code atlas.jaas.*} entries, or when the selected client has no
     * {@code loginModuleName}.
     *
     * @param configuration Atlas configuration to read the JAAS settings from
     * @param properties    Kafka client properties to update in place
     */
    public static void setKafkaJAASProperties(Configuration configuration, Properties properties) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> KafkaUtils.setKafkaJAASProperties()");
        }
        if (properties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
            LOG.debug("JAAS config is already set, returning");
        } else {
            Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
            if (jaasConfig != null && !jaasConfig.isEmpty()) {
                String saslJaasConfig = buildSaslJaasConfig(configuration, jaasConfig);
                if (saslJaasConfig != null) {
                    properties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, saslJaasConfig);
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("<== KafkaUtils.setKafkaJAASProperties()");
        }
    }

    /**
     * Assembles the {@code sasl.jaas.config} value for the selected JAAS client.
     *
     * @return the JAAS configuration line, or {@code null} when the login module name is missing
     */
    private static String buildSaslJaasConfig(Configuration configuration, Properties jaasConfig) {
        String clientName = resolveJaasClientName(configuration);
        String clientPrefix = clientName + ".";

        String loginModuleNameKey = clientPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
        String loginModuleName = jaasConfig.getProperty(loginModuleNameKey);
        if (loginModuleName == null) {
            LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", clientName, loginModuleNameKey, clientName);
            return null;
        }

        String controlFlagKey = clientPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
        String controlFlag = jaasConfig.getProperty(controlFlagKey);
        if (StringUtils.isEmpty(controlFlag)) {
            // Log the value that was actually configured before replacing it with the default.
            LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", controlFlagKey, controlFlag, JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS);
            controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
        }

        String optionPrefix = clientPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
        String principalKey = optionPrefix + JAAS_PRINCIPAL_PROP;
        StringBuilder options = new StringBuilder();
        for (String key : jaasConfig.stringPropertyNames()) {
            if (!key.startsWith(optionPrefix)) {
                continue;
            }
            String rawValue = jaasConfig.getProperty(key);
            if (rawValue == null) {
                continue;
            }
            String value = rawValue.trim();
            if (key.equalsIgnoreCase(principalKey)) {
                try {
                    value = SecurityUtil.getServerPrincipal(value, (String) null);
                } catch (IOException e) {
                    // Best effort: keep the configured principal, but record why resolution failed.
                    LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", value, e);
                }
            }
            options.append(String.format(" %s=%s", key.substring(optionPrefix.length()), surroundWithQuotes(value)));
        }
        return String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, options.toString());
    }

    /**
     * Chooses the JAAS client section: the ticket-based client when the login is ticket based,
     * not keytab based, and a ticket-based section is configured; otherwise the default client.
     */
    private static String resolveJaasClientName(Configuration configuration) {
        if (isLoginKeytabBased() || !isLoginTicketBased()) {
            return JAAS_DEFAULT_CLIENT_NAME;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Checking if ticketBased-KafkaClient is set");
        }
        Configuration ticketBasedConfig = configuration.subset(JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME);
        if (ticketBasedConfig == null || ticketBasedConfig.isEmpty()) {
            LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, JAAS_DEFAULT_CLIENT_NAME);
            return JAAS_DEFAULT_CLIENT_NAME;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
        }
        return JAAS_TICKET_BASED_CLIENT_NAME;
    }

    /**
     * Reports whether the current Hadoop login is keytab based.
     *
     * @return the value from {@link UserGroupInformation#isLoginKeytabBased()}, or {@code false}
     *         when that call fails (the failure is logged)
     */
    static boolean isLoginKeytabBased() {
        try {
            return UserGroupInformation.isLoginKeytabBased();
        } catch (Exception e) {
            LOG.warn("Error in determining keytab for KafkaClient-JAAS config", e);
            return false;
        }
    }

    /**
     * Reports whether the current Hadoop login is ticket based.
     *
     * @return the value from {@link UserGroupInformation#isLoginTicketBased()}, or {@code false}
     *         when that call fails (the failure is logged)
     */
    public static boolean isLoginTicketBased() {
        try {
            return UserGroupInformation.isLoginTicketBased();
        } catch (Exception e) {
            LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", e);
            return false;
        }
    }

    /**
     * Wraps a non-empty value in double quotes, escaping embedded double quotes with a backslash.
     * {@code null} and empty strings are returned unchanged.
     *
     * <p>NOTE(review): backslashes in the value are not escaped; confirm whether the consumer of
     * the JAAS string treats them as escape characters.
     *
     * @param str value to quote
     * @return the quoted value, or {@code str} itself when it is {@code null} or empty
     */
    static String surroundWithQuotes(String str) {
        if (StringUtils.isEmpty(str)) {
            return str;
        }
        return String.format("\"%s\"", str.replace("\"", "\\\""));
    }
}
