/*
 * Decompiled with CFR 0.152.
 */
package com.sleepycat.je.rep.arbiter.impl;

import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.dbi.DbConfigManager;
import com.sleepycat.je.log.LogEntryType;
import com.sleepycat.je.log.entry.LogEntry;
import com.sleepycat.je.rep.GroupShutdownException;
import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.UnknownMasterException;
import com.sleepycat.je.rep.arbiter.impl.ArbiterImpl;
import com.sleepycat.je.rep.arbiter.impl.ArbiterOutputThread;
import com.sleepycat.je.rep.arbiter.impl.ArbiterStatDefinition;
import com.sleepycat.je.rep.impl.RepGroupImpl;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.RepParams;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.ReplicaOutputThread;
import com.sleepycat.je.rep.impl.node.ReplicaOutputThreadBase;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.net.DataChannelFactory;
import com.sleepycat.je.rep.stream.BaseProtocol;
import com.sleepycat.je.rep.stream.InputWireRecord;
import com.sleepycat.je.rep.stream.MasterStatus;
import com.sleepycat.je.rep.stream.Protocol;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshake;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshakeConfig;
import com.sleepycat.je.rep.utilint.BinaryProtocol;
import com.sleepycat.je.rep.utilint.NamedChannel;
import com.sleepycat.je.rep.utilint.NamedChannelWithTimeout;
import com.sleepycat.je.rep.utilint.RepUtils;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.txn.TxnCommit;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.LongStat;
import com.sleepycat.je.utilint.StatGroup;
import com.sleepycat.je.utilint.StoppableThread;
import com.sleepycat.je.utilint.StringStat;
import com.sleepycat.je.utilint.VLSN;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

class ArbiterAcker {
    private static final int NETWORK_RETRIES = 2;
    private static final int SERVICE_UNAVAILABLE_RETRIES = 10;
    private static final int CONNECT_RETRY_SLEEP_MS = 1000;
    private static final long QUEUE_POLL_INTERVAL_NS = 1000000000L;
    private Exception shutdownException = null;
    private final RepImpl repImpl;
    private final Logger logger;
    private NamedChannelWithTimeout arbiterFeederChannel;
    private final RepUtils.Clock clock;
    private Protocol protocol;
    private final ArbiterImpl arbiterImpl;
    private final BlockingQueue<Long> outputQueue;
    private final BlockingQueue<BinaryProtocol.Message> requestQueue;
    private ArbiterOutputThread arbiterOutputThread;
    private RequestThread requestThread;
    private volatile VLSN lastReplayedVLSN = null;
    private long dtvlsn = -1L;
    private final StatGroup stats;
    private final LongStat nReplayQueueOverflow;
    private final LongStat nAcks;
    private final StringStat masterStat;
    private final int N_MAX_GROUP_XACT = 100;
    private final List<BinaryProtocol.Message> groupMessages = new ArrayList<BinaryProtocol.Message>();
    private final List<Long> groupXact = new ArrayList<Long>();
    private final long FSYNC_INTERVAL = 1000L;
    private long lastFSyncTime;

    ArbiterAcker(ArbiterImpl arbiterImpl, RepImpl repImpl) {
        this.arbiterImpl = arbiterImpl;
        this.repImpl = repImpl;
        this.logger = repImpl.getLogger();
        this.clock = new RepUtils.Clock(RepImpl.getClockSkewMs());
        int requestQueueSize = repImpl.getConfigManager().getInt(RepParams.REPLICA_MESSAGE_QUEUE_SIZE);
        this.requestQueue = new ArrayBlockingQueue<BinaryProtocol.Message>(requestQueueSize);
        int outputQueueSize = 2 * repImpl.getConfigManager().getInt(RepParams.REPLICA_MESSAGE_QUEUE_SIZE);
        this.outputQueue = new ArrayBlockingQueue<Long>(outputQueueSize);
        this.stats = new StatGroup("Arbiter", "Arbiter statistics");
        this.nReplayQueueOverflow = new LongStat(this.stats, ArbiterStatDefinition.ARB_N_REPLAY_QUEUE_OVERFLOW);
        this.nAcks = new LongStat(this.stats, ArbiterStatDefinition.ARB_N_ACKS);
        this.masterStat = new StringStat(this.stats, ArbiterStatDefinition.ARB_MASTER);
    }

