/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
public class StandbyCheckpointer {
    private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
    private static final long PREVENT_AFTER_CANCEL_MS = 120000L;
    private final CheckpointConf checkpointConf;
    private final Configuration conf;
    private final FSNamesystem namesystem;
    private long lastCheckpointTime;
    private long lastUploadTime;
    private final CheckpointerThread thread;
    private final ThreadFactory uploadThreadFactory;
    private List<URL> activeNNAddresses;
    private URL myNNAddress;
    private final Object cancelLock = new Object();
    private Canceler canceler;
    private boolean isPrimaryCheckPointer = true;
    private static int canceledCount = 0;

    public StandbyCheckpointer(Configuration conf, FSNamesystem ns) throws IOException {
        this.namesystem = ns;
        this.conf = conf;
        this.checkpointConf = new CheckpointConf(conf);
        this.thread = new CheckpointerThread();
        this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TransferFsImageUpload-%d").build();
        this.setNameNodeAddresses(conf);
    }

    private void setNameNodeAddresses(Configuration conf) throws IOException {
        this.myNNAddress = this.getHttpAddress(conf);
        List<Configuration> confForActive = HAUtil.getConfForOtherNodes(conf);
        this.activeNNAddresses = new ArrayList<URL>(confForActive.size());
        for (Configuration activeConf : confForActive) {
            URL activeNNAddress = this.getHttpAddress(activeConf);
            Preconditions.checkArgument((boolean)StandbyCheckpointer.checkAddress(activeNNAddress), (String)"Bad address for active NN: %s", (Object[])new Object[]{activeNNAddress});
            this.activeNNAddresses.add(activeNNAddress);
        }
        Preconditions.checkArgument((boolean)StandbyCheckpointer.checkAddress(this.myNNAddress), (String)"Bad address for standby NN: %s", (Object[])new Object[]{this.myNNAddress});
    }

    private URL getHttpAddress(Configuration conf) throws IOException {
        String scheme = DFSUtil.getHttpClientScheme(conf);
        String defaultHost = NameNode.getServiceAddress(conf, true).getHostName();
        URI addr = DFSUtil.getInfoServerWithDefaultHost(defaultHost, conf, scheme);
        return addr.toURL();
    }

    private static boolean checkAddress(URL addr) {
        return addr.getPort() != 0;
    }

    public void start() {
        LOG.info((Object)("Starting standby checkpoint thread...\nCheckpointing active NN to possible NNs: " + this.activeNNAddresses + "\nServing checkpoints at " + this.myNNAddress));
        this.thread.start();
    }

    public void stop() throws IOException {
        this.cancelAndPreventCheckpoints("Stopping checkpointer");
        this.thread.setShouldRun(false);
        this.thread.interrupt();
        try {
            this.thread.join();
        }
        catch (InterruptedException e) {
            LOG.warn((Object)"Edit log tailer thread exited with an exception");
            throw new IOException(e);
        }
    }

