/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={LargeTests.class})
public class TestLogRolling {
    private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
    private HRegionServer server = null;
    private String tableName = null;
    private byte[] value;
    private FileSystem fs;
    private MiniDFSCluster dfsCluster;
    private Admin admin;
    private MiniHBaseCluster cluster;
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

    public TestLogRolling() {
        String className = this.getClass().getName();
        StringBuilder v = new StringBuilder(className);
        while (v.length() < 1000) {
            v.append(className);
        }
        this.value = Bytes.toBytes((String)v.toString());
    }

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
        TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 786432L);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
        TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10000);
        TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
        TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.flush.size", 8192);
        TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10000L);
        TEST_UTIL.getConfiguration().setInt("hbase.server.thread.wakefrequency", 2000);
        TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
        TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
        TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
        TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
        TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
    }

    @Before
    public void setUp() throws Exception {
        TEST_UTIL.startMiniCluster(1, 1, 2);
        this.cluster = TEST_UTIL.getHBaseCluster();
        this.dfsCluster = TEST_UTIL.getDFSCluster();
        this.fs = TEST_UTIL.getTestFileSystem();
        this.admin = TEST_UTIL.getHBaseAdmin();
        this.cluster.getMaster().balanceSwitch(false);
    }

    @After
    public void tearDown() throws Exception {
        TEST_UTIL.shutdownMiniCluster();
    }

    private void startAndWriteData() throws IOException, InterruptedException {
        new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
        this.server = this.cluster.getRegionServerThreads().get(0).getRegionServer();
        Table table = this.createTestTable(this.tableName);
        this.server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
        for (int i = 1; i <= 256; ++i) {
            this.doPut(table, i);
            if (i % 32 != 0) continue;
            try {
                Thread.sleep(2000L);
                continue;
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=120000L)
    public void testLogRollOnNothingWritten() throws Exception {
        Configuration conf = TEST_UTIL.getConfiguration();
        WALFactory wals = new WALFactory(conf, null, ServerName.valueOf((String)"test.com", (int)8080, (long)1L).toString());
        WAL newLog = wals.getWAL(new byte[0]);
        try {
            newLog.rollWriter(true);
        }
        finally {
            wals.close();
        }
    }

    @Test
    public void testLogRolling() throws Exception {
        this.tableName = TestLogRolling.getName();
        this.startAndWriteData();
        WAL log = this.server.getWAL(null);
        LOG.info((Object)("after writing there are " + DefaultWALProvider.getNumRolledLogFiles((WAL)log) + " log files"));
        for (Region r : this.server.getOnlineRegionsLocalContext()) {
            r.flush(true);
        }
        log.rollWriter();
        int count = DefaultWALProvider.getNumRolledLogFiles((WAL)log);
        LOG.info((Object)("after flushing all regions and rolling logs there are " + count + " log files"));
        Assert.assertTrue((String)("actual count: " + count), (count <= 2 ? 1 : 0) != 0);
    }

    private static String getName() {
        return "TestLogRolling";
    }

    void writeData(Table table, int rownum) throws IOException {
        this.doPut(table, rownum);
        try {
            Thread.sleep(2000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    void validateData(Table table, int rownum) throws IOException {
        String row = "row" + String.format("%1$04d", rownum);
        Get get = new Get(Bytes.toBytes((String)row));
        get.addFamily(HConstants.CATALOG_FAMILY);
        Result result = table.get(get);
        Assert.assertTrue((result.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)Bytes.equals((byte[])this.value, (byte[])result.getValue(HConstants.CATALOG_FAMILY, null)));
        LOG.info((Object)("Validated row " + row));
    }

    void batchWriteAndWait(Table table, FSHLog log, int start, boolean expect, int timeout) throws IOException {
        for (int i = 0; i < 10; ++i) {
            Put put = new Put(Bytes.toBytes((String)("row" + String.format("%1$04d", start + i))));
            put.add(HConstants.CATALOG_FAMILY, null, this.value);
            table.put(put);
        }
        Put tmpPut = new Put(Bytes.toBytes((String)"tmprow"));
        tmpPut.add(HConstants.CATALOG_FAMILY, null, this.value);
        long startTime = System.currentTimeMillis();
        long remaining = timeout;
        while (remaining > 0L && log.isLowReplicationRollEnabled() != expect) {
            table.put(tmpPut);
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            remaining = (long)timeout - (System.currentTimeMillis() - startTime);
        }
    }

    @Test
    public void testLogRollOnDatanodeDeath() throws Exception {
        TEST_UTIL.ensureSomeRegionServersAvailable(2);
        Assert.assertTrue((String)"This test requires WAL file replication set to 2.", (this.fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2 ? 1 : 0) != 0);
        LOG.info((Object)("Replication=" + this.fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())));
        this.server = this.cluster.getRegionServer(0);
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf((String)TestLogRolling.getName()));
        desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
        this.admin.createTable(desc);
        Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
        Assert.assertTrue((boolean)((HTable)table).isAutoFlush());
        this.server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
        FSHLog log = (FSHLog)this.server.getWAL(null);
        final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);
        log.registerWALActionsListener((WALActionsListener)new WALActionsListener.Base(){

            public void logRollRequested(boolean lowReplication) {
                if (lowReplication) {
                    lowReplicationHookCalled.lazySet(true);
                }
            }
        });
        Assert.assertTrue((String)"Need append support for this test", (boolean)FSUtils.isAppendSupported((Configuration)TEST_UTIL.getConfiguration()));
        ArrayList existingNodes = this.dfsCluster.getDataNodes();
        int numDataNodes = 3;
        this.dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null);
        ArrayList allNodes = this.dfsCluster.getDataNodes();
        for (int i = allNodes.size() - 1; i >= 0; --i) {
            if (!existingNodes.contains(allNodes.get(i))) continue;
            this.dfsCluster.stopDataNode(i);
        }
        Assert.assertTrue((String)("DataNodes " + this.dfsCluster.getDataNodes().size() + " default replication " + this.fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())), (this.dfsCluster.getDataNodes().size() >= this.fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1 ? 1 : 0) != 0);
        this.writeData(table, 2);
        long curTime = System.currentTimeMillis();
        LOG.info((Object)("log.getCurrentFileName(): " + log.getCurrentFileName()));
        long oldFilenum = DefaultWALProvider.extractFileNumFromWAL((WAL)log);
        Assert.assertTrue((String)"Log should have a timestamp older than now", (curTime > oldFilenum && oldFilenum != -1L ? 1 : 0) != 0);
        Assert.assertTrue((String)"The log shouldn't have rolled yet", (oldFilenum == DefaultWALProvider.extractFileNumFromWAL((WAL)log) ? 1 : 0) != 0);
        DatanodeInfo[] pipeline = log.getPipeLine();
        Assert.assertTrue((pipeline.length == this.fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) ? 1 : 0) != 0);
        Assert.assertTrue((this.dfsCluster.stopDataNode(pipeline[0].getName()) != null ? 1 : 0) != 0);
        this.writeData(table, 2);
        long newFilenum = DefaultWALProvider.extractFileNumFromWAL((WAL)log);
        Assert.assertTrue((String)"Missing datanode should've triggered a log roll", (newFilenum > oldFilenum && newFilenum > curTime ? 1 : 0) != 0);
        Assert.assertTrue((String)"The log rolling hook should have been called with the low replication flag", (boolean)lowReplicationHookCalled.get());
        this.writeData(table, 3);
        Assert.assertTrue((String)"The log should not roll again.", (DefaultWALProvider.extractFileNumFromWAL((WAL)log) == newFilenum ? 1 : 0) != 0);
        Assert.assertTrue((this.dfsCluster.stopDataNode(pipeline[1].getName()) != null ? 1 : 0) != 0);
        this.batchWriteAndWait(table, log, 3, false, 14000);
        int replication = log.getLogReplication();
        Assert.assertTrue((String)("LowReplication Roller should've been disabled, current replication=" + replication), (!log.isLowReplicationRollEnabled() ? 1 : 0) != 0);
        this.dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
        log.rollWriter(true);
        this.batchWriteAndWait(table, log, 13, true, 10000);
        replication = log.getLogReplication();
        Assert.assertTrue((String)("New log file should have the default replication instead of " + replication), (replication == this.fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) ? 1 : 0) != 0);
        Assert.assertTrue((String)"LowReplication Roller should've been enabled", (boolean)log.isLowReplicationRollEnabled());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLogRollOnPipelineRestart() throws Exception {
        LOG.info((Object)"Starting testLogRollOnPipelineRestart");
        Assert.assertTrue((String)"This test requires WAL file replication.", (this.fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1 ? 1 : 0) != 0);
        LOG.info((Object)("Replication=" + this.fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())));
        try (HTable t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);){
            this.server = this.cluster.getRegionServer(0);
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf((String)TestLogRolling.getName()));
            desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
            this.admin.createTable(desc);
            HTable table = new HTable(TEST_UTIL.getConfiguration(), desc.getTableName());
            this.server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
            WAL log = this.server.getWAL(null);
            final ArrayList<Path> paths = new ArrayList<Path>();
            final ArrayList preLogRolledCalled = new ArrayList();
            paths.add(DefaultWALProvider.getCurrentFileName((WAL)log));
            log.registerWALActionsListener((WALActionsListener)new WALActionsListener.Base(){

                public void preLogRoll(Path oldFile, Path newFile) {
                    LOG.debug((Object)("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile));
                    preLogRolledCalled.add(new Integer(1));
                }

                public void postLogRoll(Path oldFile, Path newFile) {
                    paths.add(newFile);
                }
            });
            Assert.assertTrue((String)"Need append support for this test", (boolean)FSUtils.isAppendSupported((Configuration)TEST_UTIL.getConfiguration()));
            this.writeData((Table)table, 1002);
            long curTime = System.currentTimeMillis();
            LOG.info((Object)("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName((WAL)log)));
            long oldFilenum = DefaultWALProvider.extractFileNumFromWAL((WAL)log);
            Assert.assertTrue((String)"Log should have a timestamp older than now", (curTime > oldFilenum && oldFilenum != -1L ? 1 : 0) != 0);
            Assert.assertTrue((String)"The log shouldn't have rolled yet", (oldFilenum == DefaultWALProvider.extractFileNumFromWAL((WAL)log) ? 1 : 0) != 0);
            this.dfsCluster.restartDataNodes();
            Thread.sleep(1000L);
            this.dfsCluster.waitActive();
            LOG.info((Object)"Data Nodes restarted");
            this.validateData((Table)table, 1002);
            this.writeData((Table)table, 1003);
            long newFilenum = DefaultWALProvider.extractFileNumFromWAL((WAL)log);
            Assert.assertTrue((String)"Missing datanode should've triggered a log roll", (newFilenum > oldFilenum && newFilenum > curTime ? 1 : 0) != 0);
            this.validateData((Table)table, 1003);
            this.writeData((Table)table, 1004);
            this.dfsCluster.restartDataNodes();
            Thread.sleep(1000L);
            this.dfsCluster.waitActive();
            LOG.info((Object)"Data Nodes restarted");
            this.validateData((Table)table, 1004);
            this.writeData((Table)table, 1005);
            log.rollWriter(true);
            Assert.assertTrue((String)("preLogRolledCalled has size of " + preLogRolledCalled.size()), (preLogRolledCalled.size() >= 1 ? 1 : 0) != 0);
            HashSet<String> loggedRows = new HashSet<String>();
            FSUtils fsUtils = FSUtils.getInstance((FileSystem)this.fs, (Configuration)TEST_UTIL.getConfiguration());
            for (Path p : paths) {
                LOG.debug((Object)("recovering lease for " + p));
                fsUtils.recoverFileLease(((HFileSystem)this.fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null);
                LOG.debug((Object)("Reading WAL " + FSUtils.getPath((Path)p)));
                WAL.Reader reader = null;
                try {
                    WAL.Entry entry;
                    reader = WALFactory.createReader((FileSystem)this.fs, (Path)p, (Configuration)TEST_UTIL.getConfiguration());
                    while ((entry = reader.next()) != null) {
                        LOG.debug((Object)("#" + entry.getKey().getLogSeqNum() + ": " + entry.getEdit().getCells()));
                        for (Cell cell : entry.getEdit().getCells()) {
                            loggedRows.add(Bytes.toStringBinary((byte[])cell.getRow()));
                        }
                    }
                }
                catch (EOFException e) {
                    LOG.debug((Object)("EOF reading file " + FSUtils.getPath((Path)p)));
                }
                finally {
                    if (reader == null) continue;
                    reader.close();
                }
            }
            Assert.assertTrue((boolean)loggedRows.contains("row1002"));
            Assert.assertTrue((boolean)loggedRows.contains("row1003"));
            Assert.assertTrue((boolean)loggedRows.contains("row1004"));
            Assert.assertTrue((boolean)loggedRows.contains("row1005"));
            for (Region r : this.server.getOnlineRegionsLocalContext()) {
                r.flush(true);
            }
            try (ResultScanner scanner = table.getScanner(new Scan());){
                for (int i = 2; i <= 5; ++i) {
                    Result r = scanner.next();
                    Assert.assertNotNull((Object)r);
                    Assert.assertFalse((boolean)r.isEmpty());
                    Assert.assertEquals((Object)("row100" + i), (Object)Bytes.toString((byte[])r.getRow()));
                }
            }
            for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
                Assert.assertFalse((boolean)rsThread.getRegionServer().isAborted());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCompactionRecordDoesntBlockRolling() throws Exception {
        Table table = null;
        Table table2 = null;
        HTable t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
        try {
            table = this.createTestTable(TestLogRolling.getName());
            table2 = this.createTestTable(TestLogRolling.getName() + "1");
            this.server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
            WAL log = this.server.getWAL(null);
            Region region = (Region)this.server.getOnlineRegions(table2.getName()).get(0);
            Store s = region.getStore(HConstants.CATALOG_FAMILY);
            this.admin.flush(TableName.NAMESPACE_TABLE_NAME);
            for (int i = 1; i <= 2; ++i) {
                this.doPut(table2, i);
                this.admin.flush(table2.getName());
            }
            this.doPut(table2, 3);
            Assert.assertEquals((String)"Should have no WAL after initial writes", (long)0L, (long)DefaultWALProvider.getNumRolledLogFiles((WAL)log));
            Assert.assertEquals((long)2L, (long)s.getStorefilesCount());
            log.rollWriter();
            Assert.assertEquals((String)"Should have WAL; one table is not flushed", (long)1L, (long)DefaultWALProvider.getNumRolledLogFiles((WAL)log));
            this.admin.flush(table2.getName());
            region.compact(false);
            Assert.assertNotNull((Object)s);
            for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
                Threads.sleepWithoutInterrupt((long)200L);
            }
            Assert.assertEquals((String)"Compaction didn't happen", (long)1L, (long)s.getStorefilesCount());
            this.doPut(table, 0);
            log.rollWriter();
            Assert.assertEquals((String)"Should have WAL; one table is not flushed", (long)1L, (long)DefaultWALProvider.getNumRolledLogFiles((WAL)log));
            this.admin.flush(table.getName());
            this.doPut(table, 1);
            log.rollWriter();
            Assert.assertEquals((String)"Should have 1 WALs at the end", (long)1L, (long)DefaultWALProvider.getNumRolledLogFiles((WAL)log));
        }
        finally {
            if (t != null) {
                t.close();
            }
            if (table != null) {
                table.close();
            }
            if (table2 != null) {
                table2.close();
            }
        }
    }

    private void doPut(Table table, int i) throws IOException {
        Put put = new Put(Bytes.toBytes((String)("row" + String.format("%1$04d", i))));
        put.add(HConstants.CATALOG_FAMILY, null, this.value);
        table.put(put);
    }

    private Table createTestTable(String tableName) throws IOException {
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf((String)tableName));
        desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
        this.admin.createTable(desc);
        return new HTable(TEST_UTIL.getConfiguration(), desc.getTableName());
    }
}

