/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kinesis;

import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.datatorrent.common.util.Pair;
import com.datatorrent.contrib.kinesis.KinesisUtil;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pulls records from the shards of a Kinesis stream on background threads and
 * hands them over through a bounded in-memory buffer.
 *
 * <p>Lifecycle as implemented here: {@link #create()} allocates the buffer and
 * selects the shards to read, {@link #start()} launches one worker per selected
 * shard, {@link #pollRecord()} drains the buffer, and {@link #stop()} /
 * {@link #close()} terminate the workers.
 *
 * <p>Threading: the worker threads share {@link #isAlive}, {@link #shardPosition},
 * {@link #closedShards} and the holding buffer with the caller's thread.
 */
public class KinesisConsumer
implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(KinesisConsumer.class);
    /** Limit passed to every {@code getRecords} call. */
    protected Integer recordsLimit = 100;
    /** Capacity of the holding buffer allocated by {@link #create()}. */
    private int bufferSize = 1024;
    /**
     * Run flag polled by the worker threads. Declared {@code volatile} because it is
     * written by the controlling thread and read by the workers; without it a worker
     * is not guaranteed to ever observe {@link #stop()}.
     */
    protected transient volatile boolean isAlive = false;
    /** Bounded hand-off queue of (shard id, record) pairs; allocated in {@link #create()}. */
    private transient ArrayBlockingQueue<Pair<String, Record>> holdingBuffer;
    @NotNull
    protected String streamName;
    @Pattern(flags={Pattern.Flag.CASE_INSENSITIVE}, regexp="earliest|latest")
    protected String initialOffset = "latest";
    protected transient ExecutorService consumerThreadExecutor = null;
    /** Last sequence number handed to the buffer, keyed by shard id. */
    protected ConcurrentHashMap<String, String> shardPosition = new ConcurrentHashMap<String, String>();
    /** Shards selected by {@link #create()}; one worker is started for each. */
    protected final transient HashSet<Shard> simpleConsumerThreads = new HashSet<Shard>();
    /** Shard ids to read; {@code null} or empty means "all shards of the stream". */
    private Set<String> shardIds = new HashSet<String>();
    /**
     * Shards that returned no records and report an ending sequence number.
     * Workers add to this set while holding its monitor.
     */
    protected Set<Shard> closedShards = new HashSet<Shard>();
    /** Milliseconds a worker sleeps after an empty fetch from a shard that is not closed. */
    protected long recordsCheckInterval = 500L;
    protected transient KinesisShardStats stats = new KinesisShardStats();

    public KinesisConsumer() {
    }

    /**
     * @param streamName name of the stream to read
     */
    public KinesisConsumer(String streamName) {
        this.streamName = streamName;
    }

    /**
     * @param streamName  name of the stream to read
     * @param newShardIds shard ids to read; {@code null} or empty selects every shard
     */
    public KinesisConsumer(String streamName, Set<String> newShardIds) {
        this(streamName);
        this.shardIds = newShardIds;
    }

    /**
     * Allocates the holding buffer and selects the shards to consume: every shard
     * returned by {@code KinesisUtil.getShardList} that is requested (or all of them
     * when no shard ids are configured) and is not already in {@link #closedShards}.
     */
    public void create() {
        this.holdingBuffer = new ArrayBlockingQueue<Pair<String, Record>>(this.bufferSize);
        boolean defaultSelect = this.shardIds == null || this.shardIds.isEmpty();
        List<Shard> shards = KinesisUtil.getInstance().getShardList(this.streamName);
        for (Shard shard : shards) {
            // defaultSelect is tested first so a null shardIds set is never dereferenced.
            boolean requested = defaultSelect || this.shardIds.contains(shard.getShardId());
            if (requested && !this.closedShards.contains(shard)) {
                this.simpleConsumerThreads.add(shard);
            }
        }
    }

    /**
     * Chooses the iterator type for a shard.
     *
     * @param shardId shard to look up
     * @return {@code AFTER_SEQUENCE_NUMBER} when a position is recorded for the shard;
     *         otherwise {@code TRIM_HORIZON} if {@link #initialOffset} is "earliest"
     *         (case-insensitive), else {@code LATEST}
     */
    public ShardIteratorType getIteratorType(String shardId) {
        if (this.shardPosition.containsKey(shardId)) {
            return ShardIteratorType.AFTER_SEQUENCE_NUMBER;
        }
        // Literal on the left so a null initialOffset falls back to LATEST instead of throwing.
        return "earliest".equalsIgnoreCase(this.initialOffset) ? ShardIteratorType.TRIM_HORIZON : ShardIteratorType.LATEST;
    }

    /**
     * Marks the consumer alive and starts one worker thread per shard selected by
     * {@link #create()}. Does nothing beyond setting the flag when no shard was selected.
     */
    public void start() {
        this.isAlive = true;
        int realNumStream = this.simpleConsumerThreads.size();
        if (realNumStream == 0) {
            return;
        }
        this.consumerThreadExecutor = Executors.newFixedThreadPool(realNumStream);
        for (final Shard shd : this.simpleConsumerThreads) {
            this.consumerThreadExecutor.submit(new Runnable(){

                @Override
                public void run() {
                    KinesisConsumer.this.consumeShard(shd);
                }
            });
        }
    }

    /**
     * Worker loop for a single shard: fetches batches while {@link #isAlive} is set,
     * pushes each record into the holding buffer and then records the last sequence
     * number of the batch in {@link #shardPosition}.
     *
     * <p>The loop ends when the consumer is stopped, when the shard yields no records
     * and reports an ending sequence number (the shard is then added to
     * {@link #closedShards}), when the thread is interrupted, or on any failure.
     */
    private void consumeShard(Shard shard) {
        String threadName = Thread.currentThread().getName();
        logger.debug("Thread {} start consuming Records...", threadName);
        try {
            while (this.isAlive) {
                List<Record> records = KinesisUtil.getInstance().getRecords(this.streamName, this.recordsLimit, shard.getShardId(), this.getIteratorType(shard.getShardId()), this.shardPosition.get(shard.getShardId()));
                if (records == null || records.isEmpty()) {
                    if (shard.getSequenceNumberRange().getEndingSequenceNumber() != null) {
                        // Several workers may close shards at once; HashSet is not safe
                        // for concurrent writers, so serialize the add.
                        synchronized (this.closedShards) {
                            this.closedShards.add(shard);
                        }
                        break;
                    }
                    Thread.sleep(this.recordsCheckInterval);
                    continue;
                }
                String seqNo = "";
                for (Record rc : records) {
                    seqNo = rc.getSequenceNumber();
                    this.putRecord(shard.getShardId(), rc);
                }
                this.shardPosition.put(shard.getShardId(), seqNo);
            }
        }
        catch (InterruptedException e) {
            // Interrupted while sleeping or waiting for buffer space (e.g. by close()):
            // restore the flag and leave quietly.
            Thread.currentThread().interrupt();
            logger.debug("Thread {} interrupted while consuming shard {}", threadName, shard.getShardId());
        }
        catch (Exception e) {
            // The Future returned by submit() is discarded, so without this log entry
            // the failure would disappear silently.
            logger.error("Thread {} failed while consuming shard {}", threadName, shard.getShardId(), e);
            throw new RuntimeException(e);
        }
        logger.debug("Thread {} stop consuming Records...", threadName);
    }

    /**
     * Shuts the worker pool down and forgets the selected shards.
     *
     * <p>{@code shutdownNow()} is used so that workers blocked in
     * {@code holdingBuffer.put} or {@code Thread.sleep} are interrupted; a plain
     * {@code shutdown()} would leave such threads blocked indefinitely.
     */
    @Override
    public void close() {
        if (this.consumerThreadExecutor != null) {
            this.consumerThreadExecutor.shutdownNow();
        }
        this.simpleConsumerThreads.clear();
    }

    /**
     * Clears the run flag, discards buffered records and closes this consumer.
     * Safe to call even if {@link #create()} was never invoked.
     */
    public void stop() {
        this.isAlive = false;
        if (this.holdingBuffer != null) {
            this.holdingBuffer.clear();
        }
        IOUtils.closeQuietly((Closeable)this);
    }

    /**
     * Replaces the recorded shard positions with the non-empty entries of
     * {@code shardPositions} whose key is one of the configured shard ids.
     *
     * @param shardPositions sequence numbers keyed by shard id; {@code null} leaves
     *                       the current positions untouched
     */
    public void resetShardPositions(Map<String, String> shardPositions) {
        if (shardPositions == null) {
            return;
        }
        this.shardPosition.clear();
        if (this.shardIds == null) {
            return;
        }
        for (String pid : this.shardIds) {
            String offsetForPar = shardPositions.get(pid);
            if (offsetForPar == null || offsetForPar.equals("")) {
                continue;
            }
            this.shardPosition.put(pid, offsetForPar);
        }
    }

    /**
     * @return the live map of shard id to last recorded sequence number
     */
    protected Map<String, String> getShardPosition() {
        return this.shardPosition;
    }

    /**
     * @return the live set of shards the workers have marked closed
     */
    public Set<Shard> getClosedShards() {
        return this.closedShards;
    }

    /**
     * @return the number of configured shard ids, or 0 when none are configured
     */
    public Integer getNumOfShards() {
        return this.shardIds == null ? 0 : this.shardIds.size();
    }

    /**
     * Merges {@code shardStats} into the consumer's statistics and returns them.
     *
     * @param shardStats entries to merge
     * @return the updated statistics object held by this consumer
     */
    public KinesisShardStats getConsumerStats(Map<String, String> shardStats) {
        this.stats.updateShardStats(shardStats);
        return this.stats;
    }

    /**
     * Discards any buffered records. Safe to call even if {@link #create()} was
     * never invoked.
     */
    public void teardown() {
        if (this.holdingBuffer != null) {
            this.holdingBuffer.clear();
        }
    }

    public boolean isAlive() {
        return this.isAlive;
    }

    public void setAlive(boolean isAlive) {
        this.isAlive = isAlive;
    }

    public void setStreamName(String streamName) {
        this.streamName = streamName;
    }

    public String getStreamName() {
        return this.streamName;
    }

    /**
     * @return the next buffered (shard id, record) pair, or {@code null} when the
     *         buffer is empty
     */
    public Pair<String, Record> pollRecord() {
        return this.holdingBuffer.poll();
    }

    /**
     * @return the number of records currently held in the buffer
     */
    public int getQueueSize() {
        return this.holdingBuffer.size();
    }

    public void setInitialOffset(String initialOffset) {
        this.initialOffset = initialOffset;
    }

    public String getInitialOffset() {
        return this.initialOffset;
    }

    /**
     * Adds a record to the holding buffer, blocking while the buffer is full.
     *
     * @param shardId id of the shard the record was read from
     * @param msg     the record
     * @throws InterruptedException if interrupted while waiting for buffer space
     */
    protected final void putRecord(String shardId, Record msg) throws InterruptedException {
        this.holdingBuffer.put(new Pair<String, Record>(shardId, msg));
    }

    public Integer getRecordsLimit() {
        return this.recordsLimit;
    }

    public void setRecordsLimit(Integer recordsLimit) {
        this.recordsLimit = recordsLimit;
    }

    public Set<String> getShardIds() {
        return this.shardIds;
    }

    public void setShardIds(Set<String> shardIds) {
        this.shardIds = shardIds;
    }

    /**
     * Sets the buffer capacity used by the next call to {@link #create()}.
     */
    public void setQueueSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Helpers for combining {@link KinesisShardStats} instances. */
    public static class KinesisShardStatsUtil {
        /**
         * Flattens several stats objects into one map; when the same key occurs more
         * than once, the entry from the later list element wins.
         *
         * @param kinesisshardStats stats to merge; {@code null} yields an empty map
         * @return a new map holding the merged entries
         */
        public static Map<String, String> getShardStatsForPartitions(List<KinesisShardStats> kinesisshardStats) {
            Map<String, String> result = Maps.newHashMap();
            if (kinesisshardStats == null) {
                return result;
            }
            for (KinesisShardStats kms : kinesisshardStats) {
                if (kms == null || kms.partitionStats == null) {
                    continue;
                }
                result.putAll(kms.partitionStats);
            }
            return result;
        }
    }

    /** Serializable holder of per-shard statistics keyed by shard id. */
    public static class KinesisShardStats
    implements Serializable {
        public ConcurrentHashMap<String, String> partitionStats = new ConcurrentHashMap<String, String>();

        /**
         * Copies the given entries into {@link #partitionStats}, overwriting existing keys.
         *
         * @param shardStats entries to merge; {@code null} is ignored
         */
        public void updateShardStats(Map<String, String> shardStats) {
            if (shardStats == null) {
                return;
            }
            for (Map.Entry<String, String> ss : shardStats.entrySet()) {
                // ConcurrentHashMap rejects null keys and values; skip such entries.
                if (ss.getKey() == null || ss.getValue() == null) {
                    continue;
                }
                this.partitionStats.put(ss.getKey(), ss.getValue());
            }
        }
    }
}

