/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kafka;

import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaMetadataUtil;
import com.datatorrent.contrib.kafka.KafkaPartition;
import com.google.common.collect.SetMultimap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HighlevelKafkaConsumer
extends KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(HighlevelKafkaConsumer.class);
    private Properties consumerConfig = null;
    private transient Map<String, ConsumerConnector> standardConsumer = null;
    private transient ExecutorService consumerThreadExecutor = null;
    private Map<String, Integer> numStream;

    public HighlevelKafkaConsumer() {
        this.numStream = new HashMap<String, Integer>();
    }

    public HighlevelKafkaConsumer(Properties consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    @Override
    public void create() {
        super.create();
        if (this.standardConsumer == null) {
            this.standardConsumer = new HashMap<String, ConsumerConnector>();
        }
        this.consumerConfig.put("consumer.id", "consumer" + System.currentTimeMillis());
        if (this.initialOffset.equalsIgnoreCase("earliest")) {
            this.consumerConfig.put("auto.offset.reset", "smallest");
        } else {
            this.consumerConfig.put("auto.offset.reset", "largest");
        }
    }

    @Override
    public void start() {
        super.start();
        for (String cluster : this.zookeeperMap.keySet()) {
            Properties config = new Properties();
            config.putAll((Map<?, ?>)this.consumerConfig);
            config.setProperty("zookeeper.connect", (String)this.zookeeperMap.get((Object)cluster).iterator().next());
            this.standardConsumer.put(cluster, Consumer.createJavaConsumerConnector((ConsumerConfig)new ConsumerConfig(config)));
        }
        HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
        if (this.numStream == null || this.numStream.size() == 0) {
            if (this.numStream == null) {
                this.numStream = new HashMap<String, Integer>();
            }
            for (Map.Entry<String, List<PartitionMetadata>> e : KafkaMetadataUtil.getPartitionsForTopic((SetMultimap<String, String>)this.brokers, this.topic).entrySet()) {
                this.numStream.put(e.getKey(), e.getValue().size());
            }
        }
        int totalNumStream = 0;
        Iterator<Object> i$ = this.numStream.values().iterator();
        while (i$.hasNext()) {
            int n = i$.next();
            totalNumStream += n;
        }
        if (totalNumStream <= 0) {
            logger.warn("No more job needed to consume data ");
            return;
        }
        this.consumerThreadExecutor = Executors.newFixedThreadPool(totalNumStream);
        for (final Map.Entry entry : this.numStream.entrySet()) {
            int realNumStream = (Integer)entry.getValue();
            topicCountMap.put(this.topic, new Integer(realNumStream));
            Map consumerMap = this.standardConsumer.get(entry.getKey()).createMessageStreams(topicCountMap);
            for (final KafkaStream stream : (List)consumerMap.get(this.topic)) {
                this.consumerThreadExecutor.submit(new Runnable(){
                    KafkaPartition kp;
                    {
                        this.kp = new KafkaPartition((String)entry.getKey(), HighlevelKafkaConsumer.this.topic, -1);
                    }

                    @Override
                    public void run() {
                        ConsumerIterator itr = stream.iterator();
                        logger.debug("Thread {} starts consuming message...", (Object)Thread.currentThread().getName());
                        while (itr.hasNext() && HighlevelKafkaConsumer.this.isAlive) {
                            MessageAndMetadata mam = itr.next();
                            try {
                                this.kp.setPartitionId(mam.partition());
                                HighlevelKafkaConsumer.this.putMessage(this.kp, new Message((byte[])mam.message()), mam.offset());
                            }
                            catch (InterruptedException e) {
                                logger.error("Message Enqueue has been interrupted", (Throwable)e);
                            }
                        }
                        logger.debug("Thread {} stops consuming message...", (Object)Thread.currentThread().getName());
                    }
                });
            }
        }
    }

    @Override
    public void close() {
        if (this.standardConsumer != null && this.standardConsumer.values() != null) {
            for (ConsumerConnector consumerConnector : this.standardConsumer.values()) {
                consumerConnector.shutdown();
            }
        }
        if (this.consumerThreadExecutor != null) {
            this.consumerThreadExecutor.shutdown();
        }
    }

    public void setConsumerConfig(Properties consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    @Override
    protected void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset) {
        this.numStream = new HashMap<String, Integer>();
        for (KafkaPartition kafkaPartition : partitionIds) {
            if (this.numStream.get(kafkaPartition.getClusterId()) == null) {
                this.numStream.put(kafkaPartition.getClusterId(), 0);
            }
            this.numStream.put(kafkaPartition.getClusterId(), this.numStream.get(kafkaPartition.getClusterId()) + 1);
        }
    }

    @Override
    protected void commitOffset() {
        if (this.standardConsumer != null && this.standardConsumer.values() != null) {
            for (ConsumerConnector consumerConnector : this.standardConsumer.values()) {
                consumerConnector.commitOffsets();
            }
        }
    }

    @Override
    protected Map<KafkaPartition, Long> getCurrentOffsets() {
        throw new UnsupportedOperationException("Offset request is currently not supported for high-level consumer");
    }
}

