/*
 * Decompiled with CFR 0.152.
 */
package com.sleepycat.je.rep.subscription;

import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.rep.GroupShutdownException;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.RepInternal;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationSecurityException;
import com.sleepycat.je.rep.impl.RepGroupImpl;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.RepParams;
import com.sleepycat.je.rep.impl.node.ChannelTimeoutTask;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.ReplicaOutputThread;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.net.DataChannelFactory;
import com.sleepycat.je.rep.stream.BaseProtocol;
import com.sleepycat.je.rep.stream.Protocol;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshake;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshakeConfig;
import com.sleepycat.je.rep.stream.SubscriberFeederSyncup;
import com.sleepycat.je.rep.subscription.SubscriptionConfig;
import com.sleepycat.je.rep.subscription.SubscriptionOutputThread;
import com.sleepycat.je.rep.subscription.SubscriptionProcessMessageThread;
import com.sleepycat.je.rep.subscription.SubscriptionStat;
import com.sleepycat.je.rep.subscription.SubscriptionStatus;
import com.sleepycat.je.rep.utilint.BinaryProtocol;
import com.sleepycat.je.rep.utilint.NamedChannel;
import com.sleepycat.je.rep.utilint.NamedChannelWithTimeout;
import com.sleepycat.je.rep.utilint.RepUtils;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.utilint.InternalException;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.StoppableThread;
import com.sleepycat.je.utilint.TestHook;
import com.sleepycat.je.utilint.TestHookExecute;
import com.sleepycat.je.utilint.VLSN;
import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

