package com.datatorrent.lib.db.jdbc;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
import com.google.common.collect.Lists;
import java.sql.BatchUpdateException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.class */
/**
 * Base class for output operators that write tuples to a database through a JDBC
 * {@link PreparedStatement} executed in batches.
 *
 * <p>Incoming tuples are buffered in {@link #tuples}. Once {@code batchSize} tuples have
 * accumulated since the last flush (or the window ends), the buffered tuples are bound to
 * the statement via {@link #setStatementParameters(PreparedStatement, Object)} and executed
 * as one batch. Tuples whose command fails are emitted on the {@link #error} port.
 *
 * <p>Subclasses supply the SQL text through {@link #getUpdateCommand()} and the parameter
 * binding through {@link #setStatementParameters(PreparedStatement, Object)}.
 *
 * @param <T> type of tuple written to the store
 */
public abstract class AbstractJdbcTransactionableOutputOperator<T> extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore> implements Operator.ActivationListener<Context.OperatorContext> {
    /** Statement prepared in {@link #activate} from {@link #getUpdateCommand()}. */
    private transient PreparedStatement updateCommand;

    /** Number of tuples written without error in the current window (reset in {@link #beginWindow}). */
    @AutoMetric
    private int tuplesWrittenSuccessfully;

    /** Number of tuples emitted on the error port in the current window (reset in {@link #beginWindow}). */
    @AutoMetric
    private int errorTuples;

    /** Initial value of {@code batchSize}. */
    protected static int DEFAULT_BATCH_SIZE = 1000;
    private static final Logger logger = LoggerFactory.getLogger(AbstractJdbcTransactionableOutputOperator.class);

    /** Optional port on which tuples that could not be written are emitted. */
    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<T> error = new DefaultOutputPort<>();

    /** Tuples received in the current window; cleared in {@link #endWindow}. */
    protected final List<T> tuples = Lists.newArrayList();

    /** Number of buffered tuples that triggers a batch execution. */
    @Min(1)
    private int batchSize = DEFAULT_BATCH_SIZE;

    /** Index into {@link #tuples} of the first tuple that has not been handled yet. */
    protected transient int batchStartIdx = 0;

    /** Creates the operator with a fresh {@link JdbcTransactionalStore}. */
    public AbstractJdbcTransactionableOutputOperator() {
        this.store = new JdbcTransactionalStore();
    }

