/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.hbase;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.contrib.hbase.HBaseInputOperator;
import com.datatorrent.contrib.hbase.HBaseStore;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.util.Queue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HBaseScanOperator<T>
extends HBaseInputOperator<T>
implements Operator.ActivationListener<Context> {
    public static final int DEF_QUEUE_SIZE = 1000;
    public static final int DEF_SLEEP_MILLIS = 10;
    private String startRow;
    private String endRow;
    private String lastReadRow;
    private int queueSize = 1000;
    private int sleepMillis = 10;
    private Queue<Result> resultQueue;
    private volatile boolean running;
    @AutoMetric
    protected long tuplesRead;
    protected transient Scan scan;
    protected transient ResultScanner scanner;
    protected transient Thread readThread;
    private transient String threadFailureReason = null;
    private static final Logger logger = LoggerFactory.getLogger(HBaseScanOperator.class);

    public void setup(Context.OperatorContext context) {
        super.setup(context);
        this.resultQueue = Queues.newLinkedBlockingQueue((int)this.queueSize);
    }

    public void activate(Context context) {
        this.startReadThread();
    }

    protected void startReadThread() {
        try {
            this.scan = this.operationScan();
            this.scanner = ((HBaseStore)this.getStore()).getTable().getScanner(this.scan);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.readThread = new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Unable to fully structure code
             */
            @Override
            public void run() {
                try {
                    try {
                        block4: while (true) {
                            if ((result = HBaseScanOperator.this.scanner.next()) == null) {
                                continue;
                            }
                            while (true) {
                                if (HBaseScanOperator.access$000(HBaseScanOperator.this) && !HBaseScanOperator.access$100(HBaseScanOperator.this).offer(result)) ** break;
                                continue block4;
                                Thread.sleep(HBaseScanOperator.access$200(HBaseScanOperator.this));
                            }
                            break;
                        }
                    }
                    catch (Exception e) {
                        HBaseScanOperator.access$300().debug("Exception in fetching results {}", (Object)e.getMessage());
                        HBaseScanOperator.access$402(HBaseScanOperator.this, e.getMessage());
                        HBaseScanOperator.this.scanner.close();
                    }
                }
                catch (Throwable var2_3) {
                    HBaseScanOperator.this.scanner.close();
                    throw var2_3;
                }
            }
        });
        this.readThread.start();
    }

    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        this.tuplesRead = 0L;
        this.running = true;
    }

    public void emitTuples() {
        if (!this.readThread.isAlive()) {
            throw new RuntimeException(this.threadFailureReason);
        }
        try {
            Result result = this.resultQueue.poll();
            if (result == null) {
                return;
            }
            T tuple = this.getTuple(result);
            if (tuple != null) {
                this.emitTuple(tuple);
                ++this.tuplesRead;
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void emitTuple(T tuple) {
        this.outputPort.emit(tuple);
    }

    public void endWindow() {
        this.running = false;
        super.endWindow();
    }

    public void deactivate() {
        this.readThread.interrupt();
    }

    protected abstract Scan operationScan();

    protected abstract T getTuple(Result var1);

    public String getStartRow() {
        return this.startRow;
    }

    public void setStartRow(String startRow) {
        this.startRow = startRow;
    }

    public String getEndRow() {
        return this.endRow;
    }

    public void setEndRow(String endRow) {
        this.endRow = endRow;
    }

    public String getLastReadRow() {
        return this.lastReadRow;
    }

    public void setLastReadRow(String lastReadRow) {
        this.lastReadRow = lastReadRow;
    }

    static /* synthetic */ boolean access$000(HBaseScanOperator x0) {
        return x0.running;
    }

    static /* synthetic */ Queue access$100(HBaseScanOperator x0) {
        return x0.resultQueue;
    }

    static /* synthetic */ int access$200(HBaseScanOperator x0) {
        return x0.sleepMillis;
    }

    static /* synthetic */ Logger access$300() {
        return logger;
    }

    static /* synthetic */ String access$402(HBaseScanOperator x0, String x1) {
        x0.threadFailureReason = x1;
        return x0.threadFailureReason;
    }
}

