/*
 * Decompiled with CFR 0.152.
 */
package com.sleepycat.je.rep.subscription;

import com.sleepycat.je.log.LogEntryType;
import com.sleepycat.je.log.entry.LNLogEntry;
import com.sleepycat.je.log.entry.LogEntry;
import com.sleepycat.je.rep.GroupShutdownException;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.stream.BaseProtocol;
import com.sleepycat.je.rep.stream.InputWireRecord;
import com.sleepycat.je.rep.subscription.SubscriptionCallback;
import com.sleepycat.je.rep.subscription.SubscriptionConfig;
import com.sleepycat.je.rep.subscription.SubscriptionStat;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.StoppableThread;
import com.sleepycat.je.utilint.VLSN;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

class SubscriptionProcessMessageThread
extends StoppableThread {
    private final SubscriptionStat stats;
    private final SubscriptionConfig config;
    private final BlockingQueue<Object> queue;
    private final Logger logger;
    private volatile ExitType exitRequest;

    SubscriptionProcessMessageThread(RepImpl impl, BlockingQueue<Object> queue, SubscriptionConfig config, SubscriptionStat stats, Logger logger) {
        super(impl, "SubscriptionProcessMessageThread");
        this.logger = logger;
        this.config = config;
        this.queue = queue;
        this.stats = stats;
        this.exitRequest = ExitType.NONE;
        stats.setHighVLSN(VLSN.NULL_VLSN);
    }

    public void shutdown() {
        this.exitRequest = ExitType.IMMEDIATE;
    }

    @Override
    public int initiateSoftShutdown() {
        this.exitRequest = ExitType.IMMEDIATE;
        return 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        SubscriptionCallback callBack = this.config.getCallBack();
        this.logger.info("Input thread started. Message queue size:" + this.queue.remainingCapacity());
        try {
            while (this.exitRequest != ExitType.IMMEDIATE) {
                Object message = this.queue.poll(1000L, TimeUnit.MILLISECONDS);
                if (message == null) continue;
                if (message instanceof Exception) {
                    callBack.processException((Exception)message);
                    if (!(message instanceof GroupShutdownException)) continue;
                    this.exitRequest = ExitType.IMMEDIATE;
                    GroupShutdownException gse = (GroupShutdownException)message;
                    this.logger.info("Received shutdown message from " + this.config.getFeederHost() + " at VLSN " + gse.getShutdownVLSN());
                    break;
                }
                InputWireRecord wireRecord = ((BaseProtocol.Entry)message).getWireRecord();
                VLSN vlsn = wireRecord.getVLSN();
                byte type = wireRecord.getEntryType();
                LogEntry entry = wireRecord.getLogEntry();
                long txnId = entry.getTransactionId();
                this.stats.setHighVLSN(vlsn);
                this.stats.getNumOpsProcessed().increment();
                if (LogEntryType.LOG_TXN_COMMIT.equalsType(type)) {
                    this.stats.getNumTxnCommitted().increment();
                    callBack.processCommit(vlsn, txnId);
                    continue;
                }
                if (LogEntryType.LOG_TXN_ABORT.equalsType(type)) {
                    this.stats.getNumTxnAborted().increment();
                    callBack.processAbort(vlsn, txnId);
                    continue;
                }
                if (!(entry instanceof LNLogEntry)) continue;
                LNLogEntry lnEntry = (LNLogEntry)entry;
                lnEntry.postFetchInit(false);
                if (lnEntry.getLN().isDeleted()) {
                    callBack.processDel(vlsn, lnEntry.getKey(), txnId);
                    continue;
                }
                callBack.processPut(vlsn, lnEntry.getKey(), lnEntry.getData(), txnId);
            }
        }
        catch (InterruptedException e) {
            this.logger.warning("input thread receives exception " + e.getMessage() + ", process the exception in callback, clear queue and exit.\n" + LoggerUtils.getStackTrace(e));
            this.exitRequest = ExitType.IMMEDIATE;
        }
        finally {
            this.queue.clear();
            this.logger.info("message queue cleared, thread exits with type: " + (Object)((Object)this.exitRequest));
        }
    }

    @Override
    protected Logger getLogger() {
        return this.logger;
    }

    private static enum ExitType {
        NONE,
        IMMEDIATE,
        SOFT;

    }
}

