package com.datatorrent.contrib.kafka;

import com.datatorrent.contrib.hbase.HBaseFieldValueGenerator;
import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.NotNull;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kafka/SimpleKafkaConsumer.class */
public class SimpleKafkaConsumer extends KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
    private final transient Map<Broker, ConsumerThread> simpleConsumerThreads;
    private transient ExecutorService kafkaConsumerExecutor;
    private transient ScheduledExecutorService metadataRefreshExecutor;
    private final transient AtomicInteger retryCounter;
    private int timeout;
    private int bufferSize;

    @NotNull
    private String clientId;
    private int metadataRefreshInterval;
    private int metadataRefreshRetryLimit;
    private Set<KafkaPartition> kps;
    private final transient ConcurrentHashMap<KafkaPartition, Broker> partitionToBroker;
    private final transient ConcurrentHashMap<KafkaPartition, Long> offsetTrack;
    private transient AtomicReference<Throwable> monitorException;
    private transient AtomicInteger monitorExceptionCount;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datatorrent/contrib/kafka/SimpleKafkaConsumer$ConsumerThread.class */
    public static final class ConsumerThread implements Runnable {
        private final Broker broker;
        private final String clientName;
        private SimpleConsumer ksc;
        private SimpleKafkaConsumer consumer;
        private final Set<KafkaPartition> kpS;
        private Future threadItSelf;

        private ConsumerThread(Broker broker, Set<KafkaPartition> set, SimpleKafkaConsumer simpleKafkaConsumer) {
            this.broker = broker;
            this.clientName = simpleKafkaConsumer.getClientName(broker.host() + "_" + broker.port());
            this.consumer = simpleKafkaConsumer;
            this.kpS = Collections.newSetFromMap(new ConcurrentHashMap());
            this.kpS.addAll(set);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    SimpleKafkaConsumer.logger.info("Connecting to broker {} [ timeout:{}, buffersize:{}, clientId: {}]", new Object[]{this.broker, Integer.valueOf(this.consumer.timeout), Integer.valueOf(this.consumer.bufferSize), this.clientName});
                    this.ksc = new SimpleConsumer(this.broker.host(), this.broker.port(), this.consumer.timeout, this.consumer.bufferSize, this.clientName);
                    for (KafkaPartition kafkaPartition : this.kpS) {
                        SimpleKafkaConsumer.logger.info("Start consuming data of topic {} ", kafkaPartition);
                        if (this.consumer.offsetTrack.get(kafkaPartition) != null) {
                            SimpleKafkaConsumer.logger.info("Partition {} initial offset {}", kafkaPartition, this.consumer.offsetTrack.get(kafkaPartition));
                        } else {
                            long EarliestTime = this.consumer.initialOffset.equalsIgnoreCase("earliest") ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
                            SimpleKafkaConsumer.logger.info("Partition {} initial offset {} {}", new Object[]{Integer.valueOf(kafkaPartition.getPartitionId()), Long.valueOf(EarliestTime), this.consumer.initialOffset});
                            this.consumer.offsetTrack.put(kafkaPartition, Long.valueOf(KafkaMetadataUtil.getLastOffset(this.ksc, this.consumer.topic, kafkaPartition.getPartitionId(), EarliestTime, this.clientName)));
                        }
                    }
                    while (this.consumer.isAlive && (this.consumer.metadataRefreshRetryLimit == -1 || this.consumer.retryCounter.get() < this.consumer.metadataRefreshRetryLimit)) {
                        if (this.kpS == null || this.kpS.isEmpty()) {
                            if (this.ksc != null) {
                                this.ksc.close();
                            }
                            Iterator<KafkaPartition> it = this.kpS.iterator();
                            while (it.hasNext()) {
                                this.consumer.partitionToBroker.remove(it.next());
                            }
                            SimpleKafkaConsumer.logger.info("Exit the consumer thread for broker {} ", this.broker);
                            return;
                        }
                        FetchRequestBuilder clientId = new FetchRequestBuilder().clientId(this.clientName);
                        for (KafkaPartition kafkaPartition2 : this.kpS) {
                            clientId.addFetch(this.consumer.topic, kafkaPartition2.getPartitionId(), ((Long) this.consumer.offsetTrack.get(kafkaPartition2)).longValue(), this.consumer.bufferSize);
                        }
                        FetchRequest build = clientId.build();
                        if (this.ksc == null) {
                            if (this.consumer.metadataRefreshInterval > 0) {
                                Thread.sleep(this.consumer.metadataRefreshInterval + 1000);
                            } else {
                                Thread.sleep(100L);
                            }
                        }
                        FetchResponse fetch = this.ksc.fetch(build);
                        Iterator<KafkaPartition> it2 = this.kpS.iterator();
                        while (it2.hasNext()) {
                            KafkaPartition next = it2.next();
                            short errorCode = fetch.errorCode(this.consumer.topic, next.getPartitionId());
                            if (!fetch.hasError() || errorCode == ErrorMapping.NoError()) {
                                long j = -1;
                                Iterator it3 = fetch.messageSet(this.consumer.topic, next.getPartitionId()).iterator();
                                while (it3.hasNext()) {
                                    MessageAndOffset messageAndOffset = (MessageAndOffset) it3.next();
                                    j = messageAndOffset.nextOffset();
                                    this.consumer.putMessage(next, messageAndOffset.message(), messageAndOffset.offset());
                                }
                                if (j != -1) {
                                    this.consumer.offsetTrack.put(next, Long.valueOf(j));
                                }
                            } else {
                                SimpleKafkaConsumer.logger.warn("Error when consuming topic {} from broker {} with error {} ", new Object[]{next, this.broker, ErrorMapping.exceptionFor(errorCode)});
                                if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
                                    long lastOffset = KafkaMetadataUtil.getLastOffset(this.ksc, this.consumer.topic, next.getPartitionId(), this.consumer.initialOffset.toLowerCase().equals("earliest") ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime(), this.clientName);
                                    SimpleKafkaConsumer.logger.warn("Offset out of range error, reset offset to {}", Long.valueOf(lastOffset));
                                    this.consumer.offsetTrack.put(next, Long.valueOf(lastOffset));
                                } else {
                                    it2.remove();
                                    this.consumer.partitionToBroker.remove(next);
                                    this.consumer.stats.updatePartitionStats(next, -1, "");
                                }
                            }
                        }
                    }
                    if (this.ksc != null) {
                        this.ksc.close();
                    }
                    Iterator<KafkaPartition> it4 = this.kpS.iterator();
                    while (it4.hasNext()) {
                        this.consumer.partitionToBroker.remove(it4.next());
                    }
                    SimpleKafkaConsumer.logger.info("Exit the consumer thread for broker {} ", this.broker);
                } catch (Exception e) {
                    SimpleKafkaConsumer.logger.error("The consumer encounters an unrecoverable exception. Close the connection to broker {} \n Caused by {}", this.broker, e);
                    if (this.ksc != null) {
                        this.ksc.close();
                    }
                    Iterator<KafkaPartition> it5 = this.kpS.iterator();
                    while (it5.hasNext()) {
                        this.consumer.partitionToBroker.remove(it5.next());
                    }
                    SimpleKafkaConsumer.logger.info("Exit the consumer thread for broker {} ", this.broker);
                }
            } catch (Throwable th) {
                if (this.ksc != null) {
                    this.ksc.close();
                }
                Iterator<KafkaPartition> it6 = this.kpS.iterator();
                while (it6.hasNext()) {
                    this.consumer.partitionToBroker.remove(it6.next());
                }
                SimpleKafkaConsumer.logger.info("Exit the consumer thread for broker {} ", this.broker);
                throw th;
            }
        }

        public void addPartitions(Set<KafkaPartition> set) {
            this.kpS.addAll(set);
        }

        public Future getThreadItSelf() {
            return this.threadItSelf;
        }

        public void setThreadItSelf(Future future) {
            this.threadItSelf = future;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/contrib/kafka/SimpleKafkaConsumer$MetaDataMonitorTask.class */
    public class MetaDataMonitorTask implements Runnable {
        private final SimpleKafkaConsumer ref;
        private final transient SetMultimap<Broker, KafkaPartition> deltaPositive;

        private MetaDataMonitorTask(SimpleKafkaConsumer simpleKafkaConsumer) {
            this.deltaPositive = HashMultimap.create();
            this.ref = simpleKafkaConsumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                monitorMetadata();
                SimpleKafkaConsumer.this.monitorException.set(null);
                SimpleKafkaConsumer.this.monitorExceptionCount.set(0);
            } catch (Throwable th) {
                SimpleKafkaConsumer.logger.error("Exception {}", th);
                SimpleKafkaConsumer.this.monitorException.set(th);
                SimpleKafkaConsumer.this.monitorExceptionCount.incrementAndGet();
            }
        }

        private void monitorMetadata() {
            if (SimpleKafkaConsumer.this.isAlive) {
                if (SimpleKafkaConsumer.this.metadataRefreshRetryLimit == -1 || SimpleKafkaConsumer.this.retryCounter.get() < SimpleKafkaConsumer.this.metadataRefreshRetryLimit) {
                    SimpleKafkaConsumer.logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), SimpleKafkaConsumer.this.topic);
                    Map<String, List<PartitionMetadata>> partitionsForTopic = KafkaMetadataUtil.getPartitionsForTopic(SimpleKafkaConsumer.this.brokers, SimpleKafkaConsumer.this.topic);
                    if (partitionsForTopic == null) {
                        SimpleKafkaConsumer.this.retryCounter.getAndAdd(1);
                        return;
                    }
                    for (Map.Entry<String, List<PartitionMetadata>> entry : partitionsForTopic.entrySet()) {
                        if (entry.getValue() != null) {
                            for (PartitionMetadata partitionMetadata : entry.getValue()) {
                                KafkaPartition kafkaPartition = new KafkaPartition(entry.getKey(), SimpleKafkaConsumer.this.topic, partitionMetadata.partitionId());
                                if (SimpleKafkaConsumer.this.kps.contains(kafkaPartition)) {
                                    Broker leader = partitionMetadata.leader();
                                    if (leader == null) {
                                        SimpleKafkaConsumer.logger.info("No Leader broker for Kafka Partition {}. Skipping it for time until new leader is elected", Integer.valueOf(kafkaPartition.getPartitionId()));
                                    } else if (!leader.equals((Broker) SimpleKafkaConsumer.this.partitionToBroker.put(kafkaPartition, leader))) {
                                        this.deltaPositive.put(leader, kafkaPartition);
                                        SimpleKafkaConsumer.this.stats.updatePartitionStats(kafkaPartition, partitionMetadata.leader().id(), partitionMetadata.leader().host() + HBaseFieldValueGenerator.COLON + partitionMetadata.leader().port());
                                    }
                                }
                            }
                        }
                    }
                    Iterator it = SimpleKafkaConsumer.this.simpleConsumerThreads.entrySet().iterator();
                    while (it.hasNext()) {
                        if (((ConsumerThread) ((Map.Entry) it.next()).getValue()).getThreadItSelf().isDone()) {
                            it.remove();
                        }
                    }
                    for (Broker broker : this.deltaPositive.keySet()) {
                        if (SimpleKafkaConsumer.this.simpleConsumerThreads.containsKey(broker)) {
                            ((ConsumerThread) SimpleKafkaConsumer.this.simpleConsumerThreads.get(broker)).addPartitions(this.deltaPositive.get(broker));
                        } else {
                            ConsumerThread consumerThread = new ConsumerThread(broker, this.deltaPositive.get(broker), this.ref);
                            consumerThread.setThreadItSelf(SimpleKafkaConsumer.this.kafkaConsumerExecutor.submit(consumerThread));
                            SimpleKafkaConsumer.this.simpleConsumerThreads.put(broker, consumerThread);
                        }
                    }
                    this.deltaPositive.clear();
                    SimpleKafkaConsumer.this.retryCounter.set(0);
                }
            }
        }
    }

    public SimpleKafkaConsumer() {
        this.simpleConsumerThreads = new HashMap();
        this.retryCounter = new AtomicInteger(0);
        this.timeout = 10000;
        this.bufferSize = 1048576;
        this.clientId = "Kafka_Simple_Client";
        this.metadataRefreshInterval = 30000;
        this.metadataRefreshRetryLimit = -1;
        this.kps = new HashSet();
        this.partitionToBroker = new ConcurrentHashMap<>();
        this.offsetTrack = new ConcurrentHashMap<>();
    }

    public SimpleKafkaConsumer(String str, int i, int i2, String str2) {
        this(str, i, i2, str2, null);
    }

    public SimpleKafkaConsumer(String str, int i, int i2, String str2, Set<KafkaPartition> set) {
        super(str);
        this.simpleConsumerThreads = new HashMap();
        this.retryCounter = new AtomicInteger(0);
        this.timeout = 10000;
        this.bufferSize = 1048576;
        this.clientId = "Kafka_Simple_Client";
        this.metadataRefreshInterval = 30000;
        this.metadataRefreshRetryLimit = -1;
        this.kps = new HashSet();
        this.partitionToBroker = new ConcurrentHashMap<>();
        this.offsetTrack = new ConcurrentHashMap<>();
        this.timeout = i;
        this.bufferSize = i2;
        this.clientId = str2;
        this.kps = set;
    }

    public SimpleKafkaConsumer(String str, String str2, int i, int i2, String str3, Set<KafkaPartition> set) {
        super(str, str2);
        this.simpleConsumerThreads = new HashMap();
        this.retryCounter = new AtomicInteger(0);
        this.timeout = 10000;
        this.bufferSize = 1048576;
        this.clientId = "Kafka_Simple_Client";
        this.metadataRefreshInterval = 30000;
        this.metadataRefreshRetryLimit = -1;
        this.kps = new HashSet();
        this.partitionToBroker = new ConcurrentHashMap<>();
        this.offsetTrack = new ConcurrentHashMap<>();
        this.timeout = i;
        this.bufferSize = i2;
        this.clientId = str3;
        this.kps = set;
    }

    @Override // com.datatorrent.contrib.kafka.KafkaConsumer
    public void start() {
        this.monitorException = new AtomicReference<>(null);
        this.monitorExceptionCount = new AtomicInteger(0);
        super.start();
        this.kafkaConsumerExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kafka-consumer-" + this.topic + "-%d").build());
        if (this.metadataRefreshInterval <= 0 || CollectionUtils.isEmpty(this.kps)) {
            return;
        }
        this.metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-consumer-monitor-" + this.topic + "-%d").setDaemon(true).build());
        this.metadataRefreshExecutor.scheduleAtFixedRate(new MetaDataMonitorTask(this), 0L, this.metadataRefreshInterval, TimeUnit.MILLISECONDS);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.info("Stop all consumer threads");
        Iterator<ConsumerThread> it = this.simpleConsumerThreads.values().iterator();
        while (it.hasNext()) {
            it.next().getThreadItSelf().cancel(true);
        }
        this.simpleConsumerThreads.clear();
        this.metadataRefreshExecutor.shutdownNow();
        this.kafkaConsumerExecutor.shutdownNow();
    }

    public void setBufferSize(int i) {
        this.bufferSize = i;
    }

    public void setClientId(String str) {
        this.clientId = str;
    }

    public void setTimeout(int i) {
        this.timeout = i;
    }

    public int getBufferSize() {
        return this.bufferSize;
    }

    public String getClientId() {
        return this.clientId;
    }

    public int getTimeout() {
        return this.timeout;
    }

    public int getMetadataRefreshInterval() {
        return this.metadataRefreshInterval;
    }

    public void setMetadataRefreshInterval(int i) {
        this.metadataRefreshInterval = i;
    }

    public int getMetadataRefreshRetryLimit() {
        return this.metadataRefreshRetryLimit;
    }

    public void setMetadataRefreshRetryLimit(int i) {
        this.metadataRefreshRetryLimit = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.contrib.kafka.KafkaConsumer
    public void commitOffset() {
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getClientName(String str) {
        return this.clientId + "_partition_" + str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.contrib.kafka.KafkaConsumer
    public Map<KafkaPartition, Long> getCurrentOffsets() {
        return this.offsetTrack;
    }

    public void resetOffset(Map<KafkaPartition, Long> map) {
        if (map == null) {
            return;
        }
        this.offsetTrack.clear();
        for (KafkaPartition kafkaPartition : this.kps) {
            Long l = map.get(kafkaPartition);
            if (l != null) {
                this.offsetTrack.put(kafkaPartition, l);
            }
        }
    }

    public KafkaConsumer.KafkaMeterStats getConsumerStats(Map<KafkaPartition, Long> map) {
        this.stats.updateOffsets(map);
        return super.getConsumerStats();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.contrib.kafka.KafkaConsumer
    public void resetPartitionsAndOffset(Set<KafkaPartition> set, Map<KafkaPartition, Long> map) {
        this.kps = set;
        resetOffset(map);
    }

    protected Throwable getMonitorException() {
        return this.monitorException.get();
    }

    protected int getMonitorExceptionCount() {
        return this.monitorExceptionCount.get();
    }
}
