/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.contrib.kafka.KafkaMetadataUtil;
import com.datatorrent.contrib.kafka.KafkaPartition;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import java.io.Closeable;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import kafka.message.Message;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

public abstract class KafkaConsumer
implements Closeable {
    protected static final String HIGHLEVEL_CONSUMER_ID_SUFFIX = "_stream_";
    protected static final String SIMPLE_CONSUMER_ID_SUFFIX = "_partition_";
    private String zookeeper;
    private int cacheSize = 1024;
    protected transient boolean isAlive = false;
    private transient ArrayBlockingQueue<KafkaMessage> holdingBuffer;
    @NotNull
    protected String topic = "default_topic";
    @NotNull
    @FieldSerializer.Bind(value=JavaSerializer.class)
    SetMultimap<String, String> zookeeperMap;
    protected transient SetMultimap<String, String> brokers;
    @Pattern(flags={Pattern.Flag.CASE_INSENSITIVE}, regexp="earliest|latest")
    protected String initialOffset = "latest";
    protected transient SnapShot statsSnapShot = new SnapShot();
    protected transient KafkaMeterStats stats = new KafkaMeterStats();

    public KafkaConsumer() {
    }

    public KafkaConsumer(String topic) {
        this();
        this.topic = topic;
    }

    public KafkaConsumer(String zks, String topic) {
        this.topic = topic;
        this.setZookeeper(zks);
    }

    public void create() {
        this.initBrokers();
        this.holdingBuffer = new ArrayBlockingQueue(this.cacheSize);
    }

    public void initBrokers() {
        if (this.brokers != null) {
            return;
        }
        if (this.zookeeperMap != null) {
            this.brokers = HashMultimap.create();
            for (String clusterId : this.zookeeperMap.keySet()) {
                try {
                    this.brokers.putAll((Object)clusterId, KafkaMetadataUtil.getBrokers(this.zookeeperMap.get((Object)clusterId)));
                }
                catch (Exception e) {
                    throw new RuntimeException("Error resolving brokers for cluster " + clusterId + " " + this.zookeeperMap.get((Object)clusterId), e);
                }
            }
        }
    }

    public void start() {
        this.isAlive = true;
        this.statsSnapShot.start();
    }

    public void stop() {
        this.isAlive = false;
        this.statsSnapShot.stop();
        this.holdingBuffer.clear();
        IOUtils.closeQuietly((Closeable)this);
    }

    public void teardown() {
        this.holdingBuffer.clear();
    }

    public boolean isAlive() {
        return this.isAlive;
    }

    public void setAlive(boolean isAlive) {
        this.isAlive = isAlive;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTopic() {
        return this.topic;
    }

    public KafkaMessage pollMessage() {
        return this.holdingBuffer.poll();
    }

    public int messageSize() {
        return this.holdingBuffer.size();
    }

    public void setInitialOffset(String initialOffset) {
        this.initialOffset = initialOffset;
    }

    public String getInitialOffset() {
        return this.initialOffset;
    }

    public int getCacheSize() {
        return this.cacheSize;
    }

    public void setCacheSize(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    protected final void putMessage(KafkaPartition partition, Message msg, long offset) throws InterruptedException {
        this.holdingBuffer.put(new KafkaMessage(partition, msg, offset));
        this.statsSnapShot.mark(partition, msg.payloadSize());
    }

    protected abstract void commitOffset();

    protected abstract Map<KafkaPartition, Long> getCurrentOffsets();

    public KafkaMeterStats getConsumerStats() {
        this.statsSnapShot.setupStats(this.stats);
        return this.stats;
    }

    protected abstract void resetPartitionsAndOffset(Set<KafkaPartition> var1, Map<KafkaPartition, Long> var2);

    public void setZookeeper(String zookeeper) {
        this.zookeeper = zookeeper;
        this.zookeeperMap = this.parseZookeeperStr(zookeeper);
    }

    public String getZookeeper() {
        return this.zookeeper;
    }

    private SetMultimap<String, String> parseZookeeperStr(String zookeeper) {
        HashMultimap theClusters = HashMultimap.create();
        for (String zk : zookeeper.split(";")) {
            String[] parts = zk.split("::");
            String clusterId = parts.length == 1 ? "com.datatorrent.contrib.kafka.defaultcluster" : parts[0];
            theClusters.put((Object)clusterId, (Object)(parts.length == 1 ? parts[0] : parts[1]));
        }
        return theClusters;
    }

    static class SnapShot {
        private final Map<KafkaPartition, long[]> _1_min_msg_sum_par = new HashMap<KafkaPartition, long[]>();
        private final Map<KafkaPartition, long[]> _1_min_byte_sum_par = new HashMap<KafkaPartition, long[]>();
        private static int cursor = 0;
        private final long[] msgSec = new long[61];
        private final long[] bytesSec = new long[61];
        private short last = 1;
        private ScheduledExecutorService service;

        SnapShot() {
        }

        public synchronized void moveNext() {
            cursor = (cursor + 1) % 60;
            this.msgSec[60] = this.msgSec[60] - this.msgSec[cursor];
            this.bytesSec[60] = this.bytesSec[60] - this.bytesSec[cursor];
            this.msgSec[SnapShot.cursor] = 0L;
            this.bytesSec[SnapShot.cursor] = 0L;
            for (Map.Entry<KafkaPartition, long[]> item : this._1_min_msg_sum_par.entrySet()) {
                long[] msgv = item.getValue();
                long[] bytesv = this._1_min_byte_sum_par.get(item.getKey());
                msgv[60] = msgv[60] - msgv[cursor];
                bytesv[60] = bytesv[60] - bytesv[cursor];
                msgv[SnapShot.cursor] = 0L;
                bytesv[SnapShot.cursor] = 0L;
            }
        }

        public void start() {
            if (this.service == null) {
                this.service = Executors.newScheduledThreadPool(1);
            }
            this.service.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    SnapShot.this.moveNext();
                    if (SnapShot.this.last < 60) {
                        SnapShot.access$008(SnapShot.this);
                    }
                }
            }, 1L, 1L, TimeUnit.SECONDS);
        }

        public void stop() {
            if (this.service != null) {
                this.service.shutdown();
            }
        }

        public synchronized void mark(KafkaPartition partition, long bytes) {
            int n = cursor;
            this.msgSec[n] = this.msgSec[n] + 1L;
            this.msgSec[60] = this.msgSec[60] + 1L;
            int n2 = cursor;
            this.bytesSec[n2] = this.bytesSec[n2] + bytes;
            this.bytesSec[60] = this.bytesSec[60] + bytes;
            long[] msgv = this._1_min_msg_sum_par.get(partition);
            long[] bytev = this._1_min_byte_sum_par.get(partition);
            if (msgv == null) {
                msgv = new long[61];
                bytev = new long[61];
                this._1_min_msg_sum_par.put(partition, msgv);
                this._1_min_byte_sum_par.put(partition, bytev);
            }
            int n3 = cursor;
            msgv[n3] = msgv[n3] + 1L;
            msgv[60] = msgv[60] + 1L;
            int n4 = cursor;
            bytev[n4] = bytev[n4] + bytes;
            bytev[60] = bytev[60] + bytes;
        }

        public synchronized void setupStats(KafkaMeterStats stat) {
            long[] _1minAvg = new long[]{this.msgSec[60] / (long)this.last, this.bytesSec[60] / (long)this.last};
            for (Map.Entry<KafkaPartition, long[]> item : this._1_min_msg_sum_par.entrySet()) {
                long[] msgv = item.getValue();
                long[] bytev = this._1_min_byte_sum_par.get(item.getKey());
                long[] _1minAvgPar = new long[]{msgv[60] / (long)this.last, bytev[60] / (long)this.last};
                stat.set_1minMovingAvgPerPartition(item.getKey(), _1minAvgPar);
            }
            stat.set_1minMovingAvg(_1minAvg);
        }

        static /* synthetic */ short access$008(SnapShot x0) {
            short s = x0.last;
            x0.last = (short)(s + 1);
            return s;
        }
    }

    public static class PartitionStats
    implements Serializable {
        private static final long serialVersionUID = -6572690643487689766L;
        public int brokerId = -1;
        public long msgsPerSec;
        public long bytesPerSec;
        public long offset;
        public String brokerHost = "";
    }

    public static class KafkaMeterStatsAggregator
    implements Context.CountersAggregator,
    Serializable {
        private static final long serialVersionUID = 729987800215151678L;

        public Object aggregate(Collection<?> countersList) {
            KafkaMeterStats kms = new KafkaMeterStats();
            for (Object o : countersList) {
                if (!(o instanceof KafkaMeterStats)) continue;
                KafkaMeterStats subKMS = (KafkaMeterStats)o;
                kms.partitionStats.putAll(subKMS.partitionStats);
                kms.totalBytesPerSec += subKMS.totalBytesPerSec;
                kms.totalMsgPerSec += subKMS.totalMsgPerSec;
            }
            return kms;
        }
    }

    public static class KafkaMeterStatsUtil {
        public static Map<KafkaPartition, Long> getOffsetsForPartitions(List<KafkaMeterStats> kafkaMeterStats) {
            HashMap result = Maps.newHashMap();
            for (KafkaMeterStats kms : kafkaMeterStats) {
                for (Map.Entry<KafkaPartition, PartitionStats> item : kms.partitionStats.entrySet()) {
                    result.put(item.getKey(), item.getValue().offset);
                }
            }
            return result;
        }

        public static Map<KafkaPartition, long[]> get_1minMovingAvgParMap(KafkaMeterStats kafkaMeterStats) {
            HashMap result = Maps.newHashMap();
            for (Map.Entry<KafkaPartition, PartitionStats> item : kafkaMeterStats.partitionStats.entrySet()) {
                result.put(item.getKey(), new long[]{item.getValue().msgsPerSec, item.getValue().bytesPerSec});
            }
            return result;
        }
    }

    public static class KafkaMessage {
        KafkaPartition kafkaPart;
        Message msg;
        long offSet;

        public KafkaMessage(KafkaPartition kafkaPart, Message msg, long offset) {
            this.kafkaPart = kafkaPart;
            this.msg = msg;
            this.offSet = offset;
        }

        public KafkaPartition getKafkaPart() {
            return this.kafkaPart;
        }

        public Message getMsg() {
            return this.msg;
        }

        public long getOffSet() {
            return this.offSet;
        }
    }

    public static class KafkaMeterStats
    implements Serializable {
        private static final long serialVersionUID = -2867402654990209006L;
        public ConcurrentHashMap<KafkaPartition, PartitionStats> partitionStats = new ConcurrentHashMap();
        public long totalMsgPerSec;
        public long totalBytesPerSec;

        public void set_1minMovingAvgPerPartition(KafkaPartition kp, long[] _1minAvgPar) {
            PartitionStats ps = this.putPartitionStatsIfNotPresent(kp);
            ps.msgsPerSec = _1minAvgPar[0];
            ps.bytesPerSec = _1minAvgPar[1];
        }

        public void set_1minMovingAvg(long[] _1minAvg) {
            this.totalMsgPerSec = _1minAvg[0];
            this.totalBytesPerSec = _1minAvg[1];
        }

        public void updateOffsets(Map<KafkaPartition, Long> offsets) {
            for (Map.Entry<KafkaPartition, Long> os : offsets.entrySet()) {
                PartitionStats ps = this.putPartitionStatsIfNotPresent(os.getKey());
                ps.offset = os.getValue();
            }
        }

        public int getConnections() {
            int r = 0;
            for (PartitionStats ps : this.partitionStats.values()) {
                if (StringUtils.isEmpty((CharSequence)ps.brokerHost)) continue;
                ++r;
            }
            return r;
        }

        public void updatePartitionStats(KafkaPartition kp, int brokerId, String host) {
            PartitionStats ps = this.putPartitionStatsIfNotPresent(kp);
            ps.brokerHost = host;
            ps.brokerId = brokerId;
        }

        private synchronized PartitionStats putPartitionStatsIfNotPresent(KafkaPartition kp) {
            PartitionStats ps = this.partitionStats.get(kp);
            if (ps == null) {
                ps = new PartitionStats();
                this.partitionStats.put(kp, ps);
            }
            return ps;
        }
    }
}

