/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kinesis;

import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.util.Pair;
import com.datatorrent.contrib.kinesis.KinesisConsumer;
import com.datatorrent.contrib.kinesis.KinesisPair;
import com.datatorrent.contrib.kinesis.KinesisUtil;
import com.datatorrent.contrib.kinesis.ShardManager;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractKinesisInputOperator<T>
implements InputOperator,
Operator.ActivationListener<Context.OperatorContext>,
Partitioner<AbstractKinesisInputOperator>,
StatsListener,
Operator.CheckpointListener {
    private static final Logger logger = LoggerFactory.getLogger(AbstractKinesisInputOperator.class);
    @Min(value=1L)
    private int maxTuplesPerWindow = Integer.MAX_VALUE;
    private int emitCount = 0;
    @NotNull
    private String accessKey;
    @NotNull
    private String secretKey;
    private String endPoint;
    protected WindowDataManager windowDataManager;
    protected transient long currentWindowId;
    protected transient int operatorId;
    protected final transient Map<String, KinesisPair<String, Integer>> currentWindowRecoveryState;
    @Valid
    protected KinesisConsumer consumer = new KinesisConsumer();
    public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
    private transient Context.OperatorContext context = null;
    private transient Set<PartitionInfo> currentPartitionInfo = new HashSet<PartitionInfo>();
    protected transient Map<String, String> shardPosition = new HashMap<String, String>();
    private ShardManager shardManager = null;
    private long repartitionInterval = 30000L;
    private long repartitionCheckInterval = 5000L;
    private transient long lastCheckTime = 0L;
    private transient long lastRepartitionTime = 0L;
    private transient boolean isReplayState = false;
    @Min(value=1L)
    private Integer shardsPerPartition = 1;
    @Min(value=1L)
    private int initialPartitionCount = 1;
    private transient List<String> newWaitingPartition = new LinkedList<String>();
    public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort();

    public AbstractKinesisInputOperator() {
        this.windowDataManager = new FSWindowDataManager();
        this.currentWindowRecoveryState = new HashMap<String, KinesisPair<String, Integer>>();
    }

    public abstract T getTuple(Record var1);

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractKinesisInputOperator>> partitions) {
        this.lastRepartitionTime = System.currentTimeMillis();
    }

    public Collection<Partitioner.Partition<AbstractKinesisInputOperator>> definePartitions(Collection<Partitioner.Partition<AbstractKinesisInputOperator>> partitions, Partitioner.PartitioningContext context) {
        int i;
        boolean isInitialParitition;
        boolean bl = isInitialParitition = partitions.iterator().next().getStats() == null;
        if (isInitialParitition) {
            try {
                KinesisUtil.getInstance().createKinesisClient(this.accessKey, this.secretKey, this.endPoint);
            }
            catch (Exception e) {
                throw new RuntimeException("[definePartitions]: Unable to load credentials. ", e);
            }
        }
        List<Shard> shards = KinesisUtil.getInstance().getShardList(this.getStreamName());
        ArrayList<Object> newPartitions = null;
        HashSet deletedOperators = Sets.newHashSet();
        Map<String, String> initShardPos = null;
        if (isInitialParitition && this.shardManager != null) {
            initShardPos = this.shardManager.loadInitialShardPositions();
        }
        switch (this.strategy) {
            case ONE_TO_ONE: {
                if (isInitialParitition) {
                    this.lastRepartitionTime = System.currentTimeMillis();
                    logger.info("[ONE_TO_ONE]: Initializing partition(s)");
                    newPartitions = new ArrayList<Partitioner.Partition<AbstractKinesisInputOperator>>(shards.size());
                    for (i = 0; i < shards.size(); ++i) {
                        logger.info("[ONE_TO_ONE]: Create operator partition for kinesis partition: " + shards.get(i).getShardId() + ", StreamName: " + this.getConsumer().streamName);
                        newPartitions.add(this.createPartition(Sets.newHashSet((Object[])new String[]{shards.get(i).getShardId()}), initShardPos));
                    }
                    break;
                }
                if (this.newWaitingPartition.size() == 0) break;
                this.removePartitionsForClosedShards(partitions, deletedOperators);
                for (String pid : this.newWaitingPartition) {
                    logger.info("[ONE_TO_ONE]: Add operator partition for kinesis partition " + pid);
                    partitions.add(this.createPartition(Sets.newHashSet((Object[])new String[]{pid}), null));
                }
                this.newWaitingPartition.clear();
                List managers = this.windowDataManager.partition(partitions.size(), (Set)deletedOperators);
                int i2 = 0;
                for (Partitioner.Partition<AbstractKinesisInputOperator> partition : partitions) {
                    ((AbstractKinesisInputOperator)partition.getPartitionedInstance()).setWindowDataManager((WindowDataManager)managers.get(i2));
                    ++i2;
                }
                return partitions;
            }
            case MANY_TO_ONE: {
                int size = this.initialPartitionCount;
                if (this.newWaitingPartition.size() != 0) {
                    shards = this.getOpenShards(partitions);
                    if (this.shardsPerPartition > 1) {
                        size = (int)Math.ceil((double)shards.size() / ((double)this.shardsPerPartition.intValue() * 1.0));
                    }
                    initShardPos = this.shardManager.loadInitialShardPositions();
                }
                Set[] pIds = (Set[])Array.newInstance(new HashSet().getClass(), size);
                newPartitions = new ArrayList(size);
                for (int i3 = 0; i3 < shards.size(); ++i3) {
                    Shard pm = shards.get(i3);
                    if (pIds[i3 % size] == null) {
                        pIds[i3 % size] = new HashSet();
                    }
                    pIds[i3 % size].add(pm.getShardId());
                }
                if (isInitialParitition) {
                    this.lastRepartitionTime = System.currentTimeMillis();
                    logger.info("[MANY_TO_ONE]: Initializing partition(s)");
                } else {
                    logger.info("[MANY_TO_ONE]: Add operator partition for kinesis partition(s): " + StringUtils.join(this.newWaitingPartition, (String)", ") + ", StreamName: " + this.getConsumer().streamName);
                    this.newWaitingPartition.clear();
                }
                for (Partitioner.Partition<AbstractKinesisInputOperator> op : partitions) {
                    deletedOperators.add(((AbstractKinesisInputOperator)op.getPartitionedInstance()).operatorId);
                }
                for (int i4 = 0; i4 < pIds.length; ++i4) {
                    logger.info("[MANY_TO_ONE]: Create operator partition for kinesis partition(s): " + StringUtils.join((Iterable)pIds[i4], (String)", ") + ", StreamName: " + this.getConsumer().streamName);
                    if (pIds[i4] == null) continue;
                    newPartitions.add(this.createPartition(pIds[i4], initShardPos));
                }
                break;
            }
        }
        i = 0;
        List managers = this.windowDataManager.partition(partitions.size(), (Set)deletedOperators);
        for (Partitioner.Partition<AbstractKinesisInputOperator> partition : partitions) {
            ((AbstractKinesisInputOperator)partition.getPartitionedInstance()).setWindowDataManager((WindowDataManager)managers.get(i++));
        }
        return newPartitions;
    }

    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats) {
        StatsListener.Response resp = new StatsListener.Response();
        List<KinesisConsumer.KinesisShardStats> kstats = this.extractkinesisStats(stats);
        resp.repartitionRequired = this.isPartitionRequired(kstats);
        return resp;
    }

    private void updateShardPositions(List<KinesisConsumer.KinesisShardStats> kstats) {
        if (this.shardManager != null) {
            this.shardManager.updatePositions(KinesisConsumer.KinesisShardStatsUtil.getShardStatsForPartitions(kstats));
        }
    }

    private List<KinesisConsumer.KinesisShardStats> extractkinesisStats(StatsListener.BatchedOperatorStats stats) {
        LinkedList<KinesisConsumer.KinesisShardStats> kmsList = new LinkedList<KinesisConsumer.KinesisShardStats>();
        for (Stats.OperatorStats os : stats.getLastWindowedStats()) {
            if (os == null || !(os.counters instanceof KinesisConsumer.KinesisShardStats)) continue;
            kmsList.add((KinesisConsumer.KinesisShardStats)os.counters);
        }
        return kmsList;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean isPartitionRequired(List<KinesisConsumer.KinesisShardStats> kstats) {
        long t = System.currentTimeMillis();
        if (t - this.lastCheckTime < this.repartitionCheckInterval) {
            return false;
        }
        logger.debug("Use ShardManager to update the Shard Positions");
        this.updateShardPositions(kstats);
        if (this.repartitionInterval < 0L) {
            return false;
        }
        if (t - this.lastRepartitionTime < this.repartitionInterval) {
            return false;
        }
        try {
            HashSet<String> existingIds = new HashSet<String>();
            for (PartitionInfo pio : this.currentPartitionInfo) {
                existingIds.addAll(pio.kpids);
            }
            List<Shard> shards = KinesisUtil.getInstance().getShardList(this.getStreamName());
            for (Shard shard : shards) {
                if (existingIds.contains(shard.getShardId())) continue;
                this.newWaitingPartition.add(shard.getShardId());
            }
            if (this.newWaitingPartition.size() != 0) {
                this.lastRepartitionTime = t;
                boolean bl = true;
                return bl;
            }
            boolean bl = false;
            return bl;
        }
        finally {
            this.lastCheckTime = System.currentTimeMillis();
        }
    }

    private void removePartitionsForClosedShards(Collection<Partitioner.Partition<AbstractKinesisInputOperator>> partitions, Set<Integer> deletedOperators) {
        ArrayList<Partitioner.Partition<AbstractKinesisInputOperator>> closedPartitions = new ArrayList<Partitioner.Partition<AbstractKinesisInputOperator>>();
        for (Partitioner.Partition<AbstractKinesisInputOperator> partition : partitions) {
            if (((AbstractKinesisInputOperator)partition.getPartitionedInstance()).getConsumer().getClosedShards().size() != ((AbstractKinesisInputOperator)partition.getPartitionedInstance()).getConsumer().getNumOfShards().intValue()) continue;
            closedPartitions.add(partition);
            deletedOperators.add(((AbstractKinesisInputOperator)partition.getPartitionedInstance()).operatorId);
        }
        if (closedPartitions.size() != 0) {
            for (Partitioner.Partition partition : closedPartitions) {
                partitions.remove(partition);
            }
        }
    }

    private List<Shard> getOpenShards(Collection<Partitioner.Partition<AbstractKinesisInputOperator>> partitions) {
        ArrayList<Shard> closedShards = new ArrayList<Shard>();
        for (Partitioner.Partition<AbstractKinesisInputOperator> op : partitions) {
            closedShards.addAll(((AbstractKinesisInputOperator)op.getPartitionedInstance()).getConsumer().getClosedShards());
        }
        List<Shard> shards = KinesisUtil.getInstance().getShardList(this.getStreamName());
        ArrayList<Shard> openShards = new ArrayList<Shard>();
        for (Shard shard : shards) {
            if (closedShards.contains(shard)) continue;
            openShards.add(shard);
        }
        return openShards;
    }

    private Partitioner.Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos) {
        DefaultPartition p = new DefaultPartition(KryoCloneUtils.cloneObject((Object)this));
        ((AbstractKinesisInputOperator)p.getPartitionedInstance()).getConsumer().setShardIds(shardIds);
        ((AbstractKinesisInputOperator)p.getPartitionedInstance()).getConsumer().resetShardPositions(initShardPos);
        PartitionInfo pif = new PartitionInfo();
        pif.kpids = shardIds;
        this.currentPartitionInfo.add(pif);
        return p;
    }

    public void setup(Context.OperatorContext context) {
        this.context = context;
        try {
            KinesisUtil.getInstance().createKinesisClient(this.accessKey, this.secretKey, this.endPoint);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.consumer.create();
        this.operatorId = context.getId();
        this.windowDataManager.setup((Context)context);
        this.shardPosition.clear();
        if ((Long)context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < this.windowDataManager.getLargestCompletedWindow()) {
            this.isReplayState = true;
        }
    }

    public void teardown() {
        this.windowDataManager.teardown();
        this.consumer.teardown();
    }

    public void beginWindow(long windowId) {
        this.emitCount = 0;
        this.currentWindowId = windowId;
        if (windowId <= this.windowDataManager.getLargestCompletedWindow()) {
            this.replay(windowId);
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    protected void replay(long windowId) {
        try {
            Map recoveredData = (Map)this.windowDataManager.retrieve(windowId);
            if (recoveredData == null) {
                return;
            }
            for (Map.Entry rc : recoveredData.entrySet()) {
                logger.debug("Replaying the windowId: {}", (Object)windowId);
                logger.debug("ShardId: " + (String)rc.getKey() + " , Start Sequence Id: " + (String)((KinesisPair)((Object)rc.getValue())).getFirst() + " , No Of Records: " + ((KinesisPair)((Object)rc.getValue())).getSecond());
                try {
                    List<Record> records = KinesisUtil.getInstance().getRecords(this.consumer.streamName, (Integer)((KinesisPair)((Object)rc.getValue())).getSecond(), (String)rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, (String)((KinesisPair)((Object)rc.getValue())).getFirst());
                    for (Record record : records) {
                        this.outputPort.emit(this.getTuple(record));
                        this.shardPosition.put((String)rc.getKey(), record.getSequenceNumber());
                    }
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                    return;
                }
            }
        }
        catch (IOException e) {
            throw new RuntimeException("replay", e);
        }
    }

    public void endWindow() {
        if (this.currentWindowId > this.windowDataManager.getLargestCompletedWindow()) {
            this.context.setCounters((Object)this.getConsumer().getConsumerStats(this.shardPosition));
            try {
                this.windowDataManager.save(this.currentWindowRecoveryState, this.currentWindowId);
            }
            catch (IOException e) {
                throw new RuntimeException("saving recovery", e);
            }
        }
        this.currentWindowRecoveryState.clear();
    }

    public void activate(Context.OperatorContext ctx) {
        if (this.isReplayState) {
            return;
        }
        this.consumer.start();
    }

    public void committed(long windowId) {
        try {
            this.windowDataManager.committed(windowId);
        }
        catch (IOException e) {
            throw new RuntimeException("deleting state", e);
        }
    }

    public void checkpointed(long windowId) {
    }

    public void deactivate() {
        this.consumer.stop();
    }

    public void emitTuples() {
        if (this.currentWindowId <= this.windowDataManager.getLargestCompletedWindow()) {
            return;
        }
        int count = this.consumer.getQueueSize();
        if (this.maxTuplesPerWindow > 0) {
            count = Math.min(count, this.maxTuplesPerWindow - this.emitCount);
        }
        for (int i = 0; i < count; ++i) {
            Pair<String, Record> data = this.consumer.pollRecord();
            String shardId = (String)data.getFirst();
            String recordId = ((Record)data.getSecond()).getSequenceNumber();
            T tuple = this.getTuple((Record)data.getSecond());
            this.outputPort.emit(tuple);
            if (!this.currentWindowRecoveryState.containsKey(shardId)) {
                this.currentWindowRecoveryState.put(shardId, new KinesisPair<String, Integer>(recordId, 1));
            } else {
                KinesisPair<String, Integer> second = this.currentWindowRecoveryState.get(shardId);
                Integer noOfRecords = (Integer)second.getSecond();
                this.currentWindowRecoveryState.put((String)data.getFirst(), new KinesisPair<Object, Integer>(second.getFirst(), noOfRecords + 1));
            }
            this.shardPosition.put(shardId, recordId);
        }
        if (this.isReplayState) {
            this.isReplayState = false;
            HashMap<String, String> statsData = new HashMap<String, String>(this.getConsumer().getShardPosition());
            statsData.putAll(this.shardPosition);
            this.getConsumer().resetShardPositions(statsData);
            this.consumer.start();
        }
        this.emitCount += count;
    }

    public void setConsumer(KinesisConsumer consumer) {
        this.consumer = consumer;
    }

    public KinesisConsumer getConsumer() {
        return this.consumer;
    }

    public String getStreamName() {
        return this.consumer.getStreamName();
    }

    public void setStreamName(String streamName) {
        this.consumer.setStreamName(streamName);
    }

    public int getMaxTuplesPerWindow() {
        return this.maxTuplesPerWindow;
    }

    public void setMaxTuplesPerWindow(int maxTuplesPerWindow) {
        this.maxTuplesPerWindow = maxTuplesPerWindow;
    }

    public PartitionStrategy getStrategy() {
        return this.strategy;
    }

    public void setStrategy(String policy) {
        this.strategy = PartitionStrategy.valueOf(policy.toUpperCase());
    }

    public Context.OperatorContext getContext() {
        return this.context;
    }

    public void setContext(Context.OperatorContext context) {
        this.context = context;
    }

    public ShardManager getShardManager() {
        return this.shardManager;
    }

    public void setShardManager(ShardManager shardManager) {
        this.shardManager = shardManager;
    }

    public long getRepartitionInterval() {
        return this.repartitionInterval;
    }

    public void setRepartitionInterval(long repartitionInterval) {
        this.repartitionInterval = repartitionInterval;
    }

    public long getRepartitionCheckInterval() {
        return this.repartitionCheckInterval;
    }

    public void setRepartitionCheckInterval(long repartitionCheckInterval) {
        this.repartitionCheckInterval = repartitionCheckInterval;
    }

    public Integer getShardsPerPartition() {
        return this.shardsPerPartition;
    }

    public void setShardsPerPartition(Integer shardsPerPartition) {
        this.shardsPerPartition = shardsPerPartition;
    }

    public int getInitialPartitionCount() {
        return this.initialPartitionCount;
    }

    public void setInitialPartitionCount(int initialPartitionCount) {
        this.initialPartitionCount = initialPartitionCount;
    }

    public void setInitialOffset(String initialOffset) {
        this.consumer.initialOffset = initialOffset;
    }

    public String getAccessKey() {
        return this.accessKey;
    }

    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    public String getSecretKey() {
        return this.secretKey;
    }

    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    public String getEndPoint() {
        return this.endPoint;
    }

    public void setEndPoint(String endPoint) {
        this.endPoint = endPoint;
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    static class PartitionInfo {
        Set<String> kpids;

        PartitionInfo() {
        }
    }

    public static enum PartitionStrategy {
        ONE_TO_ONE,
        MANY_TO_ONE;

    }
}

