package com.datatorrent.contrib.hbase;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.util.Queue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/hbase/HBaseScanOperator.class */
/**
 * Input operator that scans an HBase table on a background thread and hands the rows to the
 * operator thread through a bounded queue.
 *
 * <p>Lifecycle as implemented here:
 * <ul>
 *   <li>{@link #setup} creates the bounded result queue.</li>
 *   <li>{@link #activate} starts the read thread, which pulls {@link Result}s from the scanner
 *       returned for {@link #operationScan()} and enqueues them.</li>
 *   <li>{@link #emitTuples()} polls at most one {@link Result} per call, converts it with
 *       {@link #getTuple(Result)} and emits it unless the conversion returned {@code null}.</li>
 *   <li>{@link #deactivate()} interrupts the read thread.</li>
 * </ul>
 *
 * <p>The read thread only enqueues while a window is open ({@code running == true}). Presumably
 * this keeps the queue from being mutated between windows (e.g. while operator state is being
 * checkpointed) — TODO confirm with the platform contract. Rows read while no window is open are
 * held back and retried, never discarded.
 *
 * @param <T> type of the tuples produced by {@link #getTuple(Result)}
 */
public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T> implements Operator.ActivationListener<Context> {
    /** Default capacity of the hand-off queue between the read thread and the operator thread. */
    public static final int DEF_QUEUE_SIZE = 1000;
    /** Default back-off, in milliseconds, used by the read thread when it cannot enqueue a row. */
    public static final int DEF_SLEEP_MILLIS = 10;

    // Row bounds / progress marker. This class only stores them; subclasses may use them through
    // the accessors (for example when building the Scan in operationScan()).
    private String startRow;
    private String endRow;
    private String lastReadRow;

    /** Bounded hand-off queue, filled by the read thread and drained by {@link #emitTuples()}. */
    private Queue<Result> resultQueue;
    /** True between {@link #beginWindow(long)} and {@link #endWindow()}; read by the read thread. */
    private volatile boolean running;

    /** Number of tuples emitted in the current window; reset in {@link #beginWindow(long)}. */
    @AutoMetric
    protected long tuplesRead;
    protected transient Scan scan;
    protected transient ResultScanner scanner;
    protected transient Thread readThread;
    private static final Logger logger = LoggerFactory.getLogger(HBaseScanOperator.class);
    private int queueSize = DEF_QUEUE_SIZE;
    private int sleepMillis = DEF_SLEEP_MILLIS;

    /** Message of the exception that terminated the read thread, if any. */
    private transient volatile String threadFailureReason = null;
    /** The exception that terminated the read thread, kept so it can be chained as a cause. */
    private transient volatile Throwable threadFailure = null;
    /** Set by the read thread when the scanner was exhausted without an error. */
    private transient volatile boolean scanCompleted;

    /**
     * Delegates to the parent setup and allocates the bounded result queue.
     *
     * @param operatorContext context supplied by the platform
     */
    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
        this.resultQueue = Queues.newLinkedBlockingQueue(this.queueSize);
    }

    /**
     * Starts the background read thread.
     *
     * @param context activation context (unused)
     */
    public void activate(Context context) {
        startReadThread();
    }

    /**
     * Opens a scanner for {@link #operationScan()} and starts a thread that feeds its results into
     * the result queue. The scanner is closed when that thread ends, however it ends.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the scanner cannot be opened
     */
    protected void startReadThread() {
        try {
            // Reset the outcome of any previous read thread before starting a new one.
            this.scanCompleted = false;
            this.threadFailure = null;
            this.threadFailureReason = null;

            this.scan = operationScan();
            this.scanner = ((HBaseStore) getStore()).getTable().getScanner(this.scan);
            this.readThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        pumpResults();
                        // Only reached when the scanner returned null, i.e. a clean end of scan.
                        HBaseScanOperator.this.scanCompleted = true;
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for anything further up the stack of this thread.
                        Thread.currentThread().interrupt();
                        recordFailure(e);
                    } catch (Exception e) {
                        recordFailure(e);
                    } finally {
                        HBaseScanOperator.this.scanner.close();
                    }
                }
            });
            this.readThread.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Body of the read thread: moves every result of the scanner into the result queue.
     *
     * <p>Returns normally when the scanner reports end of scan by returning {@code null}. A row
     * that cannot be enqueued yet (queue full, or no window open) is retried after
     * {@code sleepMillis}; it is never dropped.
     *
     * @throws IOException if the scanner fails
     * @throws InterruptedException if the thread is interrupted, e.g. by {@link #deactivate()}
     */
    private void pumpResults() throws IOException, InterruptedException {
        Result next;
        while ((next = this.scanner.next()) != null) {
            // Thread.sleep() below only notices an interrupt when we actually have to wait, so
            // check explicitly to keep the thread stoppable while the queue has room.
            if (Thread.interrupted()) {
                throw new InterruptedException("HBase read thread interrupted");
            }
            while (!(this.running && this.resultQueue.offer(next))) {
                Thread.sleep(this.sleepMillis);
            }
        }
    }

    /**
     * Remembers why the read thread stopped so {@link #emitTuples()} can report it.
     *
     * @param failure the exception that ended the read thread
     */
    private void recordFailure(Exception failure) {
        logger.debug("Exception in fetching results {}", failure.getMessage(), failure);
        this.threadFailureReason = failure.getMessage();
        this.threadFailure = failure;
    }

    /**
     * Resets the per-window tuple counter and allows the read thread to enqueue rows.
     *
     * @param j window id, passed through to the parent
     */
    public void beginWindow(long j) {
        super.beginWindow(j);
        this.tuplesRead = 0L;
        this.running = true;
    }

    /**
     * Emits at most one tuple: polls one {@link Result} from the queue, converts it with
     * {@link #getTuple(Result)} and emits it if the conversion is non-null.
     *
     * @throws IllegalStateException if the read thread was never started
     * @throws RuntimeException if the read thread stopped for any reason other than reaching the
     *     end of the scan (the original failure is attached as the cause when known), or if
     *     converting or emitting the tuple fails
     */
    public void emitTuples() {
        if (this.readThread == null) {
            throw new IllegalStateException("Read thread has not been started");
        }
        // A reader that finished the scan is not an error; rows still queued must be drained.
        if (!this.readThread.isAlive() && !this.scanCompleted) {
            throw new RuntimeException(this.threadFailureReason, this.threadFailure);
        }
        try {
            Result poll = this.resultQueue.poll();
            if (poll == null) {
                return;
            }
            T tuple = getTuple(poll);
            if (tuple != null) {
                emitTuple(tuple);
                this.tuplesRead++;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Emits a tuple on the output port.
     *
     * @param t tuple to emit
     */
    protected void emitTuple(T t) {
        this.outputPort.emit(t);
    }

    /** Stops the read thread from enqueuing further rows, then delegates to the parent. */
    public void endWindow() {
        this.running = false;
        super.endWindow();
    }

    /** Interrupts the read thread, if one was started. */
    public void deactivate() {
        if (this.readThread != null) {
            this.readThread.interrupt();
        }
    }

    /**
     * Builds the scan whose results this operator reads.
     *
     * @return the scan handed to the table's {@code getScanner}
     */
    protected abstract Scan operationScan();

    /**
     * Converts one scan result into a tuple.
     *
     * @param result a row returned by the scanner
     * @return the tuple to emit, or {@code null} to emit nothing for this row
     */
    protected abstract T getTuple(Result result);

    /** @return the configured start row */
    public String getStartRow() {
        return this.startRow;
    }

    /** @param str the start row to store */
    public void setStartRow(String str) {
        this.startRow = str;
    }

    /** @return the configured end row */
    public String getEndRow() {
        return this.endRow;
    }

    /** @param str the end row to store */
    public void setEndRow(String str) {
        this.endRow = str;
    }

    /** @return the stored last-read row */
    public String getLastReadRow() {
        return this.lastReadRow;
    }

    /** @param str the last-read row to store */
    public void setLastReadRow(String str) {
        this.lastReadRow = str;
    }
}
