/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.cassandra;

import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.contrib.cassandra.CassandraStore;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCassandraInputOperator<T>
extends AbstractStoreInputOperator<T, CassandraStore> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
    private PagingState nextPageState;
    private int fetchSize;
    int waitForDataTimeout = 100;
    public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort();

    public void beginWindow(long l) {
        super.beginWindow(l);
    }

    public int getWaitForDataTimeout() {
        return this.waitForDataTimeout;
    }

    public void setWaitForDataTimeout(int waitForDataTimeout) {
        this.waitForDataTimeout = waitForDataTimeout;
    }

    public abstract T getTuple(Row var1);

    public abstract String queryToRetrieveData();

    public void emitTuples() {
        String query = this.queryToRetrieveData();
        logger.debug("select statement: {}", (Object)query);
        SimpleStatement stmt = new SimpleStatement(query);
        stmt.setFetchSize(this.fetchSize);
        try {
            if (this.nextPageState != null) {
                stmt.setPagingState(this.nextPageState);
            }
            ResultSet result = ((CassandraStore)this.store).getSession().execute((Statement)stmt);
            this.nextPageState = result.getExecutionInfo().getPagingState();
            if (!result.isExhausted()) {
                for (Row row : result) {
                    T tuple = this.getTuple(row);
                    this.emit(tuple);
                }
            } else {
                Thread.sleep(this.waitForDataTimeout);
            }
        }
        catch (Exception ex) {
            ((CassandraStore)this.store).disconnect();
            DTThrowable.rethrow((Exception)ex);
        }
    }

    protected void emit(T tuple) {
        this.outputPort.emit(tuple);
    }

    public int getFetchSize() {
        return this.fetchSize;
    }

    public void setFetchSize(int fetchSize) {
        this.fetchSize = fetchSize;
    }
}