class SubscriptionThread
extends StoppableThread {
    /* Soft-shutdown wait in milliseconds; initiateSoftShutdown() returns this same value. */
    private static final int SOFT_SHUTDOWN_WAIT_MS = 5000;
    /* Logger supplied by the creator of this thread; also returned by getLogger(). */
    private final Logger logger;
    /* Subscription configuration: feeder address, retry policy, queue sizes, etc. */
    private final SubscriptionConfig config;
    /* Statistics object updated while connecting and while streaming. */
    private final SubscriptionStat stats;
    /* Bounded queue of acks filled by queueAck() and handed to the output thread. */
    private final BlockingQueue<Long> outputQueue;
    /* Bounded queue of received messages filled by offer() and handed to the message processing thread. */
    private final BlockingQueue<Object> inputQueue;
    /* Created in createAuxThread(); set back to null by shutdown(). */
    private SubscriptionProcessMessageThread messageProcThread;
    /* Channel to the feeder, created in openChannel(). */
    private NamedChannelWithTimeout namedChannel;
    /* Timeout task created together with namedChannel; cancelled in shutdown(). */
    private ChannelTimeoutTask channelTimeoutTask;
    /* Protocol returned by the feeder handshake in initializeConnection(). */
    private Protocol protocol;
    /* VLSN the subscription was asked to start from. */
    private final VLSN reqVLSN;
    /* Created in createAuxThread(); set back to null by shutdown(). */
    private volatile SubscriptionOutputThread outputThread;
    /* Current status; INIT after construction, updated by run() and setStatus(). */
    private volatile SubscriptionStatus status;
    /* Exception recorded by run() when the subscription ends with an error, else null. */
    private volatile Exception storedException;
    /* Optional test hook executed (under assert) for each message read in loopInternal(). */
    private TestHook<SubscriptionThread> exceptionHandlingTestHook;

    /**
     * Creates the main subscription thread. The thread is not started here.
     *
     * @param env      replicated environment; its non-null RepImpl is passed
     *                 to the parent thread class
     * @param reqVLSN  VLSN to start the subscription from
     * @param config   subscription configuration, also used to size the
     *                 input and output queues
     * @param stats    statistics object updated by this thread
     * @param logger   logger used by this thread
     */
    SubscriptionThread(ReplicatedEnvironment env, VLSN reqVLSN, SubscriptionConfig config, SubscriptionStat stats, Logger logger) {
        super(RepInternal.getNonNullRepImpl(env), "Subscription Main");
        setUncaughtExceptionHandler(new SubscriptionThreadExceptionHandler());

        /* Collaborators handed in by the caller. */
        this.logger = logger;
        this.stats = stats;
        this.config = config;
        this.reqVLSN = reqVLSN;

        /* Bounded queues shared with the auxiliary threads. */
        this.inputQueue = new ArrayBlockingQueue<>(config.getInputMessageQueueSize());
        this.outputQueue = new ArrayBlockingQueue<>(config.getOutputMessageQueueSize());

        /*
         * protocol, namedChannel, storedException and the test hook keep
         * their default null value until they are assigned later.
         */
        this.status = SubscriptionStatus.INIT;
    }

    /**
     * Returns the current subscription status.
     *
     * @return the most recently recorded status
     */
    public SubscriptionStatus getStatus() {
        return status;
    }

    /**
     * Returns the exception recorded when the subscription ended with an
     * error.
     *
     * @return the recorded exception, or null if none has been recorded
     */
    public Exception getStoredException() {
        return storedException;
    }

    /**
     * Returns the logger this thread was constructed with.
     */
    @Override
    protected Logger getLogger() {
        return logger;
    }

    /**
     * Returns the soft-shutdown wait used for this thread.
     *
     * @return {@link #SOFT_SHUTDOWN_WAIT_MS}, i.e. 5000
     */
    @Override
    protected int initiateSoftShutdown() {
        return SOFT_SHUTDOWN_WAIT_MS;
    }

    /**
     * Thread body: connects to the feeder, starts the auxiliary threads and
     * streams messages until the thread is shut down or an error ends the
     * subscription.
     *
     * <p>Connection attempts that fail with a {@code ConnectionException} are
     * retried after sleeping for the time carried by the exception, up to
     * {@code config.getMaxConnectRetries()} retries. Every other failure ends
     * the subscription: the exception is kept for
     * {@link #getStoredException()} and the status is set accordingly.
     * {@link #shutdown()} is always called on exit.
     */
    @Override
    public void run() {
        LoggerUtils.info(this.logger, this.envImpl, "Start subscription from VLSN " + this.reqVLSN + " from feeder at " + this.config.getFeederHost() + ":" + this.config.getFeederPort());
        try {
            final int maxRetry = this.config.getMaxConnectRetries();
            boolean auxThreadCreated = false;
            int numRetry = 0;
            while (!this.isShutdown()) {
                try {
                    this.initializeConnection();
                    if (!auxThreadCreated) {
                        LoggerUtils.fine(this.logger, this.envImpl, "Create auxiliary msg processing and output threads");
                        auxThreadCreated = this.createAuxThread();
                        if (auxThreadCreated) {
                            this.status = SubscriptionStatus.SUCCESS;
                            /* Blocks here until streaming ends. */
                            this.loopInternal();
                        } else {
                            this.status = SubscriptionStatus.UNKNOWN_ERROR;
                        }
                    }
                    break;
                } catch (ConnectionException e) {
                    if (numRetry == maxRetry) {
                        LoggerUtils.info(this.logger, this.envImpl, "Reaching the max retry " + maxRetry + " to connect feeder " + this.config.getFeederHost() + ", shut down subscription\n" + LoggerUtils.getStackTrace(e));
                        this.storedException = e;
                        this.status = SubscriptionStatus.CONNECTION_ERROR;
                        break;
                    }
                    ++numRetry;
                    LoggerUtils.fine(this.logger, this.envImpl, "Fail to connect feeder at " + this.config.getFeederHost() + " sleep for " + e.getRetrySleepMs() + " ms and re-connect again");
                    Thread.sleep(e.getRetrySleepMs());
                }
            }
        } catch (ReplicationSecurityException ure) {
            this.storedException = ure;
            LoggerUtils.warning(this.logger, this.envImpl, "Subscription exited due to security check failure: " + ure.getMessage());
            this.status = SubscriptionStatus.SECURITY_CHECK_ERROR;
        } catch (GroupShutdownException e) {
            /*
             * Work on a snapshot: shutdown() may run on another thread and
             * reset the field to null, which must not turn this handler into
             * a NullPointerException that loses the GRP_SHUTDOWN status.
             */
            final SubscriptionProcessMessageThread procThread = this.messageProcThread;
            if (procThread != null && procThread.isAlive()) {
                try {
                    /* Wait for the message processing thread to finish. */
                    procThread.join();
                } catch (InterruptedException ie) {
                    LoggerUtils.fine(this.logger, this.envImpl, "exception in shutting down msg proc thread " + ie.getMessage() + "\n" + LoggerUtils.getStackTrace(ie));
                }
            }
            this.storedException = e;
            LoggerUtils.info(this.logger, this.envImpl, "received group shutdown " + e.getMessage() + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.GRP_SHUTDOWN;
        } catch (InsufficientLogException e) {
            this.storedException = e;
            LoggerUtils.info(this.logger, this.envImpl, "unable to subscribe from requested VLSN " + this.reqVLSN + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.VLSN_NOT_AVAILABLE;
        } catch (EnvironmentFailureException e) {
            this.storedException = e;
            LoggerUtils.warning(this.logger, this.envImpl, "unable to sync up with feeder due to EFE " + e.getMessage() + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.UNKNOWN_ERROR;
        } catch (InterruptedException e) {
            this.storedException = e;
            LoggerUtils.warning(this.logger, this.envImpl, "interrupted exception " + e.getMessage() + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.UNKNOWN_ERROR;
        } catch (InternalException e) {
            this.storedException = e;
            LoggerUtils.warning(this.logger, this.envImpl, "internal exception " + e.getMessage() + "\n" + LoggerUtils.getStackTrace(e));
            this.status = SubscriptionStatus.UNKNOWN_ERROR;
        } finally {
            this.shutdown();
        }
    }

    /**
     * Installs a test hook that loopInternal() executes, under an assert,
     * for each message read from the feeder.
     *
     * @param exceptionHandlingTestHook hook to install; may be null
     */
    void setExceptionHandlingTestHook(TestHook<SubscriptionThread> exceptionHandlingTestHook) {
        final TestHook<SubscriptionThread> hook = exceptionHandlingTestHook;
        this.exceptionHandlingTestHook = hook;
    }

    /**
     * Overwrites the subscription status.
     *
     * @param s new status
     */
    void setStatus(SubscriptionStatus s) {
        status = s;
    }

    /**
     * Shuts the subscription down: stops the message processing and output
     * threads, clears both queues, closes the channel, cancels the channel
     * timeout task and finally shuts down this thread.
     *
     * <p>Returns immediately when {@code shutdownDone} reports true
     * (presumably because a shutdown was already initiated; confirm against
     * StoppableThread). Failures while stopping an auxiliary thread are
     * logged and do not abort the remaining steps.
     */
    void shutdown() {
        if (shutdownDone(logger)) {
            return;
        }

        /* 1. message processing thread */
        final SubscriptionProcessMessageThread procThread = messageProcThread;
        if (procThread != null) {
            try {
                procThread.shutdownThread(logger);
                LoggerUtils.info(logger, envImpl, "message processing thread has shut down.");
            } catch (Exception e) {
                LoggerUtils.warning(logger, envImpl, "error in shutdown msg proc thread: " + e.getMessage() + ", continue shutdown the subscription thread.");
            } finally {
                messageProcThread = null;
            }
        }

        /* 2. output thread */
        final SubscriptionOutputThread outThread = outputThread;
        if (outThread != null) {
            try {
                outThread.shutdownThread(logger);
                LoggerUtils.info(logger, envImpl, "output thread has shut down.");
            } catch (Exception e) {
                LoggerUtils.warning(logger, envImpl, "error in shutdown output thread: " + e.getMessage() + ", continue shutdown subscription thread.");
            } finally {
                outputThread = null;
            }
        }

        /* 3. queues, channel and timeout task */
        inputQueue.clear();
        outputQueue.clear();
        RepUtils.shutdownChannel(namedChannel);
        final ChannelTimeoutTask timeoutTask = channelTimeoutTask;
        if (timeoutTask != null) {
            timeoutTask.cancel();
        }

        /* 4. this thread itself */
        shutdownThread(logger);
        LoggerUtils.info(logger, envImpl, "queues cleared and channel closed, subscription thread has completely shut down");
    }

    /**
     * Puts a message on the input queue, waiting as long as necessary.
     *
     * <p>The method retries in one-second slices. After each unsuccessful
     * slice it gives up silently if the thread has been shut down, fails if
     * the consumer of the queue is no longer alive, and otherwise counts a
     * queue overflow and tries again. Nothing is queued when the thread is
     * already shut down on entry.
     *
     * @param message message to enqueue
     * @throws InterruptedException if interrupted while waiting for space
     * @throws InternalException if the thread consuming the input queue is
     *         gone while this thread has not been shut down
     */
    void offer(Object message) throws InterruptedException, InternalException {
        if (this.isShutdown()) {
            return;
        }
        final RepImpl repImpl = (RepImpl)this.envImpl;
        while (!this.inputQueue.offer(message, 1000L, TimeUnit.MILLISECONDS)) {
            if (this.isShutdown()) {
                return;
            }
            /*
             * Snapshot the field: shutdown() on another thread resets it to
             * null, and dereferencing the field directly could then raise a
             * NullPointerException instead of ending quietly.
             */
            final SubscriptionProcessMessageThread consumer = this.messageProcThread;
            if (consumer == null && this.isShutdown()) {
                return;
            }
            if (consumer == null || !consumer.isAlive()) {
                final String err = "Thread consuming input queue is gone, start shutdown process";
                LoggerUtils.warning(this.logger, repImpl, err);
                throw new InternalException(err);
            }
            this.stats.getNumReplayQueueOverflow().increment();
        }
    }

    /**
     * Opens the channel to the feeder and brings the connection to the point
     * where streaming can start: feeder handshake, protocol version check,
     * sync-up at the requested VLSN, and the first heartbeat exchange.
     *
     * @throws ConnectionException if an I/O error occurs; the caller may
     *         retry. The half-established connection is released first so
     *         that a retry does not leak the channel or its timeout task.
     * @throws InternalException if the feeder speaks a protocol version below
     *         the configured minimum or another protocol error occurs
     * @throws EnvironmentFailureException rethrown after logging; includes
     *         the case where sync-up returns a null start VLSN
     * @throws ReplicationSecurityException propagated from openChannel()
     */
    private void initializeConnection() throws InternalException, EnvironmentFailureException, ConnectionException, ReplicationSecurityException {
        LoggerUtils.fine(this.logger, this.envImpl, "Subscription " + this.config.getSubNodeName() + " start open channel and handshake with feeder");
        try {
            this.openChannel();

            /* Handshake; the result is the protocol used from here on. */
            final ReplicaFeederHandshake handshake = new ReplicaFeederHandshake(new SubFeederHandshakeConfig(this.config.getNodeType()));
            this.protocol = handshake.execute();
            final int minReqVersion = this.config.getMinProtocolVersion();
            if (this.protocol.getVersion() < minReqVersion) {
                throw new BinaryProtocol.ProtocolException("HA protocol version (" + this.protocol.getVersion() + ") is lower than minimal required version (" + minReqVersion + ")");
            }

            /* Sync-up at the requested VLSN. */
            LoggerUtils.fine(this.logger, this.envImpl, "subscription " + this.config.getSubNodeName() + " sync-up with feeder at vlsn: " + this.reqVLSN);
            final SubscriberFeederSyncup syncup = new SubscriberFeederSyncup(this.namedChannel, this.protocol, this.config.getFeederFilter(), (RepImpl)this.envImpl, this.config.getStreamMode(), this.config.getPartGenDBName(), this.logger);
            final VLSN startVLSN = syncup.execute(this.reqVLSN);
            this.stats.setPartGenDBId(syncup.getPartGenDBId());
            LoggerUtils.fine(this.logger, this.envImpl, "sync-up with feeder done, start vlsn: " + startVLSN + ", partition generation db id " + this.stats.getPartGenDBId() + " for given db name " + this.config.getPartGenDBName());
            if (startVLSN.equals(VLSN.NULL_VLSN)) {
                throw new InsufficientLogException((RepImpl)this.envImpl, this.reqVLSN);
            }
            this.stats.setStartVLSN(startVLSN);

            /* Read the first heartbeat and acknowledge it. */
            this.protocol.read(this.namedChannel.getChannel(), BaseProtocol.Heartbeat.class);
            this.queueAck(ReplicaOutputThread.HEARTBEAT_ACK);
            LoggerUtils.info(this.logger, this.envImpl, "Subscription " + this.config.getSubNodeName() + " successfully connect to feeder at " + this.config.getFeederHost() + ":" + this.config.getFeederPort() + ", reqVLSN: " + this.reqVLSN + ", start VLSN: " + startVLSN);
        } catch (IOException e) {
            /*
             * The caller retries after a ConnectionException and the next
             * openChannel() overwrites namedChannel and channelTimeoutTask,
             * so the ones from this failed attempt must be released now.
             */
            this.releaseFailedConnection();
            throw new ConnectionException("Unable to connect due to " + e.getMessage() + ",  will retry later.", this.config.getSleepBeforeRetryMs(), e);
        } catch (EnvironmentFailureException e) {
            this.logger.warning("Fail to handshake with feeder: " + e.getMessage());
            throw e;
        } catch (BinaryProtocol.ProtocolException e) {
            final String msg = "Unable to connect to feeder " + this.config.getFeederHost() + " due to protocol exception " + e.getMessage();
            LoggerUtils.warning(this.logger, this.envImpl, msg);
            throw new InternalException(msg, e);
        }
    }

    /**
     * Closes the channel and cancels the timeout task left behind by a
     * connection attempt that failed after the channel was opened, and
     * clears both fields.
     */
    private void releaseFailedConnection() {
        final NamedChannelWithTimeout staleChannel = this.namedChannel;
        final ChannelTimeoutTask staleTask = this.channelTimeoutTask;
        this.namedChannel = null;
        this.channelTimeoutTask = null;
        if (staleChannel != null) {
            RepUtils.shutdownChannel(staleChannel);
        }
        if (staleTask != null) {
            staleTask.cancel();
        }
    }

    /**
     * Creates and starts the output thread and the message processing
     * thread, after clearing both queues.
     *
     * @return true if both threads were started; false if the output thread
     *         reference was cleared again before it could be started, in
     *         which case no thread is started
     */
    private boolean createAuxThread() {
        final RepImpl repImpl = (RepImpl)this.envImpl;
        this.inputQueue.clear();
        this.outputQueue.clear();
        this.outputThread = new SubscriptionOutputThread(this, repImpl, this.outputQueue, this.protocol, this.namedChannel.getChannel(), this.config.getAuthenticator(), this.stats);

        /*
         * Re-read the volatile field instead of reusing the freshly created
         * object: shutdown() on another thread resets the field to null, and
         * only a second read can observe that. Checking the new object itself
         * for null would always succeed and leave the branch below dead.
         */
        final SubscriptionOutputThread cachedOutputThread = this.outputThread;
        if (cachedOutputThread == null) {
            LoggerUtils.info(this.logger, this.envImpl, "subscription " + this.config.getSubNodeName() + " just shut down, no need to create auxiliary threads");
            return false;
        }

        cachedOutputThread.start();
        LoggerUtils.fine(this.logger, this.envImpl, "output thread created for subscription " + this.config.getSubNodeName());
        this.messageProcThread = new SubscriptionProcessMessageThread(repImpl, this.inputQueue, this.config, this.stats, this.logger);
        this.messageProcThread.start();
        LoggerUtils.fine(this.logger, this.envImpl, "message processing thread created for subscription " + this.config.getSubNodeName());
        return true;
    }

    /**
     * Connects to the feeder, performs the "Feeder" service handshake and
     * wraps the resulting channel in a {@link NamedChannelWithTimeout} that
     * is stored in {@code namedChannel}.
     *
     * <p>If anything fails after the raw channel was connected, that channel
     * is closed before the exception leaves this method, so a failed attempt
     * does not leave an open connection behind.
     *
     * @return the newly created named channel
     * @throws IllegalStateException if the replication environment is null
     * @throws ConnectionException on I/O errors or when the feeder answers
     *         UNKNOWN_SERVICE; the caller may retry
     * @throws ReplicationSecurityException when the feeder answers INVALID
     * @throws InternalException for any other service handshake failure
     */
    private NamedChannel openChannel() throws ConnectionException, InternalException, ReplicationSecurityException {
        final RepImpl repImpl = (RepImpl)this.envImpl;
        if (repImpl == null) {
            throw new IllegalStateException("Replication env is unavailable.");
        }
        DataChannel channel = null;
        boolean established = false;
        try {
            final DataChannelFactory.ConnectOptions connectOpts = new DataChannelFactory.ConnectOptions()
                .setTcpNoDelay(true)
                .setReceiveBufferSize(this.config.getReceiveBufferSize())
                .setOpenTimeout((int)this.config.getStreamOpenTimeout(TimeUnit.MILLISECONDS))
                .setBlocking(true);
            channel = repImpl.getChannelFactory().connect(this.config.getInetSocketAddress(), connectOpts);
            ServiceDispatcher.doServiceHandshake(channel, "Feeder", this.config.getAuthInfo());
            LoggerUtils.fine(this.logger, this.envImpl, "channel opened to service Feeder@" + this.config.getFeederHost() + "[address: " + this.config.getFeederHostAddr() + " port: " + this.config.getFeederPort() + "]");
            final int timeoutMs = repImpl.getConfigManager().getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT);
            this.channelTimeoutTask = new ChannelTimeoutTask(new Timer(true));
            this.namedChannel = new NamedChannelWithTimeout(repImpl, this.logger, this.channelTimeoutTask, channel, timeoutMs);
            established = true;
        } catch (IOException cause) {
            throw new ConnectionException("Fail to open channel to feeder due to " + cause.getMessage() + ", will retry later", this.config.getSleepBeforeRetryMs(), cause);
        } catch (ServiceDispatcher.ServiceConnectFailedException cause) {
            if (cause.getResponse() == ServiceDispatcher.Response.UNKNOWN_SERVICE) {
                throw new ConnectionException("Service exception: " + cause.getMessage() + ", wait longer and will retry later", this.config.getSleepBeforeRetryMs(), cause);
            }
            if (cause.getResponse() == ServiceDispatcher.Response.INVALID) {
                throw new ReplicationSecurityException("Security check failure:" + cause.getMessage(), this.config.getSubNodeName(), cause);
            }
            throw new InternalException("Subscription " + this.config.getSubNodeName() + "failed to handshake for service " + "Feeder" + " with feeder " + this.config.getFeederHost(), cause);
        } finally {
            if (!established && channel != null) {
                try {
                    channel.close();
                } catch (Exception ignored) {
                    /* Best effort only; the original failure is what gets reported. */
                }
            }
        }
        LoggerUtils.info(this.logger, this.envImpl, "Subscription " + this.config.getSubNodeName() + " has successfully created a channel to feeder at " + this.config.getFeederHost() + ":" + this.config.getFeederPort());
        return this.namedChannel;
    }

    /**
     * Reads messages from the feeder until the thread is shut down or the
     * protocol returns no message.
     *
     * <p>Heartbeats are acknowledged through the output queue. A shutdown
     * request is turned into a {@link GroupShutdownException} that is both
     * put on the input queue and thrown. Every other message is put on the
     * input queue, and the max-pending-input statistic is raised when the
     * queue is fuller than seen before.
     *
     * @throws GroupShutdownException when the feeder requests a shutdown
     * @throws ReplicationSecurityException if the output thread recorded a
     *         security failure
     * @throws InternalException wrapping any other exception
     */
    private void loopInternal() throws InternalException, GroupShutdownException, ReplicationSecurityException {
        final RepImpl rep = (RepImpl)envImpl;
        try {
            LoggerUtils.info(logger, envImpl, "Start reading messages from feeder " + config.getFeederHost() + ":" + config.getFeederPort());
            while (!isShutdown()) {
                checkOutputThread();

                final BinaryProtocol.Message msg = protocol.read(namedChannel);
                if (msg == null) {
                    LoggerUtils.info(logger, envImpl, "Subscription " + config.getSubNodeName() + " has nothing stream, exit loop.");
                    return;
                }

                assert TestHookExecute.doHookIfSet(exceptionHandlingTestHook, this);
                stats.getNumMsgReceived().increment();

                final BinaryProtocol.MessageOp op = msg.getOp();
                if (op == Protocol.HEARTBEAT) {
                    LoggerUtils.finest(logger, envImpl, "receive heartbeat from " + namedChannel.getNameIdPair());
                    queueAck(ReplicaOutputThread.HEARTBEAT_ACK);
                } else if (op == Protocol.SHUTDOWN_REQUEST) {
                    LoggerUtils.info(logger, envImpl, "Receive shutdown request from feeder " + config.getFeederHost() + ", shutdown subscriber");
                    final BaseProtocol.ShutdownRequest request = (BaseProtocol.ShutdownRequest)msg;
                    final GroupShutdownException groupShutdown = new GroupShutdownException(logger, rep, config.getFeederHost(), stats.getHighVLSN(), request.getShutdownTimeMs());
                    /* Hand the exception to the queue consumer, then end the loop. */
                    offer(groupShutdown);
                    throw groupShutdown;
                } else {
                    offer(msg);
                    final long queued = inputQueue.size();
                    if (queued > stats.getMaxPendingInput().get()) {
                        stats.getMaxPendingInput().set(queued);
                        LoggerUtils.finest(logger, envImpl, "Max pending request log items:" + queued);
                    }
                }
            }
        } catch (GroupShutdownException | ReplicationSecurityException passThrough) {
            throw passThrough;
        } catch (Exception other) {
            throw new InternalException(other.getMessage(), other);
        }
    }

    /**
     * Rethrows a security failure recorded by the output thread, if any.
     * Does nothing except log when the output thread reference is null.
     *
     * @throws ReplicationSecurityException the exception reported by the
     *         output thread
     */
    private void checkOutputThread() throws InternalException, ReplicationSecurityException {
        final SubscriptionOutputThread out = outputThread;
        if (out != null) {
            if (out.getException() instanceof ReplicationSecurityException) {
                final ReplicationSecurityException securityFailure = (ReplicationSecurityException)out.getException();
                LoggerUtils.warning(logger, envImpl, "Output thread exited due to security check failure: " + securityFailure.getMessage());
                throw securityFailure;
            }
            return;
        }
        LoggerUtils.fine(logger, envImpl, "output thread no longer exists");
    }

    /**
     * Puts an ack id on the output queue, blocking until there is room.
     *
     * @param xid id to enqueue
     * @throws IOException if the thread is interrupted while waiting; the
     *         interruption is attached as the cause
     */
    private void queueAck(Long xid) throws IOException {
        try {
            outputQueue.put(xid);
        } catch (InterruptedException interrupted) {
            final IOException wrapped = new IOException("Ack I/O interrupted", interrupted);
            throw wrapped;
        }
    }

    private class SubscriptionThreadExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        private SubscriptionThreadExceptionHandler() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            SubscriptionThread.this.logger.severe("Error { " + e.getMessage() + " } in SubscriptionThread {" + t + " } was uncaught.\nstack trace:\n" + LoggerUtils.getStackTrace(e));
        }
    }

    /**
     * Signals a connection failure that the caller may retry after sleeping
     * for {@link #getRetrySleepMs()} milliseconds.
     */
    private class ConnectionException
    extends RuntimeException {
        /* How long the caller should sleep before the next attempt. */
        private final long retrySleepMs;

        /**
         * @param message      description of the failure; may be null
         * @param retrySleepMs time in ms to sleep before retrying
         * @param cause        underlying failure; may be null
         */
        ConnectionException(String message, long retrySleepMs, Throwable cause) {
            super(message, cause);
            this.retrySleepMs = retrySleepMs;
        }

        /**
         * @return time in ms to sleep before retrying
         */
        long getRetrySleepMs() {
            return this.retrySleepMs;
        }

        /**
         * Returns the retry summary followed by the detail message given to
         * the constructor, so the actual reason for the failure is not lost
         * in logs. Without a detail message only the summary is returned.
         */
        @Override
        public String getMessage() {
            final String summary = "Failed to connect, will retry after sleeping " + this.retrySleepMs + " ms";
            final String detail = super.getMessage();
            if (detail == null || detail.isEmpty()) {
                return summary;
            }
            return summary + ": " + detail;
        }
    }

    /**
     * Handshake configuration handed to {@link ReplicaFeederHandshake}; it
     * exposes the enclosing thread's environment, channel and subscription
     * configuration.
     */
    private class SubFeederHandshakeConfig
    implements ReplicaFeederHandshakeConfig {
        /* Environment of the enclosing thread, captured at construction. */
        private final RepImpl repImpl;
        /* Node type announced during the handshake. */
        private final NodeType nodeType;

        /**
         * @param nodeType node type to report from {@link #getNodeType()}
         */
        SubFeederHandshakeConfig(NodeType nodeType) {
            this.repImpl = (RepImpl)SubscriptionThread.this.envImpl;
            this.nodeType = nodeType;
        }

        /** Returns the environment captured at construction. */
        @Override
        public RepImpl getRepImpl() {
            return repImpl;
        }

        /** Returns the name/id pair of the environment. */
        @Override
        public NameIdPair getNameIdPair() {
            final RepImpl rep = getRepImpl();
            return rep.getNameIdPair();
        }

        /** Returns a new clock built from the environment's clock skew. */
        @Override
        public RepUtils.Clock getClock() {
            return new RepUtils.Clock(RepImpl.getClockSkewMs());
        }

        /** Returns the node type given to the constructor. */
        @Override
        public NodeType getNodeType() {
            return nodeType;
        }

        /** Returns the enclosing thread's current named channel. */
        @Override
        public NamedChannel getNamedChannel() {
            return SubscriptionThread.this.namedChannel;
        }

        /**
         * Builds a new group object from the configured group name and the
         * environment's current JE version, and applies the configured group
         * UUID when one is set.
         */
        @Override
        public RepGroupImpl getGroup() {
            final SubscriptionConfig conf = SubscriptionThread.this.config;
            final RepGroupImpl group = new RepGroupImpl(conf.getGroupName(), true, repImpl.getCurrentJEVersion());
            if (conf.getGroupUUID() == null) {
                return group;
            }
            group.setUUID(conf.getGroupUUID());
            return group;
        }
    }
}

