package com.datatorrent.contrib.aerospike;

import com.aerospike.client.Record;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/aerospike/AbstractAerospikeGetOperator.class */
/**
 * Base input operator that runs a query against an {@link AerospikeStore} and emits one tuple
 * per returned {@link Record} on {@link #outputPort}.
 *
 * <p>Subclasses supply the query via {@link #queryToRetrieveData()} and the record-to-tuple
 * conversion via {@link #getTuple(Record)}.
 *
 * @param <T> type of the tuples emitted on the output port
 */
public abstract class AbstractAerospikeGetOperator<T> extends AbstractStoreInputOperator<T, AerospikeStore> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractAerospikeGetOperator.class);

    /** Port on which every tuple produced by {@link #getTuple(Record)} is emitted. */
    public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<>();

    /**
     * Converts a single record read from the query result into the tuple to emit.
     *
     * @param record record obtained from the current position of the query's record set
     * @return the tuple that will be emitted on {@link #outputPort}
     */
    public abstract T getTuple(Record record);

    /**
     * Supplies the statement that {@link #emitTuples()} passes to the Aerospike client's query call.
     *
     * @return the statement describing the data to retrieve
     */
    public abstract Statement queryToRetrieveData();

    /**
     * Executes the statement from {@link #queryToRetrieveData()} and emits a tuple for every
     * record in the result.
     *
     * <p>The record set is always closed once iteration ends, whether normally or because of a
     * failure. On any exception the store is disconnected and the exception is handed to
     * {@link DTThrowable#rethrow}.
     */
    public void emitTuples() {
        Statement statement = queryToRetrieveData();
        // Parameterized logging: the message is only formatted when debug is enabled.
        logger.debug("select statement: {}", statement);
        try {
            // The explicit QueryPolicy cast keeps overload resolution unambiguous for the null policy.
            RecordSet records = store.getClient().query((QueryPolicy) null, statement);
            try {
                while (records.next()) {
                    outputPort.emit(getTuple(records.getRecord()));
                }
            } finally {
                // Release the query's resources before the outer handler (if any) disconnects the store.
                records.close();
            }
        } catch (Exception e) {
            store.disconnect();
            DTThrowable.rethrow(e);
        }
    }
}
