package com.datatorrent.contrib.kafka;

import com.datatorrent.api.Context;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import java.io.Closeable;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import kafka.message.Message;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaConsumer.class */
public abstract class KafkaConsumer implements Closeable {
    protected static final String HIGHLEVEL_CONSUMER_ID_SUFFIX = "_stream_";
    protected static final String SIMPLE_CONSUMER_ID_SUFFIX = "_partition_";
    private String zookeeper;
    private int cacheSize;
    protected transient boolean isAlive;
    private transient ArrayBlockingQueue<KafkaMessage> holdingBuffer;

    @NotNull
    protected String topic;

    @NotNull
    @FieldSerializer.Bind(JavaSerializer.class)
    SetMultimap<String, String> zookeeperMap;
    protected transient SetMultimap<String, String> brokers;

    @Pattern(flags = {Pattern.Flag.CASE_INSENSITIVE}, regexp = "earliest|latest")
    protected String initialOffset;
    protected transient SnapShot statsSnapShot;
    protected transient KafkaMeterStats stats;

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaConsumer$KafkaMessage.class */
    public static class KafkaMessage {
        KafkaPartition kafkaPart;
        Message msg;
        long offSet;

        public KafkaMessage(KafkaPartition kafkaPartition, Message message, long j) {
            this.kafkaPart = kafkaPartition;
            this.msg = message;
            this.offSet = j;
        }

        public KafkaPartition getKafkaPart() {
            return this.kafkaPart;
        }

        public Message getMsg() {
            return this.msg;
        }

