/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.qjournal.client;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannelMetrics;
import org.apache.hadoop.hdfs.qjournal.client.LoggerTooFarBehindException;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncaughtExceptionHandlers;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
public class IPCLoggerChannel
implements AsyncLogger {
    private final Configuration conf;
    protected final InetSocketAddress addr;
    private QJournalProtocol proxy;
    private final ListeningExecutorService singleThreadExecutor;
    private final ListeningExecutorService parallelExecutor;
    private long ipcSerial = 0L;
    private long epoch = -1L;
    private long committedTxId = -12345L;
    private final String journalId;
    private final String nameServiceId;
    private final NamespaceInfo nsInfo;
    private URL httpServerURL;
    private final IPCLoggerChannelMetrics metrics;
    private int queuedEditsSizeBytes = 0;
    private long highestAckedTxId = 0L;
    private long lastAckNanos = 0L;
    private long lastCommitNanos = 0L;
    private final int queueSizeLimitBytes;
    private boolean outOfSync = false;
    private final StopWatch lastHeartbeatStopwatch = new StopWatch();
    private static final long HEARTBEAT_INTERVAL_MILLIS = 1000L;
    private static final long WARN_JOURNAL_MILLIS_THRESHOLD = 1000L;
    static final AsyncLogger.Factory FACTORY = IPCLoggerChannel::new;

    public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) {
        this(conf, nsInfo, journalId, null, addr);
    }

    public IPCLoggerChannel(Configuration conf, NamespaceInfo nsInfo, String journalId, String nameServiceId, InetSocketAddress addr) {
        this.conf = conf;
        this.nsInfo = nsInfo;
        this.journalId = journalId;
        this.nameServiceId = nameServiceId;
        this.addr = addr;
        this.queueSizeLimitBytes = 0x100000 * conf.getInt("dfs.qjournal.queued-edits.limit.mb", 10);
        this.singleThreadExecutor = MoreExecutors.listeningDecorator((ExecutorService)this.createSingleThreadExecutor());
        this.parallelExecutor = MoreExecutors.listeningDecorator((ExecutorService)this.createParallelExecutor());
        this.metrics = IPCLoggerChannelMetrics.create(this);
    }

    @Override
    public synchronized void setEpoch(long epoch) {
        this.epoch = epoch;
    }

    @Override
    public synchronized void setCommittedTxId(long txid) {
        Preconditions.checkArgument((txid >= this.committedTxId ? 1 : 0) != 0, (String)"Trying to move committed txid backwards in client old: %s new: %s", (Object[])new Object[]{this.committedTxId, txid});
        this.committedTxId = txid;
        this.lastCommitNanos = Time.monotonicNowNanos();
    }

    @Override
    public void close() {
        this.singleThreadExecutor.shutdown();
        this.parallelExecutor.shutdown();
        if (this.proxy != null) {
            RPC.stopProxy((Object)this.proxy);
        }
    }

    protected QJournalProtocol getProxy() throws IOException {
        if (this.proxy != null) {
            return this.proxy;
        }
        this.proxy = this.createProxy();
        return this.proxy;
    }

    protected QJournalProtocol createProxy() throws IOException {
        Configuration confCopy = new Configuration(this.conf);
        confCopy.setBoolean("ipc.client.tcpnodelay", true);
        RPC.setProtocolEngine((Configuration)confCopy, QJournalProtocolPB.class, ProtobufRpcEngine2.class);
        return (QJournalProtocol)SecurityUtil.doAsLoginUser(() -> {
            RPC.setProtocolEngine((Configuration)confCopy, QJournalProtocolPB.class, ProtobufRpcEngine2.class);
            QJournalProtocolPB pbproxy = (QJournalProtocolPB)RPC.getProxy(QJournalProtocolPB.class, (long)RPC.getProtocolVersion(QJournalProtocolPB.class), (InetSocketAddress)this.addr, (Configuration)confCopy);
            return new QJournalProtocolTranslatorPB(pbproxy);
        });
    }

    @VisibleForTesting
    protected ExecutorService createSingleThreadExecutor() {
        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Logger channel (from single-thread executor) to " + this.addr).setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit()).build());
    }

    @VisibleForTesting
    protected ExecutorService createParallelExecutor() {
        int numThreads = this.conf.getInt("dfs.qjournal.parallel-read.num-threads", 5);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(numThreads, numThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Logger channel (from parallel executor) to " + this.addr).setUncaughtExceptionHandler(UncaughtExceptionHandlers.systemExit()).build());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }

    @Override
    public URL buildURLToFetchLogs(long segmentTxId) {
        Preconditions.checkArgument((segmentTxId > 0L ? 1 : 0) != 0, (String)"Invalid segment: %s", (Object[])new Object[]{segmentTxId});
        Preconditions.checkState((boolean)this.hasHttpServerEndPoint(), (Object)"No HTTP/HTTPS endpoint");
        try {
            String path = GetJournalEditServlet.buildPath(this.journalId, segmentTxId, this.nsInfo, true);
            return new URL(this.httpServerURL, path);
        }
        catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    private synchronized RequestInfo createReqInfo() {
        Preconditions.checkState((this.epoch > 0L ? 1 : 0) != 0, (Object)("bad epoch: " + this.epoch));
        return new RequestInfo(this.journalId, this.nameServiceId, this.epoch, this.ipcSerial++, this.committedTxId);
    }

    public synchronized int getQueuedEditsSize() {
        return this.queuedEditsSizeBytes;
    }

    public InetSocketAddress getRemoteAddress() {
        return this.addr;
    }

    public synchronized boolean isOutOfSync() {
        return this.outOfSync;
    }

    @VisibleForTesting
    void waitForAllPendingCalls() throws InterruptedException {
        try {
            this.singleThreadExecutor.submit(() -> {}).get();
        }
        catch (ExecutionException e) {
            throw new AssertionError((Object)e);
        }
    }

    @Override
    public ListenableFuture<Boolean> isFormatted() {
        return this.singleThreadExecutor.submit(() -> this.getProxy().isFormatted(this.journalId, this.nameServiceId));
    }

    @Override
    public ListenableFuture<QJournalProtocolProtos.GetJournalStateResponseProto> getJournalState() {
        return this.singleThreadExecutor.submit(() -> {
            QJournalProtocolProtos.GetJournalStateResponseProto ret = this.getProxy().getJournalState(this.journalId, this.nameServiceId);
            this.constructHttpServerURI(ret);
            return ret;
        });
    }

    @Override
    public ListenableFuture<QJournalProtocolProtos.NewEpochResponseProto> newEpoch(long epoch) {
        return this.singleThreadExecutor.submit(() -> this.getProxy().newEpoch(this.journalId, this.nameServiceId, this.nsInfo, epoch));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ListenableFuture<Void> sendEdits(long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
        ListenableFuture ret;
        block6: {
            try {
                this.reserveQueueSpace(data.length);
            }
            catch (LoggerTooFarBehindException e) {
                return Futures.immediateFailedFuture((Throwable)e);
            }
            long submitNanos = Time.monotonicNowNanos();
            ret = null;
            try {
                ret = this.singleThreadExecutor.submit(() -> {
                    this.throwIfOutOfSync();
                    long rpcSendTimeNanos = Time.monotonicNowNanos();
                    try {
                        this.getProxy().journal(this.createReqInfo(), segmentTxId, firstTxnId, numTxns, data);
                    }
                    catch (IOException e) {
                        try {
                            QuorumJournalManager.LOG.warn("Remote journal {} failed to write txns {}-{}. Will try to write to this JN again after the next log roll.", new Object[]{this, firstTxnId, firstTxnId + (long)numTxns - 1L, e});
                            IPCLoggerChannel iPCLoggerChannel = this;
                            synchronized (iPCLoggerChannel) {
                                this.outOfSync = true;
                            }
                            throw e;
                        }
                        catch (Throwable throwable) {
                            long nowNanos = Time.monotonicNowNanos();
                            long rpcTimeMicros = TimeUnit.MICROSECONDS.convert(nowNanos - rpcSendTimeNanos, TimeUnit.NANOSECONDS);
                            long endToEndTimeMicros = TimeUnit.MICROSECONDS.convert(nowNanos - submitNanos, TimeUnit.NANOSECONDS);
                            this.metrics.addWriteEndToEndLatency(endToEndTimeMicros);
                            this.metrics.addWriteRpcLatency(rpcTimeMicros);
                            if (rpcTimeMicros / 1000L > 1000L) {
                                QuorumJournalManager.LOG.warn("Took {}ms to send a batch of {} edits ({} bytes) to remote journal {}.", new Object[]{rpcTimeMicros / 1000L, numTxns, data.length, this});
                            }
                            throw throwable;
                        }
                    }
                    long nowNanos = Time.monotonicNowNanos();
                    long rpcTimeMicros = TimeUnit.MICROSECONDS.convert(nowNanos - rpcSendTimeNanos, TimeUnit.NANOSECONDS);
                    long endToEndTimeMicros = TimeUnit.MICROSECONDS.convert(nowNanos - submitNanos, TimeUnit.NANOSECONDS);
                    this.metrics.addWriteEndToEndLatency(endToEndTimeMicros);
                    this.metrics.addWriteRpcLatency(rpcTimeMicros);
                    if (rpcTimeMicros / 1000L > 1000L) {
                        QuorumJournalManager.LOG.warn("Took {}ms to send a batch of {} edits ({} bytes) to remote journal {}.", new Object[]{rpcTimeMicros / 1000L, numTxns, data.length, this});
                    }
                    IPCLoggerChannel iPCLoggerChannel = this;
                    synchronized (iPCLoggerChannel) {
                        this.highestAckedTxId = firstTxnId + (long)numTxns - 1L;
                        this.lastAckNanos = submitNanos;
                    }
                    return null;
                });
                if (ret != null) break block6;
                this.unreserveQueueSpace(data.length);
            }
            catch (Throwable throwable) {
                if (ret == null) {
                    this.unreserveQueueSpace(data.length);
                } else {
                    Futures.addCallback((ListenableFuture)ret, (FutureCallback)new FutureCallback<Void>(data){
                        final /* synthetic */ byte[] val$data;
                        {
                            this.val$data = byArray;
                        }

                        public void onFailure(Throwable t) {
                            IPCLoggerChannel.this.unreserveQueueSpace(this.val$data.length);
                        }

                        public void onSuccess(Void t) {
                            IPCLoggerChannel.this.unreserveQueueSpace(this.val$data.length);
                        }
                    }, (Executor)MoreExecutors.directExecutor());
                }
                throw throwable;
            }
        }
        Futures.addCallback((ListenableFuture)ret, (FutureCallback)new /* invalid duplicate definition of identical inner class */, (Executor)MoreExecutors.directExecutor());
        return ret;
    }

    private void throwIfOutOfSync() throws IOException {
        if (this.isOutOfSync()) {
            this.heartbeatIfNecessary();
            throw new JournalOutOfSyncException("Journal disabled until next roll");
        }
    }

    private void heartbeatIfNecessary() throws IOException {
        if (this.lastHeartbeatStopwatch.now(TimeUnit.MILLISECONDS) > 1000L || !this.lastHeartbeatStopwatch.isRunning()) {
            try {
                this.getProxy().heartbeat(this.createReqInfo());
            }
            finally {
                this.lastHeartbeatStopwatch.reset().start();
            }
        }
    }

    private synchronized void reserveQueueSpace(int size) throws LoggerTooFarBehindException {
        Preconditions.checkArgument((size >= 0 ? 1 : 0) != 0);
        if (this.queuedEditsSizeBytes + size > this.queueSizeLimitBytes && this.queuedEditsSizeBytes > 0) {
            QuorumJournalManager.LOG.warn("Pending edits to {} is going to exceed limit size: {}, current queued edits size: {}, will silently drop {} bytes of edits!", new Object[]{IPCLoggerChannel.class, this.queueSizeLimitBytes, this.queuedEditsSizeBytes, size});
            throw new LoggerTooFarBehindException();
        }
        this.queuedEditsSizeBytes += size;
    }

    private synchronized void unreserveQueueSpace(int size) {
        Preconditions.checkArgument((size >= 0 ? 1 : 0) != 0);
        this.queuedEditsSizeBytes -= size;
    }

    @Override
    public ListenableFuture<Void> format(NamespaceInfo nsInfo, boolean force) {
        return this.singleThreadExecutor.submit(() -> {
            this.getProxy().format(this.journalId, this.nameServiceId, nsInfo, force);
            return null;
        });
    }

    @Override
    public ListenableFuture<Void> startLogSegment(long txid, int layoutVersion) {
        return this.singleThreadExecutor.submit(() -> {
            this.getProxy().startLogSegment(this.createReqInfo(), txid, layoutVersion);
            IPCLoggerChannel iPCLoggerChannel = this;
            synchronized (iPCLoggerChannel) {
                if (this.outOfSync) {
                    this.outOfSync = false;
                    QuorumJournalManager.LOG.info("Restarting previously-stopped writes to {} in segment starting at txid {}.", IPCLoggerChannel.class, (Object)txid);
                }
            }
            return null;
        });
    }

    @Override
    public ListenableFuture<Void> finalizeLogSegment(long startTxId, long endTxId) {
        return this.singleThreadExecutor.submit(() -> {
            this.throwIfOutOfSync();
            this.getProxy().finalizeLogSegment(this.createReqInfo(), startTxId, endTxId);
            return null;
        });
    }

    @Override
    public ListenableFuture<Void> purgeLogsOlderThan(long minTxIdToKeep) {
        return this.singleThreadExecutor.submit(() -> {
            this.getProxy().purgeLogsOlderThan(this.createReqInfo(), minTxIdToKeep);
            return null;
        });
    }

    @Override
    public ListenableFuture<QJournalProtocolProtos.GetJournaledEditsResponseProto> getJournaledEdits(long fromTxnId, int maxTransactions) {
        return this.parallelExecutor.submit(() -> this.getProxy().getJournaledEdits(this.journalId, this.nameServiceId, fromTxnId, maxTransactions));
    }

    @Override
    public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(long fromTxnId, boolean inProgressOk) {
        return this.parallelExecutor.submit(() -> {
            QJournalProtocolProtos.GetEditLogManifestResponseProto ret = this.getProxy().getEditLogManifest(this.journalId, this.nameServiceId, fromTxnId, inProgressOk);
            this.constructHttpServerURI(ret);
            return PBHelper.convert(ret.getManifest());
        });
    }

    @Override
    public ListenableFuture<QJournalProtocolProtos.PrepareRecoveryResponseProto> prepareRecovery(long segmentTxId) {
        return this.singleThreadExecutor.submit(() -> {
            if (!this.hasHttpServerEndPoint()) {
                QJournalProtocolProtos.GetJournalStateResponseProto ret = this.getProxy().getJournalState(this.journalId, this.nameServiceId);
                this.constructHttpServerURI(ret);
            }
            return this.getProxy().prepareRecovery(this.createReqInfo(), segmentTxId);
        });
    }

    @Override
    public ListenableFuture<Void> acceptRecovery(QJournalProtocolProtos.SegmentStateProto log, URL url) {
        return this.singleThreadExecutor.submit(() -> {
            this.getProxy().acceptRecovery(this.createReqInfo(), log, url);
            return null;
        });
    }

    @Override
    public ListenableFuture<Void> doPreUpgrade() {
        return this.singleThreadExecutor.submit(() -> {
            this.getProxy().doPreUpgrade(this.journalId);
            return null;
        });
    }

    @Override
    public ListenableFuture<Void> doUpgrade(StorageInfo sInfo) {
        return this.singleThreadExecutor.submit(() -> {
            this.getProxy().doUpgrade(this.journalId, sInfo);
            return null;
        });
    }

    @Override
    public ListenableFuture<Void> doFinalize() {
        return this.singleThreadExecutor.submit(() -> {
            this.getProxy().doFinalize(this.journalId, this.nameServiceId);
            return null;
        });
    }

    @Override
    public ListenableFuture<Boolean> canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion) {
        return this.singleThreadExecutor.submit(() -> this.getProxy().canRollBack(this.journalId, this.nameServiceId, storage, prevStorage, targetLayoutVersion));
    }

    @Override
    public ListenableFuture<Void> doRollback() {
        return this.singleThreadExecutor.submit(() -> {
            this.getProxy().doRollback(this.journalId, this.nameServiceId);
            return null;
        });
    }

    @Override
    public ListenableFuture<Void> discardSegments(long startTxId) {
        return this.singleThreadExecutor.submit(() -> {
            this.getProxy().discardSegments(this.journalId, this.nameServiceId, startTxId);
            return null;
        });
    }

    @Override
    public ListenableFuture<Long> getJournalCTime() {
        return this.singleThreadExecutor.submit(() -> this.getProxy().getJournalCTime(this.journalId, this.nameServiceId));
    }

    public String toString() {
        return InetAddresses.toAddrString((InetAddress)this.addr.getAddress()) + ':' + this.addr.getPort();
    }

    @Override
    public synchronized void appendReport(StringBuilder sb) {
        sb.append("Written txid ").append(this.highestAckedTxId);
        long behind = this.getLagTxns();
        if (behind > 0L) {
            if (this.lastAckNanos != 0L) {
                long lagMillis = this.getLagTimeMillis();
                sb.append(" (" + behind + " txns/" + lagMillis + "ms behind)");
            } else {
                sb.append(" (never written");
            }
        }
        if (this.outOfSync) {
            sb.append(" (will try to re-sync on next segment)");
        }
    }

    public synchronized long getLagTxns() {
        return Math.max(this.committedTxId - this.highestAckedTxId, 0L);
    }

    public synchronized long getLagTimeMillis() {
        return TimeUnit.MILLISECONDS.convert(Math.max(this.lastCommitNanos - this.lastAckNanos, 0L), TimeUnit.NANOSECONDS);
    }

    private void constructHttpServerURI(QJournalProtocolProtos.GetEditLogManifestResponseProto ret) {
        if (ret.hasFromURL()) {
            URI uri = URI.create(ret.getFromURL());
            this.httpServerURL = this.getHttpServerURI(uri.getScheme(), uri.getPort());
        } else {
            this.httpServerURL = this.getHttpServerURI("http", ret.getHttpPort());
        }
    }

    private void constructHttpServerURI(QJournalProtocolProtos.GetJournalStateResponseProto ret) {
        if (ret.hasFromURL()) {
            URI uri = URI.create(ret.getFromURL());
            this.httpServerURL = this.getHttpServerURI(uri.getScheme(), uri.getPort());
        } else {
            this.httpServerURL = this.getHttpServerURI("http", ret.getHttpPort());
        }
    }

    private URL getHttpServerURI(String scheme, int port) {
        try {
            return new URL(scheme, this.addr.getHostName(), port, "");
        }
        catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    private boolean hasHttpServerEndPoint() {
        return this.httpServerURL != null;
    }
}

