/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kinesis;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.Pair;
import com.datatorrent.contrib.kinesis.KinesisUtil;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base operator that writes the tuples arriving on {@link #inputPort} to an
 * Amazon Kinesis stream.
 *
 * <p>Two modes are supported, selected by {@link #setBatchProcessing(boolean)}:
 * <ul>
 *   <li>batch mode (the default): tuples are buffered and sent with a single
 *       {@code putRecords} call once {@code batchSize} entries have accumulated,
 *       and whatever is left is sent in {@link #endWindow()};</li>
 *   <li>single-record mode: every tuple is sent immediately with
 *       {@code putRecord}.</li>
 * </ul>
 *
 * <p>Any {@link AmazonClientException}, and any batch in which Kinesis reports
 * rejected records, is surfaced as a {@link RuntimeException}.
 *
 * @param <V> type of the value extracted from a tuple and serialized by
 *            {@link #getRecord(Object)}
 * @param <T> type of the tuples received on the input port
 */
public abstract class AbstractKinesisOutputOperator<V, T> implements Operator {
    private static final Logger logger = LoggerFactory.getLogger(AbstractKinesisOutputOperator.class);

    /** Name of the Kinesis stream set on every put request. */
    protected String streamName;

    /** Access key handed to {@code KinesisUtil.createKinesisClient} in {@link #setup}. */
    @NotNull
    private String accessKey;

    /** Secret key handed to {@code KinesisUtil.createKinesisClient} in {@link #setup}. */
    @NotNull
    private String secretKey;

    /** Endpoint handed to {@code KinesisUtil.createKinesisClient} in {@link #setup}. */
    private String endPoint;

    /**
     * Client used for all put requests. The field is static, so it is shared by
     * every instance of this class loaded by the same class loader.
     */
    protected static transient AmazonKinesisClient client = null;

    /** Number of tuples accepted by {@link #processTuple(Object)} without an exception. */
    protected int sendCount;

    /** {@code true} to buffer tuples and send them in batches; {@code false} to send one by one. */
    protected boolean isBatchProcessing = true;

    /** Entries buffered in batch mode and not yet sent. */
    List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>();

    /** Number of buffered entries that triggers a flush in batch mode. */
    @Min(value=2L)
    @Max(value=500L)
    protected int batchSize = 92;

    /** Input port; every received tuple is forwarded to {@link #processTuple(Object)}. */
    public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>(){

        public void process(T tuple) {
            AbstractKinesisOutputOperator.this.processTuple(tuple);
        }
    };

    /**
     * Serializes a value into the bytes used as the data of a Kinesis record.
     *
     * @param var1 the value part of the pair returned by {@link #tupleToKeyValue(Object)}
     * @return the record payload
     */
    protected abstract byte[] getRecord(V var1);

    /**
     * Splits a tuple into the partition key and the value to be written.
     *
     * @param var1 the incoming tuple
     * @return a pair whose first element is used as the partition key and whose
     *         second element is passed to {@link #getRecord(Object)}
     */
    protected abstract Pair<String, V> tupleToKeyValue(T var1);

    /** No-op. */
    public void beginWindow(long windowId) {
    }

    /** No-op. */
    public void teardown() {
    }

    /**
     * In batch mode, sends whatever is still buffered so that no entry is
     * carried over into the next window.
     *
     * @throws RuntimeException if the flush fails
     */
    public void endWindow() {
        // flushRecords() already converts AmazonClientException, so no extra wrapping is needed here.
        if (this.isBatchProcessing && !this.putRecordsRequestEntryList.isEmpty()) {
            this.flushRecords();
        }
    }

    /**
     * Creates the Kinesis client through {@code KinesisUtil} and, in batch
     * mode, empties the buffer.
     *
     * @param context the operator context (not used here)
     * @throws RuntimeException if the client cannot be created
     */
    public void setup(Context.OperatorContext context) {
        try {
            KinesisUtil.getInstance().createKinesisClient(this.accessKey, this.secretKey, this.endPoint);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to load Credentials", e);
        }
        this.setClient(KinesisUtil.getInstance().getClient());
        if (this.isBatchProcessing) {
            this.putRecordsRequestEntryList.clear();
        }
    }

    /**
     * Handles one tuple: buffers it in batch mode (flushing first when the
     * buffer is full), or sends it immediately otherwise.
     *
     * @param tuple the tuple to write
     * @throws RuntimeException if the Kinesis client reports an error
     */
    public void processTuple(T tuple) {
        try {
            if (this.isBatchProcessing) {
                // ">=" rather than "==": batchSize can be lowered through its setter while
                // the buffer already holds more entries, and "==" would then never match.
                if (this.putRecordsRequestEntryList.size() >= this.batchSize) {
                    this.flushRecords();
                    logger.debug("flushed {} records.", this.batchSize);
                }
                this.addRecord(tuple);
            } else {
                Pair<String, V> keyValue = this.tupleToKeyValue(tuple);
                PutRecordRequest requestRecord = new PutRecordRequest();
                requestRecord.setStreamName(this.streamName);
                requestRecord.setPartitionKey(keyValue.first);
                requestRecord.setData(ByteBuffer.wrap(this.getRecord(keyValue.second)));
                client.putRecord(requestRecord);
            }
            ++this.sendCount;
        }
        catch (AmazonClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Converts a tuple into a request entry and appends it to the buffer.
     *
     * @param tuple the tuple to buffer
     */
    private void addRecord(T tuple) {
        try {
            Pair<String, V> keyValue = this.tupleToKeyValue(tuple);
            PutRecordsRequestEntry putRecordsEntry = new PutRecordsRequestEntry();
            putRecordsEntry.setData(ByteBuffer.wrap(this.getRecord(keyValue.second)));
            putRecordsEntry.setPartitionKey(keyValue.first);
            this.putRecordsRequestEntryList.add(putRecordsEntry);
        }
        catch (AmazonClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends all buffered entries in one {@code putRecords} call and empties the
     * buffer on success.
     *
     * <p>The failed-record count of the response is checked because a
     * {@code putRecords} call can succeed as a whole while individual records
     * are rejected; clearing the buffer in that case would lose those records.
     *
     * @throws RuntimeException if the call throws an {@link AmazonClientException}
     *         or the response reports rejected records; the buffer is left
     *         untouched in both cases
     */
    private void flushRecords() {
        try {
            PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
            putRecordsRequest.setStreamName(this.streamName);
            putRecordsRequest.setRecords(this.putRecordsRequestEntryList);
            Integer failedRecordCount = client.putRecords(putRecordsRequest).getFailedRecordCount();
            if (failedRecordCount != null && failedRecordCount > 0) {
                int attempted = this.putRecordsRequestEntryList.size();
                logger.warn("PutRecordsRequest rejected {} of {} records.", failedRecordCount, attempted);
                throw new RuntimeException("Kinesis rejected " + failedRecordCount + " of " + attempted
                        + " records sent to stream " + this.streamName);
            }
            this.putRecordsRequestEntryList.clear();
            logger.debug("Records flushed.");
        }
        catch (AmazonClientException e) {
            logger.warn("PutRecordsRequest exception.", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Replaces the client used for put requests. The client is held in a static
     * field, so the change is visible to all instances.
     *
     * @param _client the client to use
     */
    public void setClient(AmazonKinesisClient _client) {
        client = _client;
    }

    /** @return the name of the target stream */
    public String getStreamName() {
        return this.streamName;
    }

    /** @param streamName the name of the target stream */
    public void setStreamName(String streamName) {
        this.streamName = streamName;
    }

    /** @return the number of buffered entries that triggers a flush */
    public int getBatchSize() {
        return this.batchSize;
    }

    /** @param batchSize the number of buffered entries that triggers a flush */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /** @return {@code true} if tuples are buffered and sent in batches */
    public boolean isBatchProcessing() {
        return this.isBatchProcessing;
    }

    /** @param isBatchProcessing {@code true} to buffer tuples and send them in batches */
    public void setBatchProcessing(boolean isBatchProcessing) {
        this.isBatchProcessing = isBatchProcessing;
    }

    /** @return the access key used to create the client */
    public String getAccessKey() {
        return this.accessKey;
    }

    /** @param accessKey the access key used to create the client */
    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    /** @return the secret key used to create the client */
    public String getSecretKey() {
        return this.secretKey;
    }

    /** @param secretKey the secret key used to create the client */
    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    /** @return the endpoint used to create the client */
    public String getEndPoint() {
        return this.endPoint;
    }

    /** @param endPoint the endpoint used to create the client */
    public void setEndPoint(String endPoint) {
        this.endPoint = endPoint;
    }
}

