package com.datatorrent.contrib.kinesis;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.Pair;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.class */
public abstract class AbstractKinesisOutputOperator<V, T> implements Operator {
    protected String streamName;

    @NotNull
    private String accessKey;

    @NotNull
    private String secretKey;
    private String endPoint;
    protected int sendCount;
    protected boolean isBatchProcessing = true;
    List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList();

    @Max(500)
    @Min(2)
    protected int batchSize = 92;
    public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() { // from class: com.datatorrent.contrib.kinesis.AbstractKinesisOutputOperator.1
        public void process(T t) {
            AbstractKinesisOutputOperator.this.processTuple(t);
        }
    };
    private static final Logger logger = LoggerFactory.getLogger(AbstractKinesisOutputOperator.class);
    protected static transient AmazonKinesisClient client = null;

    protected abstract byte[] getRecord(V v);

    protected abstract Pair<String, V> tupleToKeyValue(T t);

    public void beginWindow(long j) {
    }

    public void teardown() {
    }

    public void endWindow() {
        if (!this.isBatchProcessing || this.putRecordsRequestEntryList.size() == 0) {
            return;
        }
        try {
            flushRecords();
        } catch (AmazonClientException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public void setup(Context.OperatorContext operatorContext) {
        try {
            KinesisUtil.getInstance().createKinesisClient(this.accessKey, this.secretKey, this.endPoint);
            setClient(KinesisUtil.getInstance().getClient());
            if (this.isBatchProcessing) {
                this.putRecordsRequestEntryList.clear();
            }
        } catch (Exception e) {
            throw new RuntimeException("Unable to load Credentials", e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processTuple(T t) {
        try {
            if (this.isBatchProcessing) {
                if (this.putRecordsRequestEntryList.size() == this.batchSize) {
                    flushRecords();
                    logger.debug("flushed {} records.", Integer.valueOf(this.batchSize));
                }
                addRecord(t);
            } else {
                Pair tupleToKeyValue = tupleToKeyValue(t);
                PutRecordRequest putRecordRequest = new PutRecordRequest();
                putRecordRequest.setStreamName(this.streamName);
                putRecordRequest.setPartitionKey((String) tupleToKeyValue.first);
                putRecordRequest.setData(ByteBuffer.wrap(getRecord(tupleToKeyValue.second)));
                client.putRecord(putRecordRequest);
            }
            this.sendCount++;
        } catch (AmazonClientException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void addRecord(T t) {
        try {
            Pair tupleToKeyValue = tupleToKeyValue(t);
            PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(getRecord(tupleToKeyValue.second)));
            putRecordsRequestEntry.setPartitionKey((String) tupleToKeyValue.first);
            this.putRecordsRequestEntryList.add(putRecordsRequestEntry);
        } catch (AmazonClientException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    private void flushRecords() {
        try {
            PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
            putRecordsRequest.setStreamName(this.streamName);
            putRecordsRequest.setRecords(this.putRecordsRequestEntryList);
            client.putRecords(putRecordsRequest);
            this.putRecordsRequestEntryList.clear();
            logger.debug("Records flushed.");
        } catch (AmazonClientException e) {
            logger.warn("PutRecordsRequest exception.", e);
            throw new RuntimeException((Throwable) e);
        }
    }

    public void setClient(AmazonKinesisClient amazonKinesisClient) {
        client = amazonKinesisClient;
    }

    public String getStreamName() {
        return this.streamName;
    }

    public void setStreamName(String str) {
        this.streamName = str;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    public boolean isBatchProcessing() {
        return this.isBatchProcessing;
    }

    public void setBatchProcessing(boolean z) {
        this.isBatchProcessing = z;
    }

    public String getAccessKey() {
        return this.accessKey;
    }

    public void setAccessKey(String str) {
        this.accessKey = str;
    }

    public String getSecretKey() {
        return this.secretKey;
    }

    public void setSecretKey(String str) {
        this.secretKey = str;
    }

    public String getEndPoint() {
        return this.endPoint;
    }

    public void setEndPoint(String str) {
        this.endPoint = str;
    }
}