    @Override
    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
    }

    /**
     * Prepares the update statement on the store's connection.
     *
     * @throws RuntimeException wrapping the {@link SQLException} if the statement cannot be prepared
     */
    @Override
    public void activate(Context.OperatorContext operatorContext) {
        try {
            this.updateCommand = ((JdbcTransactionalStore) this.store).connection.prepareStatement(getUpdateCommand());
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    /** Resets the per-window metrics after delegating to the superclass. */
    @Override
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        this.tuplesWrittenSuccessfully = 0;
        this.errorTuples = 0;
    }

    /**
     * Flushes any tuples not yet written, delegates to the superclass, then resets the
     * tuple buffer for the next window.
     */
    @Override
    public void endWindow() {
        if (this.tuples.size() > this.batchStartIdx) {
            processBatch();
        }
        super.endWindow();
        this.tuples.clear();
        this.batchStartIdx = 0;
    }

    /**
     * Releases the statement prepared in {@link #activate}. A failure to close is logged
     * rather than thrown so that shutdown can proceed.
     */
    @Override
    public void deactivate() {
        if (this.updateCommand == null) {
            return;
        }
        try {
            this.updateCommand.close();
        } catch (SQLException e) {
            logger.warn("failed to close update statement", e);
        } finally {
            this.updateCommand = null;
        }
    }

    /** Buffers the tuple and executes a batch once {@code batchSize} tuples are pending. */
    @Override
    public void processTuple(T tuple) {
        this.tuples.add(tuple);
        if (this.tuples.size() - this.batchStartIdx >= this.batchSize) {
            processBatch();
        }
    }

    /**
     * Executes all pending tuples (from {@link #batchStartIdx} to the end of {@link #tuples})
     * as one JDBC batch.
     *
     * <p>On success the pending tuples are counted as written and {@link #batchStartIdx} moves
     * to the end of the buffer. On a {@link BatchUpdateException} the outcome is resolved by
     * {@link #processUpdateCounts(int[], int)}.
     *
     * @throws RuntimeException wrapping any other {@link SQLException}
     */
    protected void processBatch() {
        logger.debug("start {} end {}", this.batchStartIdx, this.tuples.size());
        final int commandsInBatch = this.tuples.size() - this.batchStartIdx;
        try {
            for (int idx = this.batchStartIdx; idx < this.tuples.size(); idx++) {
                setStatementParameters(this.updateCommand, this.tuples.get(idx));
                this.updateCommand.addBatch();
            }
            this.updateCommand.executeBatch();
            this.updateCommand.clearBatch();
            this.tuplesWrittenSuccessfully += commandsInBatch;
            this.batchStartIdx = this.tuples.size();
        } catch (BatchUpdateException e) {
            // Pass the exception itself so the stack trace and cause are not lost.
            logger.error(e.getMessage(), e);
            // Drop whatever is still queued on the statement so that a retry of the
            // remaining tuples starts from an empty batch regardless of driver behaviour.
            clearPendingBatch();
            processUpdateCounts(e.getUpdateCounts(), commandsInBatch);
        } catch (SQLException e) {
            throw new RuntimeException("processing batch", e);
        }
    }

    /** Empties the statement's queued batch commands, wrapping any failure in a RuntimeException. */
    private void clearPendingBatch() {
        try {
            this.updateCommand.clearBatch();
        } catch (SQLException e) {
            throw new RuntimeException("processing batch", e);
        }
    }

    /**
     * Resolves the outcome of a failed batch from the update counts reported by the driver.
     *
     * <p>If fewer counts than commands are reported, the driver stopped at the first failure:
     * the tuple right after the reported counts is emitted as an error, the preceding ones
     * are counted as written, and the remaining tuples are retried in a new batch.
     * Otherwise the driver ran every command: each entry equal to
     * {@link Statement#EXECUTE_FAILED} marks a tuple that is emitted as an error and the
     * rest are counted as written.
     *
     * <p>In both cases {@link #batchStartIdx} is advanced past every tuple that was handled,
     * so no tuple is submitted to the database twice.
     *
     * @param updateCounts    update counts taken from the {@link BatchUpdateException}
     * @param commandsInBatch number of commands that were in the failed batch
     */
    public void processUpdateCounts(int[] updateCounts, int commandsInBatch) {
        if (updateCounts.length < commandsInBatch) {
            // Driver gave up at the first failing command.
            this.error.emit(this.tuples.get(this.batchStartIdx + updateCounts.length));
            this.errorTuples++;
            this.tuplesWrittenSuccessfully += updateCounts.length;
            // Skip the successful commands plus the failed one.
            this.batchStartIdx += updateCounts.length + 1;
            if (this.tuples.size() > this.batchStartIdx) {
                processBatch();
            }
            return;
        }

        // Driver executed every command; pick out the individual failures.
        int failed = 0;
        for (int offset = 0; offset < commandsInBatch; offset++) {
            if (updateCounts[offset] == Statement.EXECUTE_FAILED) {
                this.error.emit(this.tuples.get(this.batchStartIdx + offset));
                failed++;
            }
        }
        this.errorTuples += failed;
        // Accumulate rather than overwrite: earlier batches in this window already contributed.
        this.tuplesWrittenSuccessfully += commandsInBatch - failed;
        // The whole batch has been handled; without this it would be executed again.
        this.batchStartIdx += commandsInBatch;
    }

    /**
     * Sets the number of buffered tuples that triggers a batch execution.
     *
     * @param batchSize new batch size; the field is constrained with {@code @Min(1)}
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Returns the SQL text from which the update statement is prepared.
     *
     * @return SQL passed to {@code Connection.prepareStatement}
     */
    protected abstract String getUpdateCommand();

    /**
     * Binds the values of one tuple to the parameters of the update statement.
     *
     * @param preparedStatement statement to bind parameters on
     * @param tuple             tuple providing the values
     * @throws SQLException if a parameter cannot be set
     */
    protected abstract void setStatementParameters(PreparedStatement preparedStatement, T tuple) throws SQLException;

    /** Returns the number of tuples counted as written in the current window. */
    public int getTuplesWrittenSuccessfully() {
        return this.tuplesWrittenSuccessfully;
    }

    /** Overrides the written-tuples counter. */
    public void setTuplesWrittenSuccessfully(int tuplesWrittenSuccessfully) {
        this.tuplesWrittenSuccessfully = tuplesWrittenSuccessfully;
    }
}
