package org.apache.apex.malhar.kafka;

import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.AbstractKafkaPartitioner;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns one {@link KafkaConsumer} per cluster for an {@link AbstractKafkaInputOperator}.
 *
 * <p>{@link #start(boolean)} creates the consumers and submits one {@link ConsumerThread} per
 * cluster to an executor. Each thread polls its consumer and pushes the records into a bounded
 * holding buffer, from which the operator drains them through {@link #pollMessage()}.
 *
 * <p>The {@code isAlive} and {@code waitForReplay} flags are written by the caller of this class
 * and read by the consumer threads, hence they are {@code volatile}.
 */
@InterfaceStability.Evolving
public class KafkaConsumerWrapper implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);

    /** Bounded queue of (cluster, record) pairs filled by the consumer threads; created in {@link #create}. */
    private ArrayBlockingQueue<Pair<String, ConsumerRecord<byte[], byte[]>>> holdingBuffer;

    /** Executor running the {@link ConsumerThread}s; created in {@link #start(boolean)}. */
    private ExecutorService kafkaConsumerExecutor;

    /** Loop condition of every consumer thread. Volatile because it is read from those threads. */
    private volatile boolean isAlive = false;

    /** Consumers keyed by cluster (the value used for {@code bootstrap.servers}). */
    private final Map<String, KafkaConsumer<byte[], byte[]>> consumers = new HashMap<>();

    private AbstractKafkaInputOperator ownerOperator = null;

    /** Per-cluster pending offsets; each inner map is created and registered by a {@link ConsumerThread}. */
    private final Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = new HashMap<>();

    /** While true the consumer threads sleep instead of polling. Volatile because it is read from those threads. */
    private volatile boolean waitForReplay = false;

    /**
     * Poll loop for a single cluster's consumer. Commits pending offsets, polls the consumer and
     * forwards every record to the wrapper's holding buffer until the wrapper is no longer alive,
     * the consumer is woken up, or the thread is interrupted. The consumer is closed on every exit
     * path.
     */
    public static final class ConsumerThread implements Runnable {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final String cluster;
        private final KafkaConsumerWrapper wrapper;
        private final Map<TopicPartition, OffsetAndMetadata> offsetToCommit;

        /**
         * Creates the runnable and registers a fresh pending-offset map for {@code str} in the
         * wrapper, so that {@link KafkaConsumerWrapper#commitOffsets(Map)} can find it.
         *
         * @param str cluster this thread consumes from
         * @param kafkaConsumer consumer to poll; closed when {@link #run()} exits
         * @param kafkaConsumerWrapper owning wrapper
         */
        public ConsumerThread(String str, KafkaConsumer<byte[], byte[]> kafkaConsumer, KafkaConsumerWrapper kafkaConsumerWrapper) {
            this.cluster = str;
            this.consumer = kafkaConsumer;
            this.wrapper = kafkaConsumerWrapper;
            this.offsetToCommit = new ConcurrentHashMap<>();
            kafkaConsumerWrapper.offsetsToCommit.put(str, this.offsetToCommit);
        }

        @Override
        public void run() {
            try {
                while (wrapper.isAlive) {
                    if (wrapper.waitForReplay) {
                        // Replay is still in progress on the wrapper's side; do not touch the consumer.
                        Thread.sleep(100L);
                        continue;
                    }
                    if (!offsetToCommit.isEmpty()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Commit offsets {}", Joiner.on(';').withKeyValueSeparator("=").join(offsetToCommit));
                        }
                        // NOTE(review): an offset put between commitAsync and clear would be dropped
                        // without being committed - confirm whether that matters to callers.
                        consumer.commitAsync(offsetToCommit, wrapper.ownerOperator);
                        offsetToCommit.clear();
                    }
                    try {
                        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(wrapper.ownerOperator.getConsumerTimeout())) {
                            wrapper.putMessage(Pair.of(cluster, record));
                        }
                    } catch (NoOffsetForPartitionException e) {
                        // Consumers are created with auto.offset.reset=none, so the fallback position
                        // is chosen here from the operator's initial-offset setting.
                        AbstractKafkaInputOperator.InitialOffset initialOffset =
                            AbstractKafkaInputOperator.InitialOffset.valueOf(wrapper.ownerOperator.getInitialOffset());
                        TopicPartition[] partitions = (TopicPartition[]) e.partitions().toArray(new TopicPartition[0]);
                        if (initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
                            || initialOffset == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
                            consumer.seekToBeginning(partitions);
                        } else {
                            consumer.seekToEnd(partitions);
                        }
                    } catch (InterruptedException e) {
                        throw new IllegalStateException("Consumer thread is interrupted unexpectedly", e);
                    }
                }
            } catch (WakeupException we) {
                logger.info("The consumer is being stopped");
            } catch (InterruptedException e) {
                // Interrupted while sleeping during replay.
                DTThrowable.rethrow(e);
            } finally {
                consumer.close();
            }
        }
    }

    /**
     * Queues offsets to be committed by the consumer threads on their next loop iteration.
     * Entries whose cluster has no registered pending-offset map are skipped with a warning.
     *
     * @param map offsets keyed by partition; {@code null} is ignored
     */
    public void commitOffsets(Map<AbstractKafkaPartitioner.PartitionMeta, Long> map) {
        if (map == null) {
            return;
        }
        for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> entry : map.entrySet()) {
            Map<TopicPartition, OffsetAndMetadata> pending = offsetsToCommit.get(entry.getKey().getCluster());
            if (pending == null) {
                logger.warn("committed offset map should be initialized by consumer thread!");
                continue;
            }
            pending.put(entry.getKey().getTopicPartition(), new OffsetAndMetadata(entry.getValue()));
        }
    }

    /**
     * Replays records synchronously on the calling thread. For every entry the matching consumer
     * is restricted to that single partition, positioned at the entry's left value (start offset)
     * and polled until the entry's right value (record count) records have been handed to the
     * owner operator's {@code emitTuple}. Afterwards all assigned partitions of all consumers are
     * resumed.
     *
     * @param map partition -> (start offset, number of records to emit)
     * @throws RuntimeException if no consumer is assigned to a requested partition, or the
     *         consumer has no valid offset for it while replaying
     */
    public void emitImmediately(Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> map) {
        for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> entry : map.entrySet()) {
            AbstractKafkaPartitioner.PartitionMeta meta = entry.getKey();
            TopicPartition target = meta.getTopicPartition();
            Pair<Long, Long> window = entry.getValue();
            KafkaConsumer<byte[], byte[]> kc = consumers.get(meta.getCluster());
            // Fail when there is no consumer for the cluster OR it is not assigned the partition.
            if (kc == null || !kc.assignment().contains(target)) {
                throw new RuntimeException("Coundn't find consumer to replay the message PartitionMeta : " + meta);
            }
            // Only the partition being replayed may deliver records during the polls below.
            for (TopicPartition tp : kc.assignment()) {
                if (target.equals(tp)) {
                    kc.resume(tp);
                } else {
                    kc.pause(tp);
                }
            }
            long startOffset = window.getLeft();
            kc.seek(target, startOffset);
            long count = window.getRight();
            long remaining = count;
            while (remaining > 0) {
                try {
                    Iterator<ConsumerRecord<byte[], byte[]>> records = kc.poll(ownerOperator.getConsumerTimeout()).iterator();
                    while (records.hasNext() && remaining > 0) {
                        ownerOperator.emitTuple(meta.getCluster(), records.next());
                        remaining--;
                    }
                } catch (NoOffsetForPartitionException e) {
                    throw new RuntimeException("Couldn't replay the offset", e);
                }
            }
            // The last poll may have fetched past the window; reposition right after it.
            kc.seek(target, startOffset + count);
        }
        for (KafkaConsumer<byte[], byte[]> kc : consumers.values()) {
            kc.resume(Iterables.toArray(kc.assignment(), TopicPartition.class));
        }
    }

    /** Clears the replay flag so the consumer threads start polling. */
    public void afterReplay() {
        this.waitForReplay = false;
    }

    /**
     * Binds this wrapper to its operator and allocates the holding buffer with the operator's
     * configured size.
     *
     * @param abstractKafkaInputOperator owning operator
     */
    public void create(AbstractKafkaInputOperator abstractKafkaInputOperator) {
        this.holdingBuffer = new ArrayBlockingQueue<>(abstractKafkaInputOperator.getHoldingBufferSize());
        this.ownerOperator = abstractKafkaInputOperator;
        logger.info("Create consumer wrapper with holding buffer size: {} ", abstractKafkaInputOperator.getHoldingBufferSize());
        if (logger.isInfoEnabled()) {
            logger.info("Assignments are {} ", Joiner.on('\n').join(abstractKafkaInputOperator.assignment()));
        }
    }

    /**
     * Creates one consumer per cluster found in the operator's assignment, seeks every partition
     * that has an entry in the operator's offset track, and submits a {@link ConsumerThread} for
     * each consumer.
     *
     * @param z initial value of the replay flag; while it is {@code true} the consumer threads
     *          idle until {@link #afterReplay()} is called
     */
    public void start(boolean z) {
        this.waitForReplay = z;
        this.isAlive = true;
        this.kafkaConsumerExecutor = Executors.newCachedThreadPool(
            new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build());

        // Group the assigned partitions by cluster.
        Map<String, List<TopicPartition>> partitionsByCluster = new HashMap<>();
        for (AbstractKafkaPartitioner.PartitionMeta meta : ownerOperator.assignment()) {
            String cluster = meta.getCluster();
            List<TopicPartition> partitions = partitionsByCluster.get(cluster);
            if (partitions == null) {
                partitions = new LinkedList<>();
                partitionsByCluster.put(cluster, partitions);
            }
            partitions.add(new TopicPartition(meta.getTopic(), meta.getPartitionId()));
        }

        Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = ownerOperator.getOffsetTrack();
        for (Map.Entry<String, List<TopicPartition>> entry : partitionsByCluster.entrySet()) {
            String cluster = entry.getKey();
            List<TopicPartition> partitions = entry.getValue();

            Properties properties = new Properties();
            if (ownerOperator.getConsumerProps() != null) {
                properties.putAll(ownerOperator.getConsumerProps());
            }
            // The settings below override whatever the user-supplied properties contain.
            properties.put("bootstrap.servers", cluster);
            properties.put("auto.offset.reset", "none");
            properties.put("enable.auto.commit", "false");
            properties.put("key.deserializer", ByteArrayDeserializer.class.getName());
            properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
            AbstractKafkaInputOperator.InitialOffset initialOffset =
                AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
            if (initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
                || initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_LATEST) {
                properties.put("group.id", ownerOperator.getApplicationName() + "_Consumer");
            }

            KafkaConsumer<byte[], byte[]> kc = new KafkaConsumer<>(properties);
            kc.assign(partitions);
            if (logger.isInfoEnabled()) {
                logger.info("Create consumer with properties {} ", Joiner.on(";").withKeyValueSeparator("=").join(properties));
                logger.info("Assign consumer to {}", Joiner.on('#').join(partitions));
            }
            if (offsetTrack != null && !offsetTrack.isEmpty()) {
                for (TopicPartition tp : partitions) {
                    AbstractKafkaPartitioner.PartitionMeta meta =
                        new AbstractKafkaPartitioner.PartitionMeta(cluster, tp.topic(), tp.partition());
                    if (offsetTrack.containsKey(meta)) {
                        kc.seek(tp, offsetTrack.get(meta));
                    }
                }
            }
            consumers.put(cluster, kc);
            kafkaConsumerExecutor.submit(new ConsumerThread(cluster, kc, this));
        }
    }

    /**
     * Wakes up every consumer, shuts the executor down, marks the wrapper as not alive and drops
     * any buffered records. Does not fail when called before {@link #start(boolean)} or
     * {@link #create(AbstractKafkaInputOperator)}.
     */
    public void stop() {
        for (KafkaConsumer<byte[], byte[]> kc : consumers.values()) {
            kc.wakeup();
        }
        if (kafkaConsumerExecutor != null) {
            kafkaConsumerExecutor.shutdownNow();
        }
        this.isAlive = false;
        if (holdingBuffer != null) {
            holdingBuffer.clear();
        }
        IOUtils.closeQuietly(this);
    }

    /** Drops any buffered records; a no-op if the buffer was never created. */
    public void teardown() {
        if (holdingBuffer != null) {
            holdingBuffer.clear();
        }
    }

    public boolean isAlive() {
        return this.isAlive;
    }

    public void setAlive(boolean z) {
        this.isAlive = z;
    }

    /**
     * @return the next buffered (cluster, record) pair, or {@code null} if the buffer is empty
     */
    public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage() {
        return this.holdingBuffer.poll();
    }

    /**
     * @return number of records currently held in the buffer
     */
    public int messageSize() {
        return this.holdingBuffer.size();
    }

    /**
     * Adds a record to the holding buffer, blocking while the buffer is full.
     *
     * @param pair (cluster, record) pair to enqueue
     * @throws InterruptedException if interrupted while waiting for free space
     */
    protected final void putMessage(Pair<String, ConsumerRecord<byte[], byte[]>> pair) throws InterruptedException {
        this.holdingBuffer.put(pair);
    }

    /** Intentionally empty: the consumers are closed by their {@link ConsumerThread}s. */
    @Override
    public void close() throws IOException {
    }

    /**
     * @return a new map from cluster to the metrics reported by that cluster's consumer
     */
    public Map<String, Map<MetricName, ? extends Metric>> getAllConsumerMetrics() {
        Map<String, Map<MetricName, ? extends Metric>> metricsByCluster = new HashMap<>();
        for (Map.Entry<String, KafkaConsumer<byte[], byte[]>> entry : consumers.entrySet()) {
            metricsByCluster.put(entry.getKey(), entry.getValue().metrics());
        }
        return metricsByCluster;
    }
}
