/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kafka;

import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaMetadataUtil;
import com.datatorrent.contrib.kafka.KafkaPartition;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.NotNull;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleKafkaConsumer
extends KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
    /** Consumer workers keyed by the broker they fetch from; entries are added and pruned by the metadata monitor task. */
    private final transient Map<Broker, ConsumerThread> simpleConsumerThreads = new HashMap<Broker, ConsumerThread>();
    /** Pool that runs the per-broker {@link ConsumerThread} workers; created in {@link #start()}. */
    private transient ExecutorService kafkaConsumerExecutor;
    /** Single-threaded scheduler for the metadata monitor; stays null when {@link #start()} does not schedule the monitor. */
    private transient ScheduledExecutorService metadataRefreshExecutor;
    /** Number of consecutive metadata lookups that returned nothing; reset to 0 after a successful refresh. */
    private final transient AtomicInteger retryCounter = new AtomicInteger(0);
    /** Timeout value handed to every {@code SimpleConsumer} created by the workers. */
    private int timeout = 10000;
    /** Buffer size handed to every {@code SimpleConsumer} and also used as the per-partition fetch size. */
    private int bufferSize = 0x100000;
    /** Prefix of the client name sent to the brokers (see {@link #getClientName(String)}). */
    @NotNull
    private String clientId = "Kafka_Simple_Client";
    /** Period of the metadata monitor in milliseconds; a value &lt;= 0 prevents the monitor from being scheduled. */
    private int metadataRefreshInterval = 30000;
    /** Maximum number of consecutive failed metadata lookups before monitor and workers stop; -1 means no limit. */
    private int metadataRefreshRetryLimit = -1;
    /** Partitions this consumer is responsible for. */
    private Set<KafkaPartition> kps = new HashSet<KafkaPartition>();
    /** Leader broker last seen for each partition that is currently being consumed. */
    private final transient ConcurrentHashMap<KafkaPartition, Broker> partitionToBroker = new ConcurrentHashMap<KafkaPartition, Broker>();
    /** Offset that the next fetch request will use for each partition. */
    private final transient ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
    /** Throwable raised by the most recent monitor run, or null if that run succeeded; created in {@link #start()}. */
    private transient AtomicReference<Throwable> monitorException;
    /** Number of consecutive monitor runs that ended with a throwable; created in {@link #start()}. */
    private transient AtomicInteger monitorExceptionCount;

    /**
     * Creates a consumer that keeps every field at its declared default value.
     */
    public SimpleKafkaConsumer() {
        super();
    }

    /**
     * Creates a consumer for the given topic without naming any partitions.
     * Delegates to the five-argument constructor with a null partition set.
     *
     * @param topic      topic to consume
     * @param timeout    timeout handed to the underlying {@code SimpleConsumer}
     * @param bufferSize buffer / fetch size handed to the underlying {@code SimpleConsumer}
     * @param clientId   prefix of the client name sent to the brokers
     */
    public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId) {
        this(topic, timeout, bufferSize, clientId, (Set<KafkaPartition>)null);
    }

    /**
     * Creates a consumer for the given topic and partitions.
     *
     * @param topic        topic to consume
     * @param timeout      timeout handed to the underlying {@code SimpleConsumer}
     * @param bufferSize   buffer / fetch size handed to the underlying {@code SimpleConsumer}
     * @param clientId     prefix of the client name sent to the brokers
     * @param partitionIds partitions to consume; when null the default empty set is kept
     */
    public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds) {
        super(topic);
        this.timeout = timeout;
        this.bufferSize = bufferSize;
        this.clientId = clientId;
        // Keep the field's empty default instead of overwriting it with null:
        // resetOffset() iterates over kps and would otherwise throw a NullPointerException.
        if (partitionIds != null) {
            this.kps = partitionIds;
        }
    }

    /**
     * Creates a consumer for the given topic and partitions, passing {@code zks} on to the superclass.
     *
     * @param zks          forwarded unchanged to the {@code KafkaConsumer} constructor
     * @param topic        topic to consume
     * @param timeout      timeout handed to the underlying {@code SimpleConsumer}
     * @param bufferSize   buffer / fetch size handed to the underlying {@code SimpleConsumer}
     * @param clientId     prefix of the client name sent to the brokers
     * @param partitionIds partitions to consume; when null the default empty set is kept
     */
    public SimpleKafkaConsumer(String zks, String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds) {
        super(zks, topic);
        this.timeout = timeout;
        this.bufferSize = bufferSize;
        this.clientId = clientId;
        // Keep the field's empty default instead of overwriting it with null:
        // resetOffset() iterates over kps and would otherwise throw a NullPointerException.
        if (partitionIds != null) {
            this.kps = partitionIds;
        }
    }

    /**
     * Starts the consumer: resets the monitor error state, delegates to {@code super.start()},
     * creates the worker pool and, when possible, schedules the metadata monitor.
     *
     * <p>The monitor is only scheduled when {@code metadataRefreshInterval} is positive and at
     * least one partition is configured. Consumer workers are created exclusively by the monitor
     * task, so when the monitor is not scheduled no worker is started by this method.
     */
    @Override
    public void start() {
        // The reference must be typed as Throwable to match the field declaration.
        this.monitorException = new AtomicReference<Throwable>(null);
        this.monitorExceptionCount = new AtomicInteger(0);
        super.start();
        this.kafkaConsumerExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kafka-consumer-" + this.topic + "-%d").build());
        if (this.metadataRefreshInterval <= 0 || CollectionUtils.isEmpty(this.kps)) {
            return;
        }
        this.metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-consumer-monitor-" + this.topic + "-%d").setDaemon(true).build());
        // Initial delay 0: the first metadata lookup (and therefore the first workers) starts immediately.
        this.metadataRefreshExecutor.scheduleAtFixedRate(new MetaDataMonitorTask(this), 0L, this.metadataRefreshInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels every running consumer worker and shuts down both executors.
     *
     * <p>Safe to call when {@link #start()} was never invoked or returned before scheduling the
     * metadata monitor; executors that were never created are simply skipped.
     */
    @Override
    public void close() {
        logger.info("Stop all consumer threads");
        for (ConsumerThread ct : this.simpleConsumerThreads.values()) {
            ct.getThreadItSelf().cancel(true);
        }
        this.simpleConsumerThreads.clear();
        // start() leaves this executor null when the refresh interval is <= 0 or no partitions are configured.
        if (this.metadataRefreshExecutor != null) {
            this.metadataRefreshExecutor.shutdownNow();
        }
        // Null when start() was never called.
        if (this.kafkaConsumerExecutor != null) {
            this.kafkaConsumerExecutor.shutdownNow();
        }
    }

    /**
     * Sets the buffer size handed to each {@code SimpleConsumer} and used as the per-partition fetch size.
     *
     * @param bufferSize new buffer size
     */
    public void setBufferSize(final int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /**
     * Sets the prefix of the client name sent to the brokers.
     *
     * @param clientId new client id prefix
     */
    public void setClientId(final String clientId) {
        this.clientId = clientId;
    }

    /**
     * Sets the timeout handed to each {@code SimpleConsumer} created by the workers.
     *
     * @param timeout new timeout value
     */
    public void setTimeout(final int timeout) {
        this.timeout = timeout;
    }

    /**
     * @return the configured buffer / fetch size
     */
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * @return the configured client id prefix
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * @return the configured consumer timeout
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * @return the period of the metadata monitor in milliseconds
     */
    public int getMetadataRefreshInterval() {
        return metadataRefreshInterval;
    }

    /**
     * Sets the period of the metadata monitor.
     *
     * @param reconnectInterval period in milliseconds; a value &lt;= 0 keeps {@link #start()} from scheduling the monitor
     */
    public void setMetadataRefreshInterval(final int reconnectInterval) {
        metadataRefreshInterval = reconnectInterval;
    }

    /**
     * @return the maximum number of consecutive failed metadata lookups, or -1 for no limit
     */
    public int getMetadataRefreshRetryLimit() {
        return metadataRefreshRetryLimit;
    }

    /**
     * Sets the maximum number of consecutive failed metadata lookups.
     *
     * @param metadataRefreshRetryLimit new limit; -1 disables the limit
     */
    public void setMetadataRefreshRetryLimit(final int metadataRefreshRetryLimit) {
        this.metadataRefreshRetryLimit = metadataRefreshRetryLimit;
    }

    /**
     * No-op: this implementation does nothing when asked to commit offsets.
     */
    @Override
    protected void commitOffset() {
        // intentionally empty
    }

    /**
     * Builds the client name used for one broker connection:
     * {@code <clientId>_partition_<brokerName>}.
     *
     * @param brokerName identifier of the broker (the workers pass {@code host_port})
     * @return the composed client name
     */
    private String getClientName(String brokerName) {
        StringBuilder name = new StringBuilder();
        name.append(clientId).append("_partition_").append(brokerName);
        return name.toString();
    }

    /**
     * Returns the offsets that the next fetch will use for each partition.
     * The returned map is the live internal map, not a copy.
     *
     * @return partition-to-offset map
     */
    @Override
    protected Map<KafkaPartition, Long> getCurrentOffsets() {
        return offsetTrack;
    }

    /**
     * Replaces the tracked offsets with the ones supplied for the configured partitions.
     *
     * <p>All existing offsets are discarded first. Only partitions that are in the configured
     * partition set and have a non-null entry in {@code overrideOffset} get an offset afterwards.
     *
     * @param overrideOffset offsets to start from; a null map leaves the tracked offsets untouched
     */
    public void resetOffset(Map<KafkaPartition, Long> overrideOffset) {
        if (overrideOffset == null) {
            return;
        }
        this.offsetTrack.clear();
        // kps can be null (constructors and resetPartitionsAndOffset accept a null set);
        // there is nothing to restore in that case.
        if (this.kps == null) {
            return;
        }
        for (KafkaPartition kp : this.kps) {
            Long offsetForPar = overrideOffset.get(kp);
            if (offsetForPar != null) {
                this.offsetTrack.put(kp, offsetForPar);
            }
        }
    }

    /**
     * Pushes the given offsets into the stats object and then returns the stats provided by the superclass.
     *
     * @param offsetStats offsets forwarded to {@code stats.updateOffsets}
     * @return the result of {@code super.getConsumerStats()}
     */
    public KafkaConsumer.KafkaMeterStats getConsumerStats(Map<KafkaPartition, Long> offsetStats) {
        stats.updateOffsets(offsetStats);
        KafkaConsumer.KafkaMeterStats meterStats = super.getConsumerStats();
        return meterStats;
    }

    /**
     * Replaces the configured partition set and then resets the tracked offsets via {@link #resetOffset(Map)}.
     *
     * @param partitionIds new set of partitions to consume
     * @param startOffset  offsets to start from, passed on to {@code resetOffset}
     */
    @Override
    protected void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset) {
        kps = partitionIds;
        resetOffset(startOffset);
    }

    /**
     * @return the throwable raised by the most recent metadata monitor run, or null if that run succeeded
     */
    protected Throwable getMonitorException() {
        final AtomicReference<Throwable> holder = monitorException;
        return holder.get();
    }

    /**
     * @return how many metadata monitor runs in a row have ended with a throwable
     */
    protected int getMonitorExceptionCount() {
        final AtomicInteger failures = monitorExceptionCount;
        return failures.get();
    }

    private class MetaDataMonitorTask
    implements Runnable {
        private final SimpleKafkaConsumer ref;
        private final transient SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();

        private MetaDataMonitorTask(SimpleKafkaConsumer ref) {
            this.ref = ref;
        }

        @Override
        public void run() {
            try {
                this.monitorMetadata();
                SimpleKafkaConsumer.this.monitorException.set(null);
                SimpleKafkaConsumer.this.monitorExceptionCount.set(0);
            }
            catch (Throwable ex) {
                logger.error("Exception {}", ex);
                SimpleKafkaConsumer.this.monitorException.set(ex);
                SimpleKafkaConsumer.this.monitorExceptionCount.incrementAndGet();
            }
        }

        private void monitorMetadata() {
            if (SimpleKafkaConsumer.this.isAlive && (SimpleKafkaConsumer.this.metadataRefreshRetryLimit == -1 || SimpleKafkaConsumer.this.retryCounter.get() < SimpleKafkaConsumer.this.metadataRefreshRetryLimit)) {
                logger.debug("{}: Update metadata for topic {}", (Object)Thread.currentThread().getName(), (Object)SimpleKafkaConsumer.this.topic);
                Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic((SetMultimap<String, String>)SimpleKafkaConsumer.this.brokers, SimpleKafkaConsumer.this.topic);
                if (pms == null) {
                    SimpleKafkaConsumer.this.retryCounter.getAndAdd(1);
                    return;
                }
                for (Map.Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
                    if (pmLEntry.getValue() == null) continue;
                    for (PartitionMetadata pm : pmLEntry.getValue()) {
                        KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), SimpleKafkaConsumer.this.topic, pm.partitionId());
                        if (!SimpleKafkaConsumer.this.kps.contains(kp)) continue;
                        Broker b = pm.leader();
                        if (b == null) {
                            logger.info("No Leader broker for Kafka Partition {}. Skipping it for time until new leader is elected", (Object)kp.getPartitionId());
                            continue;
                        }
                        Broker oldB = SimpleKafkaConsumer.this.partitionToBroker.put(kp, b);
                        if (b.equals((Object)oldB)) continue;
                        this.deltaPositive.put((Object)b, (Object)kp);
                        SimpleKafkaConsumer.this.stats.updatePartitionStats(kp, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
                    }
                }
                Iterator iterator = SimpleKafkaConsumer.this.simpleConsumerThreads.entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry item = iterator.next();
                    if (!((ConsumerThread)item.getValue()).getThreadItSelf().isDone()) continue;
                    iterator.remove();
                }
                for (Broker b : this.deltaPositive.keySet()) {
                    if (!SimpleKafkaConsumer.this.simpleConsumerThreads.containsKey(b)) {
                        ConsumerThread ct = new ConsumerThread(b, this.deltaPositive.get((Object)b), this.ref);
                        ct.setThreadItSelf(SimpleKafkaConsumer.this.kafkaConsumerExecutor.submit(ct));
                        SimpleKafkaConsumer.this.simpleConsumerThreads.put(b, ct);
                        continue;
                    }
                    ((ConsumerThread)SimpleKafkaConsumer.this.simpleConsumerThreads.get(b)).addPartitions(this.deltaPositive.get((Object)b));
                }
                this.deltaPositive.clear();
                SimpleKafkaConsumer.this.retryCounter.set(0);
            }
        }
    }

    static final class ConsumerThread
    implements Runnable {
        private final Broker broker;
        private final String clientName;
        private SimpleConsumer ksc;
        private SimpleKafkaConsumer consumer;
        private final Set<KafkaPartition> kpS;
        private Future threadItSelf;

        private ConsumerThread(Broker broker, Set<KafkaPartition> kpl, SimpleKafkaConsumer consumer) {
            this.broker = broker;
            this.clientName = consumer.getClientName(broker.host() + "_" + broker.port());
            this.consumer = consumer;
            this.kpS = Collections.newSetFromMap(new ConcurrentHashMap());
            this.kpS.addAll(kpl);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                logger.info("Connecting to broker {} [ timeout:{}, buffersize:{}, clientId: {}]", new Object[]{this.broker, this.consumer.timeout, this.consumer.bufferSize, this.clientName});
                this.ksc = new SimpleConsumer(this.broker.host(), this.broker.port(), this.consumer.timeout, this.consumer.bufferSize, this.clientName);
                for (KafkaPartition kpForConsumer : this.kpS) {
                    logger.info("Start consuming data of topic {} ", (Object)kpForConsumer);
                    if (this.consumer.offsetTrack.get(kpForConsumer) != null) {
                        logger.info("Partition {} initial offset {}", (Object)kpForConsumer, this.consumer.offsetTrack.get(kpForConsumer));
                        continue;
                    }
                    long startOffsetReq = this.consumer.initialOffset.equalsIgnoreCase("earliest") ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
                    logger.info("Partition {} initial offset {} {}", new Object[]{kpForConsumer.getPartitionId(), startOffsetReq, this.consumer.initialOffset});
                    this.consumer.offsetTrack.put(kpForConsumer, KafkaMetadataUtil.getLastOffset(this.ksc, this.consumer.topic, kpForConsumer.getPartitionId(), startOffsetReq, this.clientName));
                }
                while (this.consumer.isAlive && (this.consumer.metadataRefreshRetryLimit == -1 || this.consumer.retryCounter.get() < this.consumer.metadataRefreshRetryLimit)) {
                    if (this.kpS == null || this.kpS.isEmpty()) {
                        return;
                    }
                    FetchRequestBuilder frb = new FetchRequestBuilder().clientId(this.clientName);
                    for (KafkaPartition kpForConsumer : this.kpS) {
                        frb.addFetch(this.consumer.topic, kpForConsumer.getPartitionId(), ((Long)this.consumer.offsetTrack.get(kpForConsumer)).longValue(), this.consumer.bufferSize);
                    }
                    FetchRequest req = frb.build();
                    if (this.ksc == null) {
                        if (this.consumer.metadataRefreshInterval > 0) {
                            Thread.sleep(this.consumer.metadataRefreshInterval + 1000);
                        } else {
                            Thread.sleep(100L);
                        }
                    }
                    FetchResponse fetchResponse = this.ksc.fetch(req);
                    Iterator<KafkaPartition> iterator = this.kpS.iterator();
                    while (iterator.hasNext()) {
                        KafkaPartition kafkaPartition = iterator.next();
                        short errorCode = fetchResponse.errorCode(this.consumer.topic, kafkaPartition.getPartitionId());
                        if (fetchResponse.hasError() && errorCode != ErrorMapping.NoError()) {
                            logger.warn("Error when consuming topic {} from broker {} with error {} ", new Object[]{kafkaPartition, this.broker, ErrorMapping.exceptionFor((short)errorCode)});
                            if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
                                long seekTo = this.consumer.initialOffset.toLowerCase().equals("earliest") ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
                                seekTo = KafkaMetadataUtil.getLastOffset(this.ksc, this.consumer.topic, kafkaPartition.getPartitionId(), seekTo, this.clientName);
                                logger.warn("Offset out of range error, reset offset to {}", (Object)seekTo);
                                this.consumer.offsetTrack.put(kafkaPartition, seekTo);
                                continue;
                            }
                            iterator.remove();
                            this.consumer.partitionToBroker.remove(kafkaPartition);
                            this.consumer.stats.updatePartitionStats(kafkaPartition, -1, "");
                            continue;
                        }
                        long offset = -1L;
                        for (MessageAndOffset msg : fetchResponse.messageSet(this.consumer.topic, kafkaPartition.getPartitionId())) {
                            offset = msg.nextOffset();
                            this.consumer.putMessage(kafkaPartition, msg.message(), msg.offset());
                        }
                        if (offset == -1L) continue;
                        this.consumer.offsetTrack.put(kafkaPartition, offset);
                    }
                }
            }
            catch (Exception e) {
                logger.error("The consumer encounters an unrecoverable exception. Close the connection to broker {} \n Caused by {}", (Object)this.broker, (Object)e);
            }
            finally {
                if (this.ksc != null) {
                    this.ksc.close();
                }
                for (KafkaPartition kpForConsumer : this.kpS) {
                    this.consumer.partitionToBroker.remove(kpForConsumer);
                }
                logger.info("Exit the consumer thread for broker {} ", (Object)this.broker);
            }
        }

        public void addPartitions(Set<KafkaPartition> newKps) {
            this.kpS.addAll(newKps);
        }

        public Future getThreadItSelf() {
            return this.threadItSelf;
        }

        public void setThreadItSelf(Future threadItSelf) {
            this.threadItSelf = threadItSelf;
        }
    }
}