    public void triggerRollbackCheckpoint() {
        this.thread.interrupt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
        Future upload;
        int i;
        long txid;
        NNStorage.NameNodeFile imageType;
        assert (this.canceler != null);
        this.namesystem.cpLockInterruptibly();
        try {
            assert (this.namesystem.getEditLog().isOpenForRead()) : "Standby Checkpointer should only attempt a checkpoint when NN is in standby mode, but the edit logs are in an unexpected state";
            FSImage img = this.namesystem.getFSImage();
            long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
            long thisCheckpointTxId = img.getCorrectLastAppliedOrWrittenTxId();
            assert (thisCheckpointTxId >= prevCheckpointTxId);
            if (thisCheckpointTxId == prevCheckpointTxId) {
                LOG.info((Object)("A checkpoint was triggered but the Standby Node has not received any transactions since the last checkpoint at txid " + thisCheckpointTxId + ". Skipping..."));
                return;
            }
            imageType = this.namesystem.isRollingUpgrade() && !this.namesystem.getFSImage().hasRollbackFSImage() ? NNStorage.NameNodeFile.IMAGE_ROLLBACK : NNStorage.NameNodeFile.IMAGE;
            img.saveNamespace(this.namesystem, imageType, this.canceler);
            txid = img.getStorage().getMostRecentCheckpointTxId();
            assert (txid == thisCheckpointTxId) : "expected to save checkpoint at txid=" + thisCheckpointTxId + " but instead saved at txid=" + txid;
            String outputDir = this.checkpointConf.getLegacyOivImageDir();
            if (outputDir != null && !outputDir.isEmpty()) {
                try {
                    img.saveLegacyOIVImage(this.namesystem, outputDir, this.canceler);
                }
                catch (IOException ioe) {
                    LOG.warn((Object)"Exception encountered while saving legacy OIV image; continuing with other checkpointing steps", (Throwable)ioe);
                }
            }
        }
        finally {
            this.namesystem.cpUnlock();
        }
        if (!sendCheckpoint) {
            return;
        }
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, this.activeNNAddresses.size(), 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(this.activeNNAddresses.size()), this.uploadThreadFactory);
        ArrayList<Future<TransferFsImage.TransferResult>> uploads = new ArrayList<Future<TransferFsImage.TransferResult>>();
        for (final URL activeNNAddress : this.activeNNAddresses) {
            Future<TransferFsImage.TransferResult> upload2 = executor.submit(new Callable<TransferFsImage.TransferResult>(){

                @Override
                public TransferFsImage.TransferResult call() throws IOException {
                    return TransferFsImage.uploadImageFromStorage(activeNNAddress, StandbyCheckpointer.this.conf, StandbyCheckpointer.this.namesystem.getFSImage().getStorage(), imageType, txid, StandbyCheckpointer.this.canceler);
                }
            });
            uploads.add(upload2);
        }
        InterruptedException ie = null;
        IOException ioe = null;
        boolean success = false;
        for (i = 0; i < uploads.size(); ++i) {
            upload = (Future)uploads.get(i);
            try {
                if (upload.get() != TransferFsImage.TransferResult.SUCCESS) continue;
                success = true;
            }
            catch (ExecutionException e) {
                ioe = new IOException("Exception during image upload: " + e.getMessage(), e.getCause());
            }
            catch (InterruptedException e) {
                ie = e;
            }
            break;
        }
        this.lastUploadTime = Time.monotonicNow();
        this.isPrimaryCheckPointer = success;
        if (ie != null || ioe != null) {
            while (i < uploads.size()) {
                upload = (Future)uploads.get(i);
                upload.cancel(true);
                ++i;
            }
            executor.shutdownNow();
            executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
            if (ie != null) {
                throw ie;
            }
            if (ioe != null) {
                throw ioe;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException {
        Object object = this.cancelLock;
        synchronized (object) {
            this.thread.preventCheckpointsFor(120000L);
            if (this.canceler != null) {
                this.canceler.cancel(msg);
            }
        }
    }

    @VisibleForTesting
    static int getCanceledCount() {
        return canceledCount;
    }

    private long countUncheckpointedTxns() {
        FSImage img = this.namesystem.getFSImage();
        return img.getCorrectLastAppliedOrWrittenTxId() - img.getStorage().getMostRecentCheckpointTxId();
    }

    @VisibleForTesting
    List<URL> getActiveNNAddresses() {
        return this.activeNNAddresses;
    }

    private class CheckpointerThread
    extends Thread {
        private volatile boolean shouldRun;
        private volatile long preventCheckpointsUntil;

        private CheckpointerThread() {
            super("Standby State Checkpointer");
            this.shouldRun = true;
            this.preventCheckpointsUntil = 0L;
        }

        private void setShouldRun(boolean shouldRun) {
            this.shouldRun = shouldRun;
        }

        @Override
        public void run() {
            SecurityUtil.doAsLoginUserOrFatal((PrivilegedAction)new PrivilegedAction<Object>(){

                @Override
                public Object run() {
                    CheckpointerThread.this.doWork();
                    return null;
                }
            });
        }

        private void preventCheckpointsFor(long delayMs) {
            this.preventCheckpointsUntil = Time.monotonicNow() + delayMs;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doWork() {
            long checkPeriod = 1000L * StandbyCheckpointer.this.checkpointConf.getCheckPeriod();
            StandbyCheckpointer.this.lastCheckpointTime = Time.monotonicNow();
            StandbyCheckpointer.this.lastUploadTime = Time.monotonicNow();
            while (this.shouldRun) {
                boolean needRollbackCheckpoint = StandbyCheckpointer.this.namesystem.isNeedRollbackFsImage();
                if (!needRollbackCheckpoint) {
                    try {
                        Thread.sleep(checkPeriod);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    if (!this.shouldRun) break;
                }
                try {
                    if (UserGroupInformation.isSecurityEnabled()) {
                        UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
                    }
                    long now = Time.monotonicNow();
                    long uncheckpointed = StandbyCheckpointer.this.countUncheckpointedTxns();
                    long secsSinceLast = (now - StandbyCheckpointer.this.lastCheckpointTime) / 1000L;
                    boolean needCheckpoint = needRollbackCheckpoint;
                    if (needCheckpoint) {
                        LOG.info((Object)"Triggering a rollback fsimage for rolling upgrade.");
                    } else if (uncheckpointed >= StandbyCheckpointer.this.checkpointConf.getTxnCount()) {
                        LOG.info((Object)("Triggering checkpoint because there have been " + uncheckpointed + " txns since the last checkpoint, which exceeds the configured threshold " + StandbyCheckpointer.this.checkpointConf.getTxnCount()));
                        needCheckpoint = true;
                    } else if (secsSinceLast >= StandbyCheckpointer.this.checkpointConf.getPeriod()) {
                        LOG.info((Object)("Triggering checkpoint because it has been " + secsSinceLast + " seconds since the last checkpoint, which exceeds the configured interval " + StandbyCheckpointer.this.checkpointConf.getPeriod()));
                        needCheckpoint = true;
                    }
                    if (!needCheckpoint) continue;
                    Object object = StandbyCheckpointer.this.cancelLock;
                    synchronized (object) {
                        block44: {
                            if (now >= this.preventCheckpointsUntil) break block44;
                            LOG.info((Object)"But skipping this checkpoint since we are about to failover!");
                            canceledCount++;
                            continue;
                        }
                        assert (StandbyCheckpointer.this.canceler == null);
                        StandbyCheckpointer.this.canceler = new Canceler();
                    }
                    long secsSinceLastUpload = (now - StandbyCheckpointer.this.lastUploadTime) / 1000L;
                    boolean sendRequest = StandbyCheckpointer.this.isPrimaryCheckPointer || (double)secsSinceLastUpload >= StandbyCheckpointer.this.checkpointConf.getQuietPeriod();
                    StandbyCheckpointer.this.doCheckpoint(sendRequest);
                    if (needRollbackCheckpoint && StandbyCheckpointer.this.namesystem.getFSImage().hasRollbackFSImage()) {
                        StandbyCheckpointer.this.namesystem.setCreatedRollbackImages(true);
                        StandbyCheckpointer.this.namesystem.setNeedRollbackFsImage(false);
                    }
                    StandbyCheckpointer.this.lastCheckpointTime = now;
                }
                catch (SaveNamespaceCancelledException ce) {
                    LOG.info((Object)("Checkpoint was cancelled: " + ce.getMessage()));
                    canceledCount++;
                }
                catch (InterruptedException ie) {
                    LOG.info((Object)"Interrupted during checkpointing", (Throwable)ie);
                }
                catch (Throwable t) {
                    LOG.error((Object)"Exception in doCheckpoint", t);
                }
                finally {
                    Object ce = StandbyCheckpointer.this.cancelLock;
                    synchronized (ce) {
                        StandbyCheckpointer.this.canceler = null;
                    }
                }
            }
        }
    }
}

