package com.datatorrent.lib.db.jdbc;

import com.datatorrent.api.Context;
import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator;
import com.google.common.collect.Lists;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import javax.annotation.Nonnull;
import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.class */
/**
 * Output operator that buffers incoming tuples and writes them to a
 * {@link JdbcTransactionalStore} through a single {@link PreparedStatement}
 * using JDBC batching.
 *
 * <p>Tuples are appended to an in-memory list as they arrive. A batch is sent
 * as soon as the number of not-yet-written tuples reaches {@code batchSize},
 * and any remainder is sent when the window ends. The buffer is emptied only
 * after the parent's {@code endWindow()} has returned.
 *
 * <p>Subclasses supply the SQL text via {@link #getUpdateCommand()} and bind
 * each tuple to the statement via
 * {@link #setStatementParameters(PreparedStatement, Object)}.
 *
 * @param <T> type of the tuples written by this operator
 */
public abstract class AbstractJdbcTransactionableOutputOperator<T> extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore> {
    /** Batch size used when none is configured; taken from the non-transactional batch operator. */
    protected static int DEFAULT_BATCH_SIZE = AbstractJdbcNonTransactionableBatchOutputOperator.DEFAULT_BATCH_SIZE;
    private static final Logger logger = LoggerFactory.getLogger(AbstractJdbcTransactionableOutputOperator.class);

    /** Tuples received since the buffer was last cleared at the end of a window. */
    private final List<T> tuples = Lists.newArrayList();

    /** Number of pending tuples that triggers a batch write; declared minimum is 1. */
    @Min(1)
    private int batchSize = DEFAULT_BATCH_SIZE;

    /** Statement prepared in {@link #setup} from {@link #getUpdateCommand()}. */
    private transient PreparedStatement updateCommand;

    /** Index into {@link #tuples} of the first tuple not yet handed to a batch. */
    private transient int batchStartIdx = 0;

    /**
     * Creates the operator with a fresh {@link JdbcTransactionalStore} as its store.
     */
    public AbstractJdbcTransactionableOutputOperator() {
        store = new JdbcTransactionalStore();
    }

    /**
     * Runs the parent setup and then prepares the update statement on the
     * store's connection.
     *
     * @param operatorContext context passed through to the parent's setup
     * @throws RuntimeException wrapping the {@link SQLException} if the
     *         statement cannot be prepared
     */
    @Override
    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
        try {
            updateCommand = ((JdbcTransactionalStore) store).connection.prepareStatement(getUpdateCommand());
        } catch (SQLException sqlFailure) {
            throw new RuntimeException(sqlFailure);
        }
    }

    /**
     * Writes any tuples still pending, delegates to the parent's
     * {@code endWindow()}, and then resets the buffer and the batch cursor
     * for the next window.
     */
    @Override
    public void endWindow() {
        final boolean hasPending = tuples.size() > batchStartIdx;
        if (hasPending) {
            processBatch();
        }
        // NOTE(review): the buffer is deliberately cleared only after the parent
        // call returns; presumably the parent finalizes the transaction here -
        // confirm against the parent class.
        super.endWindow();
        tuples.clear();
        batchStartIdx = 0;
    }

    /**
     * Buffers the tuple and writes a batch once the number of pending tuples
     * has reached {@code batchSize}.
     *
     * @param t tuple to buffer
     */
    @Override
    public void processTuple(T t) {
        tuples.add(t);
        final int pending = tuples.size() - batchStartIdx;
        if (pending >= batchSize) {
            processBatch();
        }
    }

    /**
     * Binds every pending tuple to the prepared statement, executes them as
     * one JDBC batch and clears the statement's batch afterwards.
     *
     * <p>The batch cursor is advanced to the end of the buffer whether or not
     * the write succeeded, so a failed range is not submitted again by a later
     * call.
     *
     * @throws RuntimeException with message {@code "processing batch"} wrapping
     *         any {@link SQLException} raised while binding or executing
     */
    private void processBatch() {
        logger.debug("start {} end {}", batchStartIdx, tuples.size());
        try {
            final int end = tuples.size();
            for (int idx = batchStartIdx; idx < end; idx++) {
                setStatementParameters(updateCommand, tuples.get(idx));
                updateCommand.addBatch();
            }
            updateCommand.executeBatch();
            updateCommand.clearBatch();
        } catch (SQLException sqlFailure) {
            throw new RuntimeException("processing batch", sqlFailure);
        } finally {
            // Everything currently buffered counts as handled from here on.
            batchStartIdx = tuples.size();
        }
    }

    /**
     * Sets the number of pending tuples that triggers a batch write.
     *
     * <p>The value is stored as given; the {@code @Min(1)} constraint on the
     * field is not checked by this setter.
     *
     * @param i new batch size
     */
    public void setBatchSize(int i) {
        batchSize = i;
    }

    /**
     * Returns the SQL text that {@link #setup} passes to
     * {@code prepareStatement}.
     *
     * @return the SQL command, never {@code null}
     */
    @Nonnull
    protected abstract String getUpdateCommand();

    /**
     * Binds the values of one tuple to the parameters of the prepared
     * statement. Called once per tuple, immediately before {@code addBatch()}.
     *
     * @param preparedStatement statement whose parameters are to be set
     * @param t tuple providing the values
     * @throws SQLException if a parameter cannot be set
     */
    protected abstract void setStatementParameters(PreparedStatement preparedStatement, T t) throws SQLException;
}
