/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestObserverNode {
    public static final Logger LOG = LoggerFactory.getLogger((String)TestObserverNode.class.getName());
    private static Configuration conf;
    private static MiniQJMHACluster qjmhaCluster;
    private static MiniDFSCluster dfsCluster;
    private static DistributedFileSystem dfs;
    private final Path testPath = new Path("/TestObserverNode");

    /**
     * Creates the shared configuration and brings up the cluster that all tests in this
     * class run against.
     *
     * @throws Exception if the cluster cannot be set up
     */
    @BeforeClass
    public static void startUpCluster() throws Exception {
        Configuration clusterConf = new Configuration();
        clusterConf.setBoolean("dfs.namenode.state.context.enabled", true);
        // A zero retry period for the client-side observer probe.
        clusterConf.setTimeDuration(
            "dfs.client.failover.observer.probe.retry.period", 0L, TimeUnit.MILLISECONDS);
        conf = clusterConf;

        // NOTE(review): the meaning of the (1, 1, true) arguments is defined by
        // HATestUtil.setUpObserverCluster and is not visible here.
        MiniQJMHACluster cluster = HATestUtil.setUpObserverCluster(clusterConf, 1, 1, true);
        qjmhaCluster = cluster;
        dfsCluster = cluster.getDfsCluster();
    }

    /**
     * Before each test, re-obtains the shared {@code dfs} client with the observer-read
     * flag set to {@code true}.
     *
     * @throws Exception if the client cannot be configured
     */
    @Before
    public void setUp() throws Exception {
        setObserverRead(true);
    }

    /**
     * After each test, removes the test directory and verifies that the three NameNodes
     * are back in their expected roles: NN[0] active, NN[1] standby, NN[2] observer.
     *
     * @throws IOException if the delete fails
     */
    @After
    public void cleanUp() throws IOException {
        dfs.delete(testPath, true);

        // Indexed by NameNode id; message and expected state stay paired.
        String[] messages = {
            "NN[0] should be active",
            "NN[1] should be standby",
            "NN[2] should be observer",
        };
        HAServiceProtocol.HAServiceState[] expectedStates = {
            HAServiceProtocol.HAServiceState.ACTIVE,
            HAServiceProtocol.HAServiceState.STANDBY,
            HAServiceProtocol.HAServiceState.OBSERVER,
        };
        for (int nnIdx = 0; nnIdx < expectedStates.length; nnIdx++) {
            Assert.assertEquals(
                messages[nnIdx],
                expectedStates[nnIdx],
                NameNodeAdapter.getServiceState(dfsCluster.getNameNode(nnIdx)));
        }
    }

    /**
     * Shuts down the shared cluster once all tests have run; does nothing if the
     * cluster was never created.
     *
     * @throws IOException if shutdown fails
     */
    @AfterClass
    public static void shutDownCluster() throws IOException {
        if (qjmhaCluster == null) {
            return;
        }
        qjmhaCluster.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops the edit log tailer on NN[2], issues a create followed by a getFileStatus from
     * a background thread, waits until the NN[2] RPC server reports additional requeued
     * calls, then tails edits manually and expects the background read to complete with a
     * non-null status.
     *
     * <p>A fresh {@link EditLogTailer} is always installed and started on NN[2] afterwards,
     * and the background executor is always shut down so its worker thread does not
     * outlive the test.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testObserverRequeue() throws Exception {
        ScheduledExecutorService interruptor = Executors.newScheduledThreadPool(1);
        FSNamesystem observerFsNS = dfsCluster.getNamesystem(2);
        RpcMetrics obRpcMetrics = ((NameNodeRpcServer) dfsCluster.getNameNodeRpc(2))
            .getClientRpcServer().getRpcMetrics();
        try {
            observerFsNS.getEditLogTailer().stop();
            long oldRequeueNum = obRpcMetrics.getRpcRequeueCalls();
            ScheduledFuture<FileStatus> scheduledFuture = interruptor.schedule(() -> {
                Path tmpTestPath = new Path("/TestObserverRequeue");
                dfs.create(tmpTestPath, (short) 1).close();
                this.assertSentTo(0);
                // Presumably blocks on NN[2] until edits are tailed (the tailer is
                // stopped above) — the main thread waits for the requeue counter to move.
                FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
                this.assertSentTo(2);
                return fileStatus;
            }, 0L, TimeUnit.SECONDS);
            GenericTestUtils.waitFor(
                () -> obRpcMetrics.getRpcRequeueCalls() > oldRequeueNum, 50L, 10000L);
            observerFsNS.getEditLogTailer().doTailEdits();
            FileStatus fileStatus = scheduledFuture.get(10000L, TimeUnit.MILLISECONDS);
            Assert.assertNotNull(fileStatus);
        } finally {
            try {
                EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf);
                observerFsNS.setEditLogTailerForTests(editLogTailer);
                editLogTailer.start();
            } finally {
                // Let the worker thread exit once its task is done. shutdown() rather than
                // shutdownNow() so a still-running call on the shared client is not
                // interrupted.
                interruptor.shutdown();
            }
        }
    }

    /**
     * Requesting an observer transition on NN[0] must be rejected with a
     * {@link ServiceFailedException}.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testNoActiveToObserver() throws Exception {
        try {
            dfsCluster.transitionToObserver(0);
            // Reaching this line means the transition was accepted.
            Assert.fail("active cannot be transitioned to observer");
        } catch (ServiceFailedException expected) {
            // Rejection is the expected outcome.
        }
    }

    /**
     * Runs the {@link GetGroups} tool with no arguments against the test configuration
     * and expects a zero return value.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testGetGroups() throws Exception {
        String[] noArgs = new String[0];
        int exitCode = new GetGroups(conf).run(noArgs);
        Assert.assertEquals(0L, exitCode);
    }

    /**
     * Requesting an active transition on NN[2] must be rejected with a
     * {@link ServiceFailedException}.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testNoObserverToActive() throws Exception {
        try {
            dfsCluster.transitionToActive(2);
            // Reaching this line means the transition was accepted.
            Assert.fail("observer cannot be transitioned to active");
        } catch (ServiceFailedException expected) {
            // Rejection is the expected outcome.
        }
    }

    /**
     * Checks basic routing: mkdir calls are served by NN[0], and a getFileStatus issued
     * after rolling and tailing the edit log is served by NN[2].
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testSimpleRead() throws Exception {
        Path childDir = new Path(testPath, "test2");

        // First mkdir -> NN[0].
        dfs.mkdir(testPath, FsPermission.getDefault());
        assertSentTo(0);

        // Read after roll-and-tail -> NN[2].
        dfsCluster.rollEditLogAndTail(0);
        dfs.getFileStatus(testPath);
        assertSentTo(2);

        // Second mkdir -> NN[0] again.
        dfs.mkdir(childDir, FsPermission.getDefault());
        assertSentTo(0);
    }

    /**
     * With the observer-read flag set to {@code false}, verifies that both mkdir and
     * getFileStatus are served by NN[0], and after failing over to NN[1], by NN[1].
     *
     * <p>The fail-back to NN[0] runs in a {@code finally} block so that a failed
     * assertion does not leave the shared cluster failed over for subsequent tests.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testFailover() throws Exception {
        Path testPath2 = new Path(this.testPath, "test2");
        setObserverRead(false);

        dfs.mkdir(this.testPath, FsPermission.getDefault());
        assertSentTo(0);
        dfs.getFileStatus(this.testPath);
        assertSentTo(0);

        try {
            dfsCluster.transitionToStandby(0);
            dfsCluster.transitionToActive(1);
            dfsCluster.waitActive(1);

            dfs.mkdir(testPath2, FsPermission.getDefault());
            assertSentTo(1);
            dfs.getFileStatus(this.testPath);
            assertSentTo(1);
        } finally {
            // Restore NN[0] as active. NOTE(review): assumes these transitions are
            // harmless when a node is already in the requested state — confirm.
            dfsCluster.transitionToStandby(1);
            dfsCluster.transitionToActive(0);
            dfsCluster.waitActive(0);
        }
    }

    /**
     * Fails over from NN[0] to NN[1] and back again, checking after each step that
     * reads issued after roll-and-tail are served by NN[2] while mutations are served
     * by whichever NameNode was most recently made active.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testDoubleFailover() throws Exception {
        Path secondDir = new Path(testPath, "test2");
        Path thirdDir = new Path(testPath, "test3");

        // Phase 1: NN[0] is active.
        dfs.mkdir(testPath, FsPermission.getDefault());
        assertSentTo(0);
        dfsCluster.rollEditLogAndTail(0);
        dfs.getFileStatus(testPath);
        assertSentTo(2);
        dfs.mkdir(secondDir, FsPermission.getDefault());
        assertSentTo(0);

        // Phase 2: fail over to NN[1].
        dfsCluster.transitionToStandby(0);
        dfsCluster.transitionToActive(1);
        dfsCluster.waitActive(1);
        dfsCluster.rollEditLogAndTail(1);
        dfs.getFileStatus(secondDir);
        assertSentTo(2);
        dfs.mkdir(thirdDir, FsPermission.getDefault());
        assertSentTo(1);

        // Phase 3: fail back to NN[0].
        dfsCluster.transitionToStandby(1);
        dfsCluster.transitionToActive(0);
        dfsCluster.waitActive(0);
        dfsCluster.rollEditLogAndTail(0);
        dfs.getFileStatus(thirdDir);
        assertSentTo(2);
        dfs.delete(thirdDir, false);
        assertSentTo(0);
    }

    /**
     * Verifies that reads are served by NN[0] while NN[2] is shut down, and by NN[2]
     * again once it has been restarted and transitioned back to observer.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testObserverShutdown() throws Exception {
        final int observerIdx = 2;

        dfs.mkdir(testPath, FsPermission.getDefault());
        dfsCluster.rollEditLogAndTail(0);
        dfs.getFileStatus(testPath);
        assertSentTo(observerIdx);

        // With NN[2] down, the read is expected on NN[0].
        dfsCluster.shutdownNameNode(observerIdx);
        dfs.getFileStatus(testPath);
        assertSentTo(0);

        // Bring NN[2] back as observer; the check is made after the second read.
        dfsCluster.restartNameNode(observerIdx);
        dfsCluster.transitionToObserver(observerIdx);
        dfs.getFileStatus(testPath);
        dfs.getFileStatus(testPath);
        assertSentTo(observerIdx);
    }

    /**
     * Combines a failover to NN[1] with a shutdown and restart of NN[2]: reads are
     * expected on NN[1] while NN[2] is down or not yet an observer, and on NN[2] once it
     * has been transitioned back to observer.
     *
     * <p>The fail-back to NN[0] runs in a {@code finally} block so that a failed
     * assertion does not leave the shared cluster failed over for subsequent tests.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testObserverFailOverAndShutdown() throws Exception {
        dfs.mkdir(this.testPath, FsPermission.getDefault());
        dfsCluster.rollEditLogAndTail(0);
        dfs.getFileStatus(this.testPath);
        assertSentTo(2);

        try {
            dfsCluster.transitionToStandby(0);
            dfsCluster.transitionToActive(1);
            dfsCluster.waitActive(1);

            // NN[2] down: read goes to the newly active NN[1].
            dfsCluster.shutdownNameNode(2);
            dfs.getFileStatus(this.testPath);
            assertSentTo(1);

            // NN[2] restarted but not yet transitioned to observer: still NN[1].
            dfsCluster.restartNameNode(2);
            dfs.getFileStatus(this.testPath);
            assertSentTo(1);

            // After the observer transition the check is made on the second read.
            dfsCluster.transitionToObserver(2);
            dfs.getFileStatus(this.testPath);
            dfs.getFileStatus(this.testPath);
            assertSentTo(2);
        } finally {
            // Restore NN[0] as active. NOTE(review): assumes these transitions are
            // harmless when a node is already in the requested state — confirm.
            dfsCluster.transitionToStandby(1);
            dfsCluster.transitionToActive(0);
            dfsCluster.waitActive(0);
        }
    }

    /**
     * Deletes every name directory of NN[2] and then runs {@link BootstrapStandby}
     * non-interactively with NN[2]'s configuration, expecting a zero return code.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testBootstrap() throws Exception {
        for (URI nameDirUri : dfsCluster.getNameDirs(2)) {
            boolean deleted = FileUtil.fullyDelete(new File(nameDirUri.getPath()));
            Assert.assertTrue(deleted);
        }

        String[] bootstrapArgs = {"-nonInteractive"};
        int exitCode = BootstrapStandby.run(bootstrapArgs, dfsCluster.getConfiguration(2));
        Assert.assertEquals(0L, exitCode);
    }

    /**
     * Puts NN[2] into safe mode and stubs its {@link BlockManager} so that
     * {@code createLocatedBlocks} returns a single fake block with no datanode
     * locations; an open is then expected to be served by NN[0]. After the stub is reset
     * and safe mode is left, opens are expected on NN[2] again.
     *
     * <p>The spy reset and the safe-mode exit run in {@code finally} blocks so a failed
     * assertion does not leave NN[2] stubbed or in safe mode for subsequent tests.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
        dfs.create(this.testPath, (short) 1).close();
        assertSentTo(0);
        dfsCluster.rollEditLogAndTail(0);
        dfs.open(this.testPath).close();
        assertSentTo(2);

        dfsCluster.getFileSystem(2).setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
        try {
            BlockManager bmSpy = NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2));
            try {
                Mockito.doAnswer(invocation -> {
                    // One block whose location array is empty.
                    ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
                    LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY);
                    ArrayList<LocatedBlock> fakeBlocks = new ArrayList<>();
                    fakeBlocks.add(fakeBlock);
                    return new LocatedBlocks(0L, false, fakeBlocks, null, true, null, null);
                }).when(bmSpy).createLocatedBlocks(
                    (BlockInfo[]) ArgumentMatchers.any(),
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.anyBoolean(),
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.anyLong(),
                    ArgumentMatchers.anyBoolean(),
                    ArgumentMatchers.anyBoolean(),
                    (FileEncryptionInfo) ArgumentMatchers.any(),
                    (ErasureCodingPolicy) ArgumentMatchers.any());

                dfs.open(this.testPath).close();
                assertSentTo(0);
            } finally {
                Mockito.reset(bmSpy);
            }
        } finally {
            dfsCluster.getFileSystem(2).setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
        }

        dfs.open(this.testPath).close();
        assertSentTo(2);
    }

    /**
     * Stubs NN[2]'s {@link BlockManager} so that {@code createLocatedBlocks} returns a
     * single fake block with no datanode locations, then expects open, listPaths (with
     * locations requested) and getLocatedFileInfo to be served by NN[0].
     *
     * <p>The opened stream is closed immediately, and the spy is reset in a
     * {@code finally} block so a failed assertion does not leave NN[2] stubbed for
     * subsequent tests.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testObserverNodeBlockMissingRetry() throws Exception {
        setObserverRead(true);
        dfs.create(this.testPath, (short) 1).close();
        assertSentTo(0);
        dfsCluster.rollEditLogAndTail(0);

        BlockManager bmSpy = NameNodeAdapter.spyOnBlockManager(dfsCluster.getNameNode(2));
        try {
            Mockito.doAnswer(invocation -> {
                // One block whose location array is empty.
                ArrayList<LocatedBlock> fakeBlocks = new ArrayList<>();
                ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
                LocatedBlock fakeBlock = new LocatedBlock(b, DatanodeInfo.EMPTY_ARRAY);
                fakeBlocks.add(fakeBlock);
                return new LocatedBlocks(0L, false, fakeBlocks, null, true, null, null);
            }).when(bmSpy).createLocatedBlocks(
                (BlockInfo[]) Mockito.any(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.anyBoolean(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.anyLong(),
                ArgumentMatchers.anyBoolean(),
                ArgumentMatchers.anyBoolean(),
                (FileEncryptionInfo) Mockito.any(),
                (ErasureCodingPolicy) Mockito.any());

            // Close the stream right away; only the routing of the open matters here.
            dfs.open(this.testPath).close();
            assertSentTo(0);

            dfs.getClient().listPaths("/", new byte[0], true);
            assertSentTo(0);

            dfs.getClient().getLocatedFileInfo(this.testPath.toString(), false);
            assertSentTo(0);
        } finally {
            Mockito.reset(bmSpy);
        }
    }

    /**
     * Creates a file and then runs fsck on "/" through {@link TestFsck#runFsck},
     * expecting the output to report a healthy status.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testFsckWithObserver() throws Exception {
        setObserverRead(true);

        dfs.create(testPath, (short) 1).close();
        assertSentTo(0);

        final String fsckOutput = TestFsck.runFsck(conf, 0, true, "/");
        LOG.info("result=" + fsckOutput);
        boolean healthy = fsckOutput.contains("Status: HEALTHY");
        Assert.assertTrue(healthy);
    }

    /**
     * Calls mkdirs directly on NN[2]'s RPC server and expects an
     * {@link ObserverRetryOnActiveException}.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testObserverRetryActiveException() throws Exception {
        try {
            dfsCluster.getNameNode(2).getRpcServer().mkdirs(
                "/testActiveRetryException",
                FsPermission.createImmutable((short) 0755), // octal 0755 == decimal 493
                true);
            // The call returned normally, so no retry exception was raised.
            Assert.fail();
        } catch (ObserverRetryOnActiveException expected) {
            // This is the outcome the test requires.
        }
    }

    /**
     * Verifies that an open is served by NN[2] initially, but by NN[0] after the file's
     * times have been set to zero via {@code setTimes}.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testAccessTimeUpdateRedirectToActive() throws Exception {
        final Path file = new Path("/TestObserverNodeAccessTime");

        dfs.create(file, (short) 1).close();
        assertSentTo(0);

        dfs.open(file).close();
        assertSentTo(2);

        // Zero both timestamps; the following open is then expected on NN[0].
        dfs.setTimes(file, 0L, 0L);
        dfs.open(file).close();
        assertSentTo(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Uses a separate, uncached client configured with a 5000 ms observer probe retry
     * period. NN[2] is demoted to standby, NN[1] is promoted to observer, and for the
     * next 5000 ms every open is expected to still be reported against NN[0]. An
     * assertion failure after that window is tolerated provided the request is then
     * reported against the new observer.
     *
     * <p>NN[1] and NN[2] are restored to standby/observer in the inner {@code finally}.
     * The extra client is closed via try-with-resources so it does not leak.
     *
     * <p>NOTE(review): {@code assertSentTo(int)} inspects the shared {@code dfs} client,
     * not {@code newFs}, so these checks may not observe the routing of the requests
     * issued through {@code newFs} — confirm whether {@code assertSentTo(newFs, ...)}
     * was intended.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testStickyActive() throws Exception {
        final long probePeriodMs = 5000L;
        Path testFile = new Path(this.testPath, "testStickyActive");
        Configuration newConf = new Configuration(conf);
        newConf.setLong("dfs.client.failover.observer.probe.retry.period", probePeriodMs);
        // Cache disabled so FileSystem.get hands back a client built from newConf;
        // that also makes it safe to close here without affecting the shared dfs.
        newConf.setBoolean("fs.hdfs.impl.disable.cache", true);

        try (DistributedFileSystem newFs = (DistributedFileSystem) FileSystem.get(newConf)) {
            newFs.create(testFile, (short) 1).close();
            this.assertSentTo(0);
            dfsCluster.rollEditLogAndTail(0);
            dfsCluster.transitionToStandby(2);
            Assert.assertEquals("NN[2] should be standby",
                HAServiceProtocol.HAServiceState.STANDBY,
                NameNodeAdapter.getServiceState(dfsCluster.getNameNode(2)));
            newFs.open(testFile).close();
            this.assertSentTo(0);

            int newObserver = 1;
            dfsCluster.transitionToObserver(newObserver);
            Assert.assertEquals("NN[" + newObserver + "] should be observer",
                HAServiceProtocol.HAServiceState.OBSERVER,
                NameNodeAdapter.getServiceState(dfsCluster.getNameNode(newObserver)));

            long startTime = Time.monotonicNow();
            try {
                while (Time.monotonicNow() - startTime <= probePeriodMs) {
                    newFs.open(testFile).close();
                    this.assertSentTo(0);
                    Thread.sleep(200L);
                }
            } catch (AssertionError ae) {
                // A mismatch inside the window is a real failure; after the window it is
                // acceptable as long as the new observer is the one reported.
                if (Time.monotonicNow() - startTime <= probePeriodMs) {
                    throw ae;
                }
                this.assertSentTo(newObserver);
            } finally {
                dfsCluster.transitionToStandby(1);
                dfsCluster.transitionToObserver(2);
            }
        }
    }

    /**
     * Creates a single-replica file, corrupts its first block's replica on the hosting
     * datanode, restarts that datanode, waits for one corrupt replica to be reported,
     * and then runs fsck with {@code -delete} on "/", expecting the output to flag the
     * filesystem as corrupt.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testFsckDelete() throws Exception {
        setObserverRead(true);

        DFSTestUtil.createFile((FileSystem) dfs, testPath, 512L, (short) 1, 0L);
        DFSTestUtil.waitForReplication(dfs, testPath, (short) 1, 5000);

        ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock((FileSystem) dfs, testPath);
        int hostingDn = DFSTestUtil.firstDnWithBlock(dfsCluster, firstBlock);
        FSNamesystem activeNs = dfsCluster.getNameNode(0).getNamesystem();

        dfsCluster.corruptReplica(hostingDn, firstBlock);
        dfsCluster.restartDataNode(hostingDn);
        DFSTestUtil.waitCorruptReplicas((FileSystem) dfs, activeNs, testPath, firstBlock, 1);

        String fsckOutput = TestFsck.runFsck(conf, 1, true, "/", "-delete");
        LOG.info("result=" + fsckOutput);
        boolean reportedCorrupt = fsckOutput.contains("The filesystem under path '/' is CORRUPT");
        Assert.assertTrue(reportedCorrupt);
    }

    /**
     * Runs several {@link MkDirRunner} clients concurrently while mkdir transactions on
     * NN[0] are delayed through a spied edit log, then checks that every runner finished,
     * that NN[0] and NN[2] report the same last applied/written transaction id, and that
     * each client saw a state id at least as large as the one sampled from NN[0] and did
     * not hit a {@link FileNotFoundException}.
     *
     * <p>The thread pool shutdown and the spy reset run in a {@code finally} block so a
     * failed assertion does not leak threads or leave NN[0]'s edit log stubbed.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testMkdirsRaceWithObserverRead() throws Exception {
        dfs.mkdir(this.testPath, FsPermission.getDefault());
        assertSentTo(0);
        dfsCluster.rollEditLogAndTail(0);
        dfs.getFileStatus(this.testPath);
        assertSentTo(2);

        final int numThreads = 4;
        FSEditLog spyEditLog =
            NameNodeAdapter.spyDelayMkDirTransaction(dfsCluster.getNameNode(0), 100L);
        ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(numThreads);
        try {
            ClientState[] clientStates = new ClientState[numThreads];
            Future<?>[] futures = new Future<?>[numThreads];
            Configuration conf2 = new Configuration(conf);
            // Each runner needs its own client instance rather than a cached one.
            conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
            for (int i = 0; i < numThreads; ++i) {
                clientStates[i] = new ClientState();
                futures[i] = threadPool.submit(new MkDirRunner(conf2, clientStates[i]));
            }

            Thread.sleep(150L);
            long activStateId =
                dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId();
            dfsCluster.rollEditLogAndTail(0);

            boolean finished = true;
            for (Future<?> future : futures) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    finished = false;
                    LOG.warn("MkDirRunner thread failed", e.getCause());
                }
            }
            Assert.assertTrue("Not all threads finished", finished);

            Assert.assertEquals("Active and Observer stateIds don't match",
                dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
                dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
            for (int i = 0; i < numThreads; ++i) {
                Assert.assertTrue(
                    "Client #" + i + " lastSeenStateId=" + clientStates[i].lastSeenStateId
                        + " activStateId=" + activStateId + "\n" + clientStates[i].fnfe,
                    clientStates[i].lastSeenStateId >= activStateId
                        && clientStates[i].fnfe == null);
            }
        } finally {
            threadPool.shutdown();
            Mockito.reset(spyEditLog);
        }
    }

    /**
     * Creates a file under {@code /dir1/dir2}, deletes that directory recursively, and
     * expects {@code listLocatedStatus} on it to raise {@link FileNotFoundException}.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testGetListingForDeletedDir() throws Exception {
        dfs.create(new Path("/dir1/dir2/testFile")).close();

        boolean removed = dfs.delete(new Path("/dir1/dir2"), true);
        Assert.assertTrue(removed);

        LambdaTestUtils.intercept(FileNotFoundException.class,
            () -> dfs.listLocatedStatus(new Path("/dir1/dir2")));
    }

    /**
     * Verifies that read calls against an empty directory and an empty file
     * (listPaths, getLocatedFileInfo, getBlockLocations) are served by NN[2], while the
     * mkdirs and create calls are served by NN[0].
     *
     * <p>The stream returned by {@code create} is closed immediately so that no open
     * output stream is left behind on the shared client.
     *
     * @throws IOException on any unexpected I/O failure
     */
    @Test
    public void testSimpleReadEmptyDirOrFile() throws IOException {
        // Empty directory.
        dfs.mkdirs(new Path("/emptyDir"));
        assertSentTo(0);

        dfs.getClient().listPaths("/", new byte[0], true);
        assertSentTo(2);

        dfs.getClient().getLocatedFileInfo("/emptyDir", true);
        assertSentTo(2);

        // Empty file: nothing is written before closing.
        dfs.create(new Path("/emptyFile"), (short) 1).close();
        assertSentTo(0);

        dfs.getClient().getLocatedFileInfo("/emptyFile", true);
        assertSentTo(2);

        dfs.getClient().getBlockLocations("/emptyFile", 0L, 1L);
        assertSentTo(2);
    }

    /**
     * Asserts that {@link HATestUtil#isSentToAnyOfNameNodes} reports NameNode
     * {@code nnIdx} for the given client.
     *
     * @param fs client to inspect
     * @param nnIdx index of the expected NameNode in the cluster
     * @throws IOException if the check itself fails
     */
    private static void assertSentTo(DistributedFileSystem fs, int nnIdx) throws IOException {
        String failureMessage = "Request was not sent to the expected namenode " + nnIdx;
        boolean sentToExpected = HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx);
        Assert.assertTrue(failureMessage, sentToExpected);
    }

    /**
     * Asserts that {@link HATestUtil#isSentToAnyOfNameNodes} reports NameNode
     * {@code nnIdx} for the shared {@code dfs} client.
     *
     * @param nnIdx index of the expected NameNode in the cluster
     * @throws IOException if the check itself fails
     */
    private void assertSentTo(int nnIdx) throws IOException {
        // Same check as the two-argument overload, applied to the shared client.
        assertSentTo(dfs, nnIdx);
    }

    /**
     * Replaces the shared {@code dfs} client with the one returned by
     * {@link HATestUtil#configureObserverReadFs} for {@link ObserverReadProxyProvider}
     * and the given flag.
     *
     * @param flag value passed through as the observer-read setting
     * @throws Exception if the client cannot be configured
     */
    private static void setObserverRead(boolean flag) throws Exception {
        DistributedFileSystem reconfigured = HATestUtil.configureObserverReadFs(
            dfsCluster, conf, ObserverReadProxyProvider.class, flag);
        dfs = reconfigured;
    }

    /**
     * Task that creates a fixed directory through its own client, records the client's
     * last seen state id, and then stats the directory, asserting that the mkdirs was
     * reported against NN[0] and the getFileStatus against NN[2].
     *
     * <p>A {@link FileNotFoundException} is stored in the supplied {@link ClientState}
     * instead of being thrown; any other exception fails the task with an
     * {@link AssertionError} that keeps the original exception as its cause.
     */
    static class MkDirRunner
    implements Runnable {
        private static final Path DIR_PATH = new Path("/TestObserverNode/testMkdirsRaceWithObserverRead");
        private final DistributedFileSystem fs;
        private final ClientState clientState;

        /**
         * @param conf configuration used to obtain this runner's client
         * @param cs holder that receives this runner's observations
         * @throws IOException if the client cannot be obtained
         */
        MkDirRunner(Configuration conf, ClientState cs) throws IOException {
            this.fs = (DistributedFileSystem) FileSystem.get(conf);
            this.clientState = cs;
        }

        @Override
        public void run() {
            try {
                this.fs.mkdirs(DIR_PATH);
                this.clientState.lastSeenStateId = HATestUtil.getLastSeenStateId(this.fs);
                TestObserverNode.assertSentTo(this.fs, 0);
                FileStatus stat = this.fs.getFileStatus(DIR_PATH);
                TestObserverNode.assertSentTo(this.fs, 2);
                Assert.assertTrue("Should be a directory", stat.isDirectory());
            } catch (FileNotFoundException ioe) {
                // Recorded rather than thrown; the owning test inspects it afterwards.
                this.clientState.fnfe = ioe;
            } catch (Exception e) {
                // Same message as before, but with the cause attached so the stack trace
                // of the underlying failure is not lost.
                throw new AssertionError("Unexpected exception: " + e, e);
            }
        }
    }

    /**
     * Mutable holder for what a single {@link MkDirRunner} observed: the client's last
     * seen state id and any {@link FileNotFoundException} it ran into.
     */
    static class ClientState {
        // Starts at -7 until a runner overwrites it.
        private long lastSeenStateId = -7L;
        // Stays null unless the runner catches a FileNotFoundException.
        private FileNotFoundException fnfe;
    }
}