        public long getOffSet() {
            return this.offSet;
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaConsumer$KafkaMeterStats.class */
    public static class KafkaMeterStats implements Serializable {
        private static final long serialVersionUID = -2867402654990209006L;
        public ConcurrentHashMap<KafkaPartition, PartitionStats> partitionStats = new ConcurrentHashMap<>();
        public long totalMsgPerSec;
        public long totalBytesPerSec;

        public void set_1minMovingAvgPerPartition(KafkaPartition kafkaPartition, long[] jArr) {
            PartitionStats putPartitionStatsIfNotPresent = putPartitionStatsIfNotPresent(kafkaPartition);
            putPartitionStatsIfNotPresent.msgsPerSec = jArr[0];
            putPartitionStatsIfNotPresent.bytesPerSec = jArr[1];
        }

        public void set_1minMovingAvg(long[] jArr) {
            this.totalMsgPerSec = jArr[0];
            this.totalBytesPerSec = jArr[1];
        }

        public void updateOffsets(Map<KafkaPartition, Long> map) {
            for (Map.Entry<KafkaPartition, Long> entry : map.entrySet()) {
                putPartitionStatsIfNotPresent(entry.getKey()).offset = entry.getValue().longValue();
            }
        }

        public int getConnections() {
            int i = 0;
            Iterator<PartitionStats> it = this.partitionStats.values().iterator();
            while (it.hasNext()) {
                if (!StringUtils.isEmpty(it.next().brokerHost)) {
                    i++;
                }
            }
            return i;
        }

        public void updatePartitionStats(KafkaPartition kafkaPartition, int i, String str) {
            PartitionStats putPartitionStatsIfNotPresent = putPartitionStatsIfNotPresent(kafkaPartition);
            putPartitionStatsIfNotPresent.brokerHost = str;
            putPartitionStatsIfNotPresent.brokerId = i;
        }

        private synchronized PartitionStats putPartitionStatsIfNotPresent(KafkaPartition kafkaPartition) {
            PartitionStats partitionStats = this.partitionStats.get(kafkaPartition);
            if (partitionStats == null) {
                partitionStats = new PartitionStats();
                this.partitionStats.put(kafkaPartition, partitionStats);
            }
            return partitionStats;
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaConsumer$KafkaMeterStatsAggregator.class */
    public static class KafkaMeterStatsAggregator implements Context.CountersAggregator, Serializable {
        private static final long serialVersionUID = 729987800215151678L;

        public Object aggregate(Collection<?> collection) {
            KafkaMeterStats kafkaMeterStats = new KafkaMeterStats();
            for (Object obj : collection) {
                if (obj instanceof KafkaMeterStats) {
                    KafkaMeterStats kafkaMeterStats2 = (KafkaMeterStats) obj;
                    kafkaMeterStats.partitionStats.putAll(kafkaMeterStats2.partitionStats);
                    kafkaMeterStats.totalBytesPerSec += kafkaMeterStats2.totalBytesPerSec;
                    kafkaMeterStats.totalMsgPerSec += kafkaMeterStats2.totalMsgPerSec;
                }
            }
            return kafkaMeterStats;
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaConsumer$KafkaMeterStatsUtil.class */
    public static class KafkaMeterStatsUtil {
        public static Map<KafkaPartition, Long> getOffsetsForPartitions(List<KafkaMeterStats> list) {
            HashMap newHashMap = Maps.newHashMap();
            Iterator<KafkaMeterStats> it = list.iterator();
            while (it.hasNext()) {
                for (Map.Entry<KafkaPartition, PartitionStats> entry : it.next().partitionStats.entrySet()) {
                    newHashMap.put(entry.getKey(), Long.valueOf(entry.getValue().offset));
                }
            }
            return newHashMap;
        }

        public static Map<KafkaPartition, long[]> get_1minMovingAvgParMap(KafkaMeterStats kafkaMeterStats) {
            HashMap newHashMap = Maps.newHashMap();
            for (Map.Entry<KafkaPartition, PartitionStats> entry : kafkaMeterStats.partitionStats.entrySet()) {
                newHashMap.put(entry.getKey(), new long[]{entry.getValue().msgsPerSec, entry.getValue().bytesPerSec});
            }
            return newHashMap;
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaConsumer$PartitionStats.class */
    public static class PartitionStats implements Serializable {
        private static final long serialVersionUID = -6572690643487689766L;
        public long msgsPerSec;
        public long bytesPerSec;
        public long offset;
        public int brokerId = -1;
        public String brokerHost = "";
    }

    /* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaConsumer$SnapShot.class */
    static class SnapShot {
        private static int cursor = 0;
        private ScheduledExecutorService service;
        private final Map<KafkaPartition, long[]> _1_min_msg_sum_par = new HashMap();
        private final Map<KafkaPartition, long[]> _1_min_byte_sum_par = new HashMap();
        private final long[] msgSec = new long[61];
        private final long[] bytesSec = new long[61];
        private short last = 1;

        SnapShot() {
        }

        public synchronized void moveNext() {
            cursor = (cursor + 1) % 60;
            long[] jArr = this.msgSec;
            jArr[60] = jArr[60] - this.msgSec[cursor];
            long[] jArr2 = this.bytesSec;
            jArr2[60] = jArr2[60] - this.bytesSec[cursor];
            this.msgSec[cursor] = 0;
            this.bytesSec[cursor] = 0;
            for (Map.Entry<KafkaPartition, long[]> entry : this._1_min_msg_sum_par.entrySet()) {
                long[] value = entry.getValue();
                long[] jArr3 = this._1_min_byte_sum_par.get(entry.getKey());
                value[60] = value[60] - value[cursor];
                jArr3[60] = jArr3[60] - jArr3[cursor];
                value[cursor] = 0;
                jArr3[cursor] = 0;
            }
        }

        public void start() {
            if (this.service == null) {
                this.service = Executors.newScheduledThreadPool(1);
            }
            this.service.scheduleAtFixedRate(new Runnable() { // from class: com.datatorrent.contrib.kafka.KafkaConsumer.SnapShot.1
                @Override // java.lang.Runnable
                public void run() {
                    SnapShot.this.moveNext();
                    if (SnapShot.this.last < 60) {
                        SnapShot.access$008(SnapShot.this);
                    }
                }
            }, 1L, 1L, TimeUnit.SECONDS);
        }

        public void stop() {
            if (this.service != null) {
                this.service.shutdown();
            }
        }

        public synchronized void mark(KafkaPartition kafkaPartition, long j) {
            long[] jArr = this.msgSec;
            int i = cursor;
            jArr[i] = jArr[i] + 1;
            long[] jArr2 = this.msgSec;
            jArr2[60] = jArr2[60] + 1;
            long[] jArr3 = this.bytesSec;
            int i2 = cursor;
            jArr3[i2] = jArr3[i2] + j;
            long[] jArr4 = this.bytesSec;
            jArr4[60] = jArr4[60] + j;
            long[] jArr5 = this._1_min_msg_sum_par.get(kafkaPartition);
            long[] jArr6 = this._1_min_byte_sum_par.get(kafkaPartition);
            if (jArr5 == null) {
                jArr5 = new long[61];
                jArr6 = new long[61];
                this._1_min_msg_sum_par.put(kafkaPartition, jArr5);
                this._1_min_byte_sum_par.put(kafkaPartition, jArr6);
            }
            long[] jArr7 = jArr5;
            int i3 = cursor;
            jArr7[i3] = jArr7[i3] + 1;
            long[] jArr8 = jArr5;
            jArr8[60] = jArr8[60] + 1;
            long[] jArr9 = jArr6;
            int i4 = cursor;
            jArr9[i4] = jArr9[i4] + j;
            long[] jArr10 = jArr6;
            jArr10[60] = jArr10[60] + j;
        }

        public synchronized void setupStats(KafkaMeterStats kafkaMeterStats) {
            long[] jArr = {this.msgSec[60] / this.last, this.bytesSec[60] / this.last};
            for (Map.Entry<KafkaPartition, long[]> entry : this._1_min_msg_sum_par.entrySet()) {
                kafkaMeterStats.set_1minMovingAvgPerPartition(entry.getKey(), new long[]{entry.getValue()[60] / this.last, this._1_min_byte_sum_par.get(entry.getKey())[60] / this.last});
            }
            kafkaMeterStats.set_1minMovingAvg(jArr);
        }

        static /* synthetic */ short access$008(SnapShot snapShot) {
            short s = snapShot.last;
            snapShot.last = (short) (s + 1);
            return s;
        }
    }

    public KafkaConsumer() {
        this.cacheSize = 1024;
        this.isAlive = false;
        this.topic = "default_topic";
        this.initialOffset = "latest";
        this.statsSnapShot = new SnapShot();
        this.stats = new KafkaMeterStats();
    }

    public KafkaConsumer(String str) {
        this();
        this.topic = str;
    }

    public KafkaConsumer(String str, String str2) {
        this.cacheSize = 1024;
        this.isAlive = false;
        this.topic = "default_topic";
        this.initialOffset = "latest";
        this.statsSnapShot = new SnapShot();
        this.stats = new KafkaMeterStats();
        this.topic = str2;
        setZookeeper(str);
    }

    public void create() {
        initBrokers();
        this.holdingBuffer = new ArrayBlockingQueue<>(this.cacheSize);
    }

    public void initBrokers() {
        if (this.brokers == null && this.zookeeperMap != null) {
            this.brokers = HashMultimap.create();
            for (String str : this.zookeeperMap.keySet()) {
                try {
                    this.brokers.putAll(str, KafkaMetadataUtil.getBrokers(this.zookeeperMap.get(str)));
                } catch (Exception e) {
                    throw new RuntimeException("Error resolving brokers for cluster " + str + " " + this.zookeeperMap.get(str), e);
                }
            }
        }
    }

    public void start() {
        this.isAlive = true;
        this.statsSnapShot.start();
    }

    public void stop() {
        this.isAlive = false;
        this.statsSnapShot.stop();
        this.holdingBuffer.clear();
        IOUtils.closeQuietly(this);
    }

    public void teardown() {
        this.holdingBuffer.clear();
    }

    public boolean isAlive() {
        return this.isAlive;
    }

    public void setAlive(boolean z) {
        this.isAlive = z;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public String getTopic() {
        return this.topic;
    }

    public KafkaMessage pollMessage() {
        return this.holdingBuffer.poll();
    }

    public int messageSize() {
        return this.holdingBuffer.size();
    }

    public void setInitialOffset(String str) {
        this.initialOffset = str;
    }

    public String getInitialOffset() {
        return this.initialOffset;
    }

    public int getCacheSize() {
        return this.cacheSize;
    }

    public void setCacheSize(int i) {
        this.cacheSize = i;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void putMessage(KafkaPartition kafkaPartition, Message message, long j) throws InterruptedException {
        this.holdingBuffer.put(new KafkaMessage(kafkaPartition, message, j));
        this.statsSnapShot.mark(kafkaPartition, message.payloadSize());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void commitOffset();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract Map<KafkaPartition, Long> getCurrentOffsets();

    public KafkaMeterStats getConsumerStats() {
        this.statsSnapShot.setupStats(this.stats);
        return this.stats;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void resetPartitionsAndOffset(Set<KafkaPartition> set, Map<KafkaPartition, Long> map);

    public void setZookeeper(String str) {
        this.zookeeper = str;
        this.zookeeperMap = parseZookeeperStr(str);
    }

    public String getZookeeper() {
        return this.zookeeper;
    }

    private SetMultimap<String, String> parseZookeeperStr(String str) {
        HashMultimap create = HashMultimap.create();
        for (String str2 : str.split(";")) {
            String[] split = str2.split("::");
            create.put(split.length == 1 ? "com.datatorrent.contrib.kafka.defaultcluster" : split[0], split.length == 1 ? split[0] : split[1]);
        }
        return create;
    }
}
