package com.datatorrent.contrib.cassandra;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.class */
public abstract class AbstractCassandraInputOperator<T> extends AbstractStoreInputOperator<T, CassandraStore> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);

    @AutoMetric
    protected long tuplesRead;
    int waitForDataTimeout = 100;
    public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<>();

    public void beginWindow(long j) {
        super.beginWindow(j);
        this.tuplesRead = 0L;
    }

    public int getWaitForDataTimeout() {
        return this.waitForDataTimeout;
    }

    public void setWaitForDataTimeout(int i) {
        this.waitForDataTimeout = i;
    }

    public abstract T getTuple(Row row);

    public abstract String queryToRetrieveData();

    public void emitTuples() {
        String queryToRetrieveData = queryToRetrieveData();
        logger.debug("select statement: {}", queryToRetrieveData);
        try {
            ResultSet execute = ((CassandraStore) this.store).getSession().execute(queryToRetrieveData);
            if (execute.isExhausted()) {
                Thread.sleep(this.waitForDataTimeout);
            } else {
                Iterator it = execute.iterator();
                while (it.hasNext()) {
                    emit(getTuple((Row) it.next()));
                    this.tuplesRead++;
                }
            }
        } catch (Exception e) {
            ((CassandraStore) this.store).disconnect();
            DTThrowable.rethrow(e);
        }
    }

    protected void emit(T t) {
        this.outputPort.emit(t);
    }
}
