package com.datatorrent.lib.db.jdbc;

import com.datatorrent.api.Context;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.class */
public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOperator<T, JdbcStore> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractJdbcInputOperator.class);
    protected transient Statement queryStatement;
    private transient int waitForDataTimeout;

    public abstract T getTuple(ResultSet resultSet);

    public abstract String queryToRetrieveData();

    public void emitTuples() {
        String queryToRetrieveData = queryToRetrieveData();
        logger.debug(String.format("select statement: %s", queryToRetrieveData));
        try {
            ResultSet executeQuery = this.queryStatement.executeQuery(queryToRetrieveData);
            if (!executeQuery.next()) {
                Thread.sleep(this.waitForDataTimeout);
            }
            do {
                this.outputPort.emit(getTuple(executeQuery));
            } while (executeQuery.next());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (SQLException e2) {
            ((JdbcStore) this.store).disconnect();
            throw new RuntimeException(String.format("Error while running query: %s", queryToRetrieveData), e2);
        }
    }

    @Override // com.datatorrent.lib.db.AbstractStoreInputOperator
    public void setup(Context.OperatorContext operatorContext) {
        this.waitForDataTimeout = ((Integer) operatorContext.getValue(Context.OperatorContext.SPIN_MILLIS)).intValue();
        super.setup(operatorContext);
        try {
            this.queryStatement = ((JdbcStore) this.store).getConnection().createStatement();
        } catch (SQLException e) {
            throw new RuntimeException("creating query", e);
        }
    }
}
