/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kafka;

import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterable;
import scala.collection.JavaConversions;

public class KafkaMetadataUtil {
    public static final String PRODUCER_PROP_PARTITIONER = "partitioner.class";
    public static final String PRODUCER_PROP_BROKERLIST = "metadata.broker.list";
    private static Logger logger = LoggerFactory.getLogger(KafkaMetadataUtil.class);
    private static final String mdClientId = "Kafka_Metadata_Lookup_Client";
    private static final int timeout = 10000;
    private static final int bufferSize = 131072;

    public static List<PartitionMetadata> getPartitionsForTopic(Set<String> brokerList, String topic) {
        TopicMetadata tmd = KafkaMetadataUtil.getTopicMetadata(brokerList, topic);
        if (tmd == null) {
            return null;
        }
        return tmd.partitionsMetadata();
    }

    public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers, final String topic) {
        return Maps.transformEntries((Map)brokers.asMap(), (Maps.EntryTransformer)new Maps.EntryTransformer<String, Collection<String>, List<PartitionMetadata>>(){

            public List<PartitionMetadata> transformEntry(String key, Collection<String> bs) {
                return KafkaMetadataUtil.getPartitionsForTopic(new HashSet<String>(bs), topic);
            }
        });
    }

    public static Set<String> getBrokers(Set<String> zkHost) {
        ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, (ZkSerializer)ZKStringSerializer$.MODULE$);
        HashSet<String> brokerHosts = new HashSet<String>();
        for (Broker b : JavaConversions.asJavaIterable((Iterable)ZkUtils.getAllBrokersInCluster((ZkClient)zkclient))) {
            brokerHosts.add(b.connectionString());
        }
        zkclient.close();
        return brokerHosts;
    }

    public static PartitionMetadata getPartitionForTopic(Set<String> brokerList, String topic, int partition) {
        List<PartitionMetadata> pmds = KafkaMetadataUtil.getPartitionsForTopic(brokerList, topic);
        if (pmds == null) {
            return null;
        }
        for (PartitionMetadata pmd : pmds) {
            if (pmd.partitionId() != partition) continue;
            return pmd;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static TopicMetadata getTopicMetadata(Set<String> brokerSet, String topic) {
        SimpleConsumer mdConsumer = null;
        if (brokerSet == null || brokerSet == null || brokerSet.size() == 0) {
            return null;
        }
        try {
            for (String broker : brokerSet) {
                logger.debug("Try to get Metadata for topic {} broker {}", (Object)topic, (Object)broker);
                try {
                    TopicMetadata item;
                    mdConsumer = new SimpleConsumer(broker.split(":")[0], Integer.parseInt(broker.split(":")[1]), 10000, 131072, mdClientId);
                    ArrayList<String> topics = new ArrayList<String>(1);
                    topics.add(topic);
                    TopicMetadataRequest req = new TopicMetadataRequest(topics);
                    TopicMetadataResponse resp = mdConsumer.send(req);
                    List metaData = resp.topicsMetadata();
                    Iterator i$ = metaData.iterator();
                    if (!i$.hasNext()) continue;
                    TopicMetadata topicMetadata = item = (TopicMetadata)i$.next();
                    return topicMetadata;
                }
                catch (NumberFormatException e) {
                    throw new IllegalArgumentException("Wrong format for broker url, should be \"broker1:port1\"");
                }
                catch (Exception e) {
                    logger.warn("Broker {} is unavailable or in bad state!", (Object)broker);
                }
            }
            TopicMetadata topicMetadata = null;
            return topicMetadata;
        }
        finally {
            if (mdConsumer != null) {
                mdConsumer.close();
            }
        }
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        if (consumer == null) {
            return 0L;
        }
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        HashMap<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0L;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
}