    private void initializeConnection() throws ConnectRetryException, IOException {
        this.createArbiterFeederChannel();
        this.arbiterImpl.refreshCachedGroup();
        ReplicaFeederHandshake handshake = new ReplicaFeederHandshake(new RepFeederHandshakeConfig());
        this.protocol = handshake.execute();
        this.arbiterImpl.refreshCachedGroup();
        this.protocol.read(this.arbiterFeederChannel.getChannel(), BaseProtocol.Heartbeat.class);
        this.queueAck(ReplicaOutputThread.HEARTBEAT_ACK);
        this.arbiterImpl.getReadyLatch().countDown();
        this.arbiterImpl.notifyJoinGroup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void runArbiterAckLoop() throws InterruptedException, DatabaseException, GroupShutdownException {
        Class<?> retryExceptionClass = null;
        int retryCount = 0;
        try {
            while (true) {
                try {
                    this.runArbiterAckLoopInternal();
                }
                catch (RetryException e) {
                    if (!this.arbiterImpl.getMasterStatus().inSync()) {
                        LoggerUtils.fine(this.logger, this.repImpl, "Retry terminated, out of sync.");
                        break;
                    }
                    if (e.getClass() == retryExceptionClass || e.retries == 0) {
                        if (++retryCount >= e.retries) {
                            LoggerUtils.info(this.logger, this.repImpl, "Failed to recover from exception: " + e.getMessage() + ", despite " + e.retries + " retries.\n" + LoggerUtils.getStackTrace(e));
                            break;
                        }
                    } else {
                        retryCount = 0;
                        retryExceptionClass = e.getClass();
                    }
                    LoggerUtils.fine(this.logger, this.repImpl, "Retry #: " + retryCount + "/" + e.retries + " Will retry Arbiter loop after " + e.retrySleepMs + "ms. ");
                    Thread.sleep(e.retrySleepMs);
                    if (this.arbiterImpl.getMasterStatus().inSync()) continue;
                }
                break;
            }
        }
        finally {
            this.arbiterImpl.resetReadyLatch(this.shutdownException);
        }
    }

    void shutdown() {
        if (this.requestThread != null) {
            try {
                this.requestThread.shutdownThread(this.logger);
            }
            catch (Exception e) {
                LoggerUtils.info(this.logger, this.repImpl, "Request thread error shutting down." + e);
            }
        }
        if (this.arbiterOutputThread != null) {
            this.arbiterOutputThread.shutdownThread(this.logger);
            try {
                this.arbiterOutputThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        RepUtils.shutdownChannel(this.arbiterFeederChannel);
    }

    private void runArbiterAckLoopInternal() throws InterruptedException, RetryException {
        this.shutdownException = null;
        LoggerUtils.info(this.logger, this.repImpl, "Arbiter loop started with master: " + this.arbiterImpl.getMasterStatus().getNodeMasterNameId());
        try {
            this.initializeConnection();
            this.arbiterImpl.setState(ReplicatedEnvironment.State.REPLICA);
            this.doRunArbiterLoopInternalWork();
            this.arbiterImpl.setState(ReplicatedEnvironment.State.UNKNOWN);
        }
        catch (ClosedByInterruptException closedByInterruptException) {
            if (this.arbiterImpl.isShutdown()) {
                LoggerUtils.info(this.logger, this.repImpl, "Arbiter loop interrupted for shutdown.");
                return;
            }
            LoggerUtils.warning(this.logger, this.repImpl, "Arbiter loop unexpected interrupt.");
            throw new InterruptedException(closedByInterruptException.getMessage());
        }
        catch (UnknownMasterException | IOException e) {
            LoggerUtils.fine(this.logger, this.repImpl, "Arbiter exception: " + e.getMessage() + "\n" + LoggerUtils.getStackTrace(e));
        }
        catch (RetryException e) {
            throw e;
        }
        catch (GroupShutdownException e) {
            this.shutdownException = e;
            throw e;
        }
        catch (RuntimeException e) {
            this.shutdownException = e;
            LoggerUtils.severe(this.logger, this.repImpl, "Arbiter unexpected exception " + e + " " + LoggerUtils.getStackTrace(e));
            throw e;
        }
        catch (MasterStatus.MasterSyncException e) {
            LoggerUtils.fine(this.logger, this.repImpl, e.getMessage());
        }
        catch (Exception e) {
            this.shutdownException = e;
            LoggerUtils.severe(this.logger, this.repImpl, "Arbiter unexpected exception " + e + " " + LoggerUtils.getStackTrace(e));
            throw EnvironmentFailureException.unexpectedException(e);
        }
        finally {
            this.loopExitCleanup();
        }
    }

    /*
     * Exception decompiling
     */
    protected void doRunArbiterLoopInternalWork() throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [18[WHILELOOP]], but top level block is 5[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    StatGroup loadStats(StatsConfig config) throws DatabaseException {
        this.masterStat.set(this.arbiterImpl.getMasterStatus().getNodeMasterNameId().toString());
        StatGroup copyStats = this.stats.cloneGroup(config.getClear());
        return copyStats;
    }

    private void loopExitCleanup() {
        if (this.shutdownException != null) {
            if (this.shutdownException instanceof RetryException) {
                LoggerUtils.fine(this.logger, this.repImpl, "Retrying connection to feeder. Message: " + this.shutdownException.getMessage());
            } else if (this.shutdownException instanceof GroupShutdownException) {
                LoggerUtils.info(this.logger, this.repImpl, "Exiting inner Arbiter loop. Master requested shutdown.");
            } else {
                LoggerUtils.warning(this.logger, this.repImpl, "Exiting inner Arbiter loop with exception " + this.shutdownException + "\n" + LoggerUtils.getStackTrace(this.shutdownException));
            }
        } else {
            LoggerUtils.fine(this.logger, this.repImpl, "Exiting inner Arbiter loop.");
        }
        this.shutdown();
    }

    private void createArbiterFeederChannel() throws IOException, ConnectRetryException {
        DataChannel dataChannel = null;
        DbConfigManager configManager = this.repImpl.getConfigManager();
        int timeoutMs = configManager.getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT);
        int receiveBufferSize = configManager.getInt(RepParams.REPLICA_RECEIVE_BUFFER_SIZE);
        try {
            int openTimeout = configManager.getDuration(RepParams.REPSTREAM_OPEN_TIMEOUT);
            DataChannelFactory.ConnectOptions connectOpts = new DataChannelFactory.ConnectOptions().setTcpNoDelay(true).setReceiveBufferSize(receiveBufferSize).setOpenTimeout(openTimeout).setBlocking(true);
            dataChannel = this.repImpl.getChannelFactory().connect(this.arbiterImpl.getMasterStatus().getNodeMaster(), connectOpts);
            this.arbiterFeederChannel = new NamedChannelWithTimeout(this.repImpl, this.logger, this.arbiterImpl.getChannelTimeoutTask(), dataChannel, timeoutMs);
            ServiceDispatcher.doServiceHandshake(dataChannel, "Feeder");
        }
        catch (ConnectException e) {
            throw new ConnectRetryException(e.getMessage(), 2, 1000);
        }
        catch (ServiceDispatcher.ServiceConnectFailedException e) {
            if (e.getResponse() == ServiceDispatcher.Response.UNKNOWN_SERVICE) {
                throw new ConnectRetryException(e.getMessage(), 10, 1000);
            }
            throw EnvironmentFailureException.unexpectedException(e);
        }
    }

    private void queueAck(Long xid) throws IOException {
        try {
            this.outputQueue.put(xid);
        }
        catch (InterruptedException ie) {
            throw new IOException("Ack I/O interrupted", ie);
        }
    }

    private GroupShutdownException processShutdown(BaseProtocol.ShutdownRequest shutdown) throws IOException {
        this.queueAck(ReplicaOutputThreadBase.SHUTDOWN_ACK);
        this.arbiterFeederChannel.setTimeoutMs(Integer.MAX_VALUE);
        String masterHostName = this.arbiterImpl.getMasterStatus().getGroupMaster().getHostName();
        return new GroupShutdownException(this.logger, this.repImpl, masterHostName, this.arbiterImpl.getArbiterVLSNTracker().get(), shutdown.getShutdownTimeMs());
    }

    private BinaryProtocol.Message replayEntries(BinaryProtocol.Message firstMessage) throws IOException {
        int i;
        boolean doSync = false;
        long highVLSN = 0L;
        BinaryProtocol.Message shutdownMessage = null;
        this.groupXact.clear();
        this.groupMessages.clear();
        this.groupMessages.add(firstMessage);
        this.requestQueue.drainTo(this.groupMessages, 100);
        for (i = 0; i < this.groupMessages.size(); ++i) {
            BinaryProtocol.Message message = this.groupMessages.get(i);
            BinaryProtocol.MessageOp messageOp = message.getOp();
            if (messageOp == Protocol.SHUTDOWN_REQUEST) {
                shutdownMessage = message;
                continue;
            }
            if (messageOp == Protocol.HEARTBEAT) {
                this.groupXact.add(ReplicaOutputThreadBase.HEARTBEAT_ACK);
                continue;
            }
            InputWireRecord wireRecord = ((BaseProtocol.Entry)message).getWireRecord();
            byte entryType = wireRecord.getEntryType();
            this.lastReplayedVLSN = wireRecord.getVLSN();
            if (LogEntryType.LOG_TXN_COMMIT.equalsType(entryType)) {
                TxnCommit masterCommit;
                long nextDTVLSN;
                BaseProtocol.Commit commitEntry = (BaseProtocol.Commit)message;
                if (commitEntry.getReplicaSyncPolicy() == Durability.SyncPolicy.SYNC) {
                    doSync = true;
                }
                LogEntry logEntry = wireRecord.getLogEntry();
                if (this.lastReplayedVLSN.getSequence() > highVLSN) {
                    highVLSN = this.lastReplayedVLSN.getSequence();
                }
                if ((nextDTVLSN = (masterCommit = (TxnCommit)logEntry.getMainItem()).getDTVLSN()) == 0L) {
                    nextDTVLSN = wireRecord.getVLSN().getSequence();
                }
                this.dtvlsn = nextDTVLSN > this.dtvlsn ? nextDTVLSN : this.dtvlsn;
                this.groupXact.add(logEntry.getTransactionId());
                this.nAcks.increment();
                if (!this.logger.isLoggable(Level.FINEST)) continue;
                LoggerUtils.finest(this.logger, this.repImpl, "Arbiter ack commit record " + wireRecord);
                continue;
            }
            String errMsg = "Illegal message type recieved by  Arbiter. [" + wireRecord + "]";
            throw new IllegalStateException(errMsg);
        }
        if (doSync || this.lastFSyncTime + 1000L <= System.currentTimeMillis()) {
            doSync = true;
            this.lastFSyncTime = System.currentTimeMillis();
        }
        this.arbiterImpl.getArbiterVLSNTracker().write(new VLSN(highVLSN), new VLSN(this.dtvlsn), this.arbiterImpl.getMasterStatus().getGroupMasterNameId().getId(), doSync);
        for (i = 0; i < this.groupXact.size(); ++i) {
            this.queueAck(this.groupXact.get(i));
        }
        return shutdownMessage;
    }

    private class RepFeederHandshakeConfig
    implements ReplicaFeederHandshakeConfig {
        private RepFeederHandshakeConfig() {
        }

        @Override
        public RepImpl getRepImpl() {
            return ArbiterAcker.this.repImpl;
        }

        @Override
        public NameIdPair getNameIdPair() {
            return ArbiterAcker.this.arbiterImpl.getNameIdPair();
        }

        @Override
        public RepUtils.Clock getClock() {
            return ArbiterAcker.this.clock;
        }

        @Override
        public NodeType getNodeType() {
            return NodeType.ARBITER;
        }

        @Override
        public RepGroupImpl getGroup() {
            return ArbiterAcker.this.arbiterImpl.getGroup();
        }

        @Override
        public NamedChannel getNamedChannel() {
            return ArbiterAcker.this.arbiterFeederChannel;
        }
    }

    class RequestThread
    extends StoppableThread {
        private volatile Exception exception;
        volatile RequestExitType exitRequest;
        private static final long REQUEST_QUEUE_POLL_INTERVAL_NS = 1000000000L;

        protected RequestThread() {
            super(ArbiterAcker.this.repImpl, "RequestThread");
            this.exitRequest = null;
        }

        @Override
        protected int initiateSoftShutdown() {
            this.exitRequest = RequestExitType.IMMEDIATE;
            return 0;
        }

        @Override
        public void run() {
            LoggerUtils.fine(ArbiterAcker.this.logger, ArbiterAcker.this.repImpl, "Request thread started. Message queue size:" + ArbiterAcker.this.requestQueue.remainingCapacity());
            try {
                BinaryProtocol.Message shutdownMessage;
                BinaryProtocol.Message message;
                do {
                    message = (BinaryProtocol.Message)ArbiterAcker.this.requestQueue.poll(1000000000L, TimeUnit.NANOSECONDS);
                    if (this.exitRequest == RequestExitType.IMMEDIATE || this.exitRequest == RequestExitType.SOFT && message == null || ArbiterAcker.this.arbiterImpl.isShutdownOrInvalid()) {
                        return;
                    }
                    ArbiterAcker.this.arbiterImpl.getMasterStatus().assertSync();
                } while (message == null || (shutdownMessage = ArbiterAcker.this.replayEntries(message)) == null);
                throw ArbiterAcker.this.processShutdown((BaseProtocol.ShutdownRequest)shutdownMessage);
            }
            catch (Exception e) {
                this.exception = e;
                ArbiterAcker.this.requestQueue.clear();
                LoggerUtils.fine(ArbiterAcker.this.logger, ArbiterAcker.this.repImpl, "closing arbiterFeederChannel = " + ArbiterAcker.this.arbiterFeederChannel);
                RepUtils.shutdownChannel(ArbiterAcker.this.arbiterFeederChannel);
                LoggerUtils.info(ArbiterAcker.this.logger, ArbiterAcker.this.repImpl, "ArbiterAcker thread exiting with exception:" + e.getMessage());
                return;
            }
        }

        @Override
        protected Logger getLogger() {
            return ArbiterAcker.this.logger;
        }

        static /* synthetic */ Exception access$100(RequestThread x0) {
            return x0.exception;
        }
    }

    static class ConnectRetryException
    extends RetryException {
        ConnectRetryException(String message, int retries, int retrySleepMs) {
            super(message, retries, retrySleepMs);
        }
    }

    static abstract class RetryException
    extends Exception {
        final int retries;
        final int retrySleepMs;

        RetryException(String message, int retries, int retrySleepMs) {
            super(message);
            this.retries = retries;
            this.retrySleepMs = retrySleepMs;
        }

        @Override
        public String getMessage() {
            return "Failed after retries: " + this.retries + " with retry interval: " + this.retrySleepMs + "ms.";
        }
    }

    private static enum RequestExitType {
        IMMEDIATE,
        SOFT;

    }
}

