package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerTestUtil;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdfs/TestLocatedBlocksRefresher.class */
public class TestLocatedBlocksRefresher {
    private static final int BLOCK_SIZE = 1048576;
    private static final short REPLICATION_FACTOR = 4;
    private final int numOfBlocks = 24;
    private final int fileLength = 25165824;
    private final int dfsClientPrefetchSize = 12582912;
    private MiniDFSCluster cluster;
    private Configuration conf;
    private static final Logger LOG = LoggerFactory.getLogger(TestLocatedBlocksRefresher.class);
    private static final String[] RACKS = {"/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3"};
    private static final int NUM_DATA_NODES = RACKS.length;

    @Before
    public void setUp() throws Exception {
        this.cluster = null;
        this.conf = new HdfsConfiguration();
        this.conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
        this.conf.setInt("dfs.replication", REPLICATION_FACTOR);
        this.conf.setLong("dfs.blocksize", DiskBalancerTestUtil.MB);
        this.conf.setLong("dfs.client.read.prefetch.size", 12582912L);
    }

    @After
    public void tearDown() throws Exception {
        if (this.cluster != null) {
            this.cluster.shutdown(true, true);
        }
    }

    private void setupTest(long j) throws IOException {
        this.conf.setLong("dfs.client.refresh.read-block-locations.ms", j);
        this.conf.set("dfs.client.context", UUID.randomUUID().toString());
        this.cluster = new MiniDFSCluster.Builder(this.conf).numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
        this.cluster.waitActive();
    }

    @Test
    public void testDisabledOnZeroInterval() throws IOException {
        setupTest(0L);
        Assert.assertNull(this.cluster.getFileSystem().getClient().getLocatedBlockRefresher());
    }

    @Test
    public void testEnabledOnNonZeroInterval() throws Exception {
        setupTest(1000L);
        LocatedBlocksRefresher locatedBlockRefresher = this.cluster.getFileSystem().getClient().getLocatedBlockRefresher();
        Assert.assertNotNull(locatedBlockRefresher);
        assertNoMoreRefreshes(locatedBlockRefresher);
    }

    @Test
    public void testRefreshOnDeadNodes() throws Exception {
        setupTest(1000L);
        DistributedFileSystem fileSystem = this.cluster.getFileSystem();
        DFSClient client = fileSystem.getClient();
        LocatedBlocksRefresher locatedBlockRefresher = client.getLocatedBlockRefresher();
        DFSInputStream open = client.open(createTestFile(fileSystem));
        Throwable th = null;
        try {
            LocatedBlocks locatedBlocks = open.locatedBlocks;
            Assert.assertEquals(12L, locatedBlocks.locatedBlockCount());
            Assert.assertFalse(locatedBlockRefresher.isInputStreamTracked(open));
            locatedBlockRefresher.addInputStream(open);
            Assert.assertTrue(locatedBlockRefresher.isInputStreamTracked(open));
            assertNoMoreRefreshes(locatedBlockRefresher);
            synchronized (open.infoLock) {
                Assert.assertSame(locatedBlocks, open.locatedBlocks);
            }
            stopNodeHostingBlocks(open, NUM_DATA_NODES - 1);
            byte[] bArr = new byte[262144];
            open.read(0L, bArr, 0, bArr.length);
            Assert.assertEquals(1L, open.getLocalDeadNodes().size());
            assertRefreshes(locatedBlockRefresher, 1);
            synchronized (open.infoLock) {
                Assert.assertNotSame(locatedBlocks, open.locatedBlocks);
                Assert.assertTrue(open.getLocalDeadNodes().isEmpty());
            }
            assertNoMoreRefreshes(locatedBlockRefresher);
            stopNodeHostingBlocks(open, NUM_DATA_NODES - 2);
            byte[] bArr2 = new byte[262144];
            open.read(0L, bArr2, 0, bArr2.length);
            Assert.assertTrue(open.getLocalDeadNodes().size() > 0);
            assertRefreshes(locatedBlockRefresher, 1);
            synchronized (open.infoLock) {
                Assert.assertNotSame(locatedBlocks, open.locatedBlocks);
                Assert.assertTrue(open.getLocalDeadNodes().isEmpty());
            }
            locatedBlockRefresher.removeInputStream(open);
            if (open != null) {
                if (0 != 0) {
                    try {
                        open.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    open.close();
                }
            }
            assertNoMoreRefreshes(locatedBlockRefresher);
        } catch (Throwable th3) {
            if (open != null) {
                if (0 != 0) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }

    private void stopNodeHostingBlocks(DFSInputStream dFSInputStream, int i) {
        synchronized (dFSInputStream.infoLock) {
            int findBlock = dFSInputStream.locatedBlocks.findBlock(0L);
            for (int i2 = 0; i2 < REPLICATION_FACTOR; i2++) {
                if (this.cluster.stopDataNode(dFSInputStream.locatedBlocks.get(findBlock).getLocations()[i2].getXferAddr()) != null) {
                    Assert.assertEquals(i, this.cluster.getDataNodes().size());
                }
            }
            throw new RuntimeException("Could not find a datanode to stop");
        }
    }

    private void assertNoMoreRefreshes(LocatedBlocksRefresher locatedBlocksRefresher) throws InterruptedException {
        long interval = locatedBlocksRefresher.getInterval();
        int runCount = locatedBlocksRefresher.getRunCount();
        int refreshCount = locatedBlocksRefresher.getRefreshCount();
        LOG.info("Waiting for at least {} runs, from current {}, expecting no refreshes", Integer.valueOf(runCount + 3), Integer.valueOf(runCount));
        awaitWithTimeout(() -> {
            return Boolean.valueOf(locatedBlocksRefresher.getRunCount() > runCount + 3);
        }, 5 * interval);
        Assert.assertEquals(refreshCount, locatedBlocksRefresher.getRefreshCount());
    }

    private void assertRefreshes(LocatedBlocksRefresher locatedBlocksRefresher, int i) throws InterruptedException {
        int runCount = locatedBlocksRefresher.getRunCount();
        int refreshCount = locatedBlocksRefresher.getRefreshCount();
        int i2 = 3;
        if (i < 0) {
            i = 3;
        }
        LOG.info("Waiting for at least {} runs, from current {}. Expecting {} refreshes, from current {}", new Object[]{Integer.valueOf(runCount + 3), Integer.valueOf(runCount), Integer.valueOf(refreshCount + i), Integer.valueOf(refreshCount)});
        awaitWithTimeout(() -> {
            return Boolean.valueOf(locatedBlocksRefresher.getRunCount() >= runCount + i2);
        }, 10000L);
        Assert.assertEquals(i, locatedBlocksRefresher.getRefreshCount() - refreshCount);
    }

    private void awaitWithTimeout(Supplier<Boolean> supplier, long j) throws InterruptedException {
        long monotonicNow = Time.monotonicNow();
        while (!supplier.get().booleanValue()) {
            if (Time.monotonicNow() - monotonicNow > j) {
                Assert.fail("Timed out waiting for true condition");
                return;
            }
            Thread.sleep(50L);
        }
    }

    private String createTestFile(FileSystem fileSystem) throws IOException {
        String str = "/located_blocks_" + UUID.randomUUID().toString();
        Path path = new Path(str);
        FSDataOutputStream create = fileSystem.create(path, (short) 4);
        Throwable th = null;
        try {
            try {
                create.write(new byte[25165824]);
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                fileSystem.deleteOnExit(path);
                return str;
            } finally {
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }
}
