/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestEditLogRace {
    private static final Log LOG;
    private static final String NAME_DIR;
    static final int NUM_THREADS = 16;
    static final int NUM_ROLLS = 30;
    static final int NUM_SAVE_IMAGE = 30;
    private final List<Transactions> workers = new ArrayList<Transactions>();
    private static final int NUM_DATA_NODES = 1;
    private static final int BLOCK_TIME = 10;

    private void startTransactionWorkers(NamenodeProtocols namesystem, AtomicReference<Throwable> caughtErr) {
        for (int i = 0; i < 16; ++i) {
            Transactions trans = new Transactions(namesystem, caughtErr);
            new Thread((Runnable)trans, "TransactionThread-" + i).start();
            this.workers.add(trans);
        }
    }

    private void stopTransactionWorkers() {
        for (Transactions worker : this.workers) {
            worker.stop();
        }
        for (Transactions worker : this.workers) {
            Thread thr = worker.getThread();
            try {
                if (thr == null) continue;
                thr.join();
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEditLogRolling() throws Exception {
        DistributedFileSystem fileSys;
        MiniDFSCluster cluster;
        block10: {
            HdfsConfiguration conf = new HdfsConfiguration();
            cluster = null;
            fileSys = null;
            AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
            try {
                cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(1).build();
                cluster.waitActive();
                fileSys = cluster.getFileSystem();
                NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
                FSImage fsimage = cluster.getNamesystem().getFSImage();
                Storage.StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
                this.startTransactionWorkers(nn, caughtErr);
                long previousLogTxId = 1L;
                for (int i = 0; i < 30 && caughtErr.get() == null; ++i) {
                    try {
                        Thread.sleep(20L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    LOG.info((Object)("Starting roll " + i + "."));
                    CheckpointSignature sig = nn.rollEditLog();
                    long nextLog = sig.curSegmentTxId;
                    String logFileName = NNStorage.getFinalizedEditsFileName((long)previousLogTxId, (long)(nextLog - 1L));
                    previousLogTxId += this.verifyEditLogs(cluster.getNamesystem(), fsimage, logFileName, previousLogTxId);
                    Assert.assertEquals((long)previousLogTxId, (long)nextLog);
                    File expectedLog = NNStorage.getInProgressEditsFile((Storage.StorageDirectory)sd, (long)previousLogTxId);
                    Assert.assertTrue((String)("Expect " + expectedLog + " to exist"), (boolean)expectedLog.exists());
                }
                this.stopTransactionWorkers();
                if (caughtErr.get() == null) break block10;
            }
            catch (Throwable throwable) {
                this.stopTransactionWorkers();
                if (caughtErr.get() != null) {
                    throw new RuntimeException((Throwable)caughtErr.get());
                }
                if (fileSys != null) {
                    fileSys.close();
                }
                if (cluster != null) {
                    cluster.shutdown();
                }
                throw throwable;
            }
            throw new RuntimeException(caughtErr.get());
        }
        if (fileSys != null) {
            fileSys.close();
        }
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage, String logFileName, long startTxId) throws IOException {
        long numEdits = -1L;
        for (Storage.StorageDirectory sd : fsimage.getStorage().dirIterable((Storage.StorageDirType)NNStorage.NameNodeDirType.EDITS)) {
            File editFile = new File(sd.getCurrentDir(), logFileName);
            System.out.println("Verifying file: " + editFile);
            FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
            long numEditsThisLog = loader.loadFSEdits((EditLogInputStream)new EditLogFileInputStream(editFile), startTxId);
            System.out.println("Number of edits: " + numEditsThisLog);
            Assert.assertTrue((numEdits == -1L || numEditsThisLog == numEdits ? 1 : 0) != 0);
            numEdits = numEditsThisLog;
        }
        Assert.assertTrue((numEdits != -1L ? 1 : 0) != 0);
        return numEdits;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSaveNamespace() throws Exception {
        DistributedFileSystem fileSys;
        MiniDFSCluster cluster;
        block10: {
            HdfsConfiguration conf = new HdfsConfiguration();
            cluster = null;
            fileSys = null;
            AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
            try {
                cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(1).build();
                cluster.waitActive();
                fileSys = cluster.getFileSystem();
                FSNamesystem namesystem = cluster.getNamesystem();
                NamenodeProtocols nn = cluster.getNameNodeRpc();
                FSImage fsimage = namesystem.getFSImage();
                FSEditLog editLog = fsimage.getEditLog();
                this.startTransactionWorkers(nn, caughtErr);
                for (int i = 0; i < 30 && caughtErr.get() == null; ++i) {
                    try {
                        Thread.sleep(20L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    LOG.info((Object)("Save " + i + ": entering safe mode"));
                    namesystem.enterSafeMode(false);
                    long logStartTxId = fsimage.getStorage().getMostRecentCheckpointTxId() + 1L;
                    this.verifyEditLogs(namesystem, fsimage, NNStorage.getInProgressEditsFileName((long)logStartTxId), logStartTxId);
                    LOG.info((Object)("Save " + i + ": saving namespace"));
                    namesystem.saveNamespace();
                    LOG.info((Object)("Save " + i + ": leaving safemode"));
                    long savedImageTxId = fsimage.getStorage().getMostRecentCheckpointTxId();
                    this.verifyEditLogs(namesystem, fsimage, NNStorage.getFinalizedEditsFileName((long)logStartTxId, (long)savedImageTxId), logStartTxId);
                    Assert.assertEquals((long)fsimage.getStorage().getMostRecentCheckpointTxId(), (long)(editLog.getLastWrittenTxId() - 1L));
                    namesystem.leaveSafeMode();
                    LOG.info((Object)("Save " + i + ": complete"));
                }
                this.stopTransactionWorkers();
                if (caughtErr.get() == null) break block10;
            }
            catch (Throwable throwable) {
                this.stopTransactionWorkers();
                if (caughtErr.get() != null) {
                    throw new RuntimeException((Throwable)caughtErr.get());
                }
                if (fileSys != null) {
                    fileSys.close();
                }
                if (cluster != null) {
                    cluster.shutdown();
                }
                throw throwable;
            }
            throw new RuntimeException(caughtErr.get());
        }
        if (fileSys != null) {
            fileSys.close();
        }
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    private Configuration getConf() {
        HdfsConfiguration conf = new HdfsConfiguration();
        FileSystem.setDefaultUri((Configuration)conf, (String)"hdfs://localhost:0");
        conf.set("dfs.namenode.http-address", "0.0.0.0:0");
        conf.set("dfs.namenode.name.dir", NAME_DIR);
        conf.set("dfs.namenode.edits.dir", NAME_DIR);
        conf.setBoolean("dfs.permissions.enabled", false);
        return conf;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSaveImageWhileSyncInProgress() throws Exception {
        Configuration conf = this.getConf();
        NameNode.initMetrics((Configuration)conf, (HdfsServerConstants.NamenodeRole)HdfsServerConstants.NamenodeRole.NAMENODE);
        DFSTestUtil.formatNameNode(conf);
        final FSNamesystem namesystem = FSNamesystem.loadFromDisk((Configuration)conf);
        try {
            FSImage fsimage = namesystem.getFSImage();
            FSEditLog editLog = fsimage.getEditLog();
            JournalSet.JournalAndStream jas = (JournalSet.JournalAndStream)editLog.getJournals().get(0);
            EditLogFileOutputStream spyElos = (EditLogFileOutputStream)Mockito.spy((Object)((EditLogFileOutputStream)jas.getCurrentStream()));
            jas.setCurrentStreamForTests((EditLogOutputStream)spyElos);
            final AtomicReference deferredException = new AtomicReference();
            final CountDownLatch waitToEnterFlush = new CountDownLatch(1);
            final Thread doAnEditThread = new Thread(){

                @Override
                public void run() {
                    try {
                        LOG.info((Object)"Starting mkdirs");
                        namesystem.mkdirs("/test", new PermissionStatus("test", "test", new FsPermission(493)), true);
                        LOG.info((Object)"mkdirs complete");
                    }
                    catch (Throwable ioe) {
                        LOG.fatal((Object)"Got exception", ioe);
                        deferredException.set(ioe);
                        waitToEnterFlush.countDown();
                    }
                }
            };
            Answer<Void> blockingFlush = new Answer<Void>(){

                public Void answer(InvocationOnMock invocation) throws Throwable {
                    LOG.info((Object)"Flush called");
                    if (Thread.currentThread() == doAnEditThread) {
                        LOG.info((Object)"edit thread: Telling main thread we made it to flush section...");
                        waitToEnterFlush.countDown();
                        LOG.info((Object)"edit thread: sleeping for 10secs");
                        Thread.sleep(10000L);
                        LOG.info((Object)"Going through to flush. This will allow the main thread to continue.");
                    }
                    invocation.callRealMethod();
                    LOG.info((Object)"Flush complete");
                    return null;
                }
            };
            ((EditLogFileOutputStream)Mockito.doAnswer((Answer)blockingFlush).when((Object)spyElos)).flush();
            doAnEditThread.start();
            LOG.info((Object)"Main thread: waiting to enter flush...");
            waitToEnterFlush.await();
            Assert.assertNull(deferredException.get());
            LOG.info((Object)"Main thread: detected that logSync is in unsynchronized section.");
            LOG.info((Object)"Trying to enter safe mode.");
            LOG.info((Object)"This should block for 10sec, since flush will sleep that long");
            long st = Time.now();
            namesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
            long et = Time.now();
            LOG.info((Object)"Entered safe mode");
            Assert.assertTrue((et - st > 9000L ? 1 : 0) != 0);
            namesystem.saveNamespace();
            LOG.info((Object)"Joining on edit thread...");
            doAnEditThread.join();
            Assert.assertNull(deferredException.get());
            Assert.assertEquals((long)3L, (long)this.verifyEditLogs(namesystem, fsimage, NNStorage.getFinalizedEditsFileName((long)1L, (long)3L), 1L));
            Assert.assertEquals((long)1L, (long)this.verifyEditLogs(namesystem, fsimage, NNStorage.getInProgressEditsFileName((long)4L), 4L));
        }
        finally {
            LOG.info((Object)"Closing nn");
            if (namesystem != null) {
                namesystem.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSaveRightBeforeSync() throws Exception {
        Configuration conf = this.getConf();
        NameNode.initMetrics((Configuration)conf, (HdfsServerConstants.NamenodeRole)HdfsServerConstants.NamenodeRole.NAMENODE);
        DFSTestUtil.formatNameNode(conf);
        final FSNamesystem namesystem = FSNamesystem.loadFromDisk((Configuration)conf);
        try {
            FSImage fsimage = namesystem.getFSImage();
            FSEditLog editLog = (FSEditLog)Mockito.spy((Object)fsimage.getEditLog());
            DFSTestUtil.setEditLogForTesting(namesystem, editLog);
            final AtomicReference deferredException = new AtomicReference();
            final CountDownLatch waitToEnterSync = new CountDownLatch(1);
            final Thread doAnEditThread = new Thread(){

                @Override
                public void run() {
                    try {
                        LOG.info((Object)"Starting mkdirs");
                        namesystem.mkdirs("/test", new PermissionStatus("test", "test", new FsPermission(493)), true);
                        LOG.info((Object)"mkdirs complete");
                    }
                    catch (Throwable ioe) {
                        LOG.fatal((Object)"Got exception", ioe);
                        deferredException.set(ioe);
                        waitToEnterSync.countDown();
                    }
                }
            };
            Answer<Void> blockingSync = new Answer<Void>(){

                public Void answer(InvocationOnMock invocation) throws Throwable {
                    LOG.info((Object)"logSync called");
                    if (Thread.currentThread() == doAnEditThread) {
                        LOG.info((Object)"edit thread: Telling main thread we made it just before logSync...");
                        waitToEnterSync.countDown();
                        LOG.info((Object)"edit thread: sleeping for 10secs");
                        Thread.sleep(10000L);
                        LOG.info((Object)"Going through to logSync. This will allow the main thread to continue.");
                    }
                    invocation.callRealMethod();
                    LOG.info((Object)"logSync complete");
                    return null;
                }
            };
            ((FSEditLog)Mockito.doAnswer((Answer)blockingSync).when((Object)editLog)).logSync();
            doAnEditThread.start();
            LOG.info((Object)"Main thread: waiting to just before logSync...");
            waitToEnterSync.await();
            Assert.assertNull(deferredException.get());
            LOG.info((Object)"Main thread: detected that logSync about to be called.");
            LOG.info((Object)"Trying to enter safe mode.");
            LOG.info((Object)"This should block for 10sec, since we have pending edits");
            long st = Time.now();
            namesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
            long et = Time.now();
            LOG.info((Object)"Entered safe mode");
            Assert.assertTrue((et - st > 9000L ? 1 : 0) != 0);
            namesystem.saveNamespace();
            LOG.info((Object)"Joining on edit thread...");
            doAnEditThread.join();
            Assert.assertNull(deferredException.get());
            Assert.assertEquals((long)3L, (long)this.verifyEditLogs(namesystem, fsimage, NNStorage.getFinalizedEditsFileName((long)1L, (long)3L), 1L));
            Assert.assertEquals((long)1L, (long)this.verifyEditLogs(namesystem, fsimage, NNStorage.getInProgressEditsFileName((long)4L), 4L));
        }
        finally {
            LOG.info((Object)"Closing nn");
            if (namesystem != null) {
                namesystem.close();
            }
        }
    }

    static {
        ((Log4JLogger)FSEditLog.LOG).getLogger().setLevel(Level.ALL);
        LOG = LogFactory.getLog(TestEditLogRace.class);
        NAME_DIR = MiniDFSCluster.getBaseDirectory() + "name1";
    }

    static class Transactions
    implements Runnable {
        final NamenodeProtocols nn;
        short replication = (short)3;
        long blockSize = 64L;
        volatile boolean stopped = false;
        volatile Thread thr;
        final AtomicReference<Throwable> caught;

        Transactions(NamenodeProtocols ns, AtomicReference<Throwable> caught) {
            this.nn = ns;
            this.caught = caught;
        }

        @Override
        public void run() {
            this.thr = Thread.currentThread();
            FsPermission p = new FsPermission(511);
            int i = 0;
            while (!this.stopped) {
                try {
                    String dirname = "/thr-" + this.thr.getId() + "-dir-" + i;
                    this.nn.mkdirs(dirname, p, true);
                    this.nn.delete(dirname, true);
                }
                catch (SafeModeException dirname) {
                }
                catch (Throwable e) {
                    LOG.warn((Object)"Got error in transaction thread", e);
                    this.caught.compareAndSet(null, e);
                    break;
                }
                ++i;
            }
        }

        public void stop() {
            this.stopped = true;
        }

        public Thread getThread() {
            return this.thr;
        }
    }
}

