package com.datatorrent.contrib.kafka;

import com.datatorrent.contrib.hbase.HBaseFieldValueGenerator;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaMetadataUtil.class */
public class KafkaMetadataUtil {
    public static final String PRODUCER_PROP_PARTITIONER = "partitioner.class";
    public static final String PRODUCER_PROP_BROKERLIST = "metadata.broker.list";
    private static Logger logger = LoggerFactory.getLogger(KafkaMetadataUtil.class);
    private static final String mdClientId = "Kafka_Metadata_Lookup_Client";
    private static final int timeout = 10000;
    private static final int bufferSize = 131072;

    public static List<PartitionMetadata> getPartitionsForTopic(Set<String> set, String str) {
        TopicMetadata topicMetadata = getTopicMetadata(set, str);
        if (topicMetadata == null) {
            return null;
        }
        return topicMetadata.partitionsMetadata();
    }

    public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> setMultimap, final String str) {
        return Maps.transformEntries(setMultimap.asMap(), new Maps.EntryTransformer<String, Collection<String>, List<PartitionMetadata>>() { // from class: com.datatorrent.contrib.kafka.KafkaMetadataUtil.1
            public List<PartitionMetadata> transformEntry(String str2, Collection<String> collection) {
                return KafkaMetadataUtil.getPartitionsForTopic(new HashSet(collection), str);
            }
        });
    }

    public static Set<String> getBrokers(Set<String> set) {
        ZkClient zkClient = new ZkClient(set.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$);
        HashSet hashSet = new HashSet();
        Iterator it = JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkClient)).iterator();
        while (it.hasNext()) {
            hashSet.add(((Broker) it.next()).connectionString());
        }
        zkClient.close();
        return hashSet;
    }

    public static PartitionMetadata getPartitionForTopic(Set<String> set, String str, int i) {
        List<PartitionMetadata> partitionsForTopic = getPartitionsForTopic(set, str);
        if (partitionsForTopic == null) {
            return null;
        }
        for (PartitionMetadata partitionMetadata : partitionsForTopic) {
            if (partitionMetadata.partitionId() == i) {
                return partitionMetadata;
            }
        }
        return null;
    }

    public static TopicMetadata getTopicMetadata(Set<String> set, String str) {
        Iterator it;
        SimpleConsumer simpleConsumer = null;
        if (set == null || set == null || set.size() == 0) {
            return null;
        }
        try {
            for (String str2 : set) {
                logger.debug("Try to get Metadata for topic {} broker {}", str, str2);
                try {
                    simpleConsumer = new SimpleConsumer(str2.split(HBaseFieldValueGenerator.COLON)[0], Integer.parseInt(str2.split(HBaseFieldValueGenerator.COLON)[1]), timeout, bufferSize, mdClientId);
                    ArrayList arrayList = new ArrayList(1);
                    arrayList.add(str);
                    it = simpleConsumer.send(new TopicMetadataRequest(arrayList)).topicsMetadata().iterator();
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException("Wrong format for broker url, should be \"broker1:port1\"");
                } catch (Exception e2) {
                    logger.warn("Broker {} is unavailable or in bad state!", str2);
                }
                if (it.hasNext()) {
                    TopicMetadata topicMetadata = (TopicMetadata) it.next();
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                    return topicMetadata;
                }
            }
            return null;
        } finally {
            if (simpleConsumer != null) {
                simpleConsumer.close();
            }
        }
    }

    public static long getLastOffset(SimpleConsumer simpleConsumer, String str, int i, long j, String str2) {
        if (simpleConsumer == null) {
            return 0L;
        }
        TopicAndPartition topicAndPartition = new TopicAndPartition(str, i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
        OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new OffsetRequest(hashMap, kafka.api.OffsetRequest.CurrentVersion(), str2));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(str, i)[0];
        }
        logger.error("Error fetching data Offset Data the Broker. Reason: " + ((int) offsetsBefore.errorCode(str, i)));
        return 0L;
    }
}
