package org.apache.hadoop.tools.fedbalance.procedure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

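/**
 * Tests for {@link BalanceProcedureScheduler}, run against an in-process
 * {@link MiniDFSCluster}.
 *
 * <p>Decompiled from
 * {@code org/apache/hadoop/tools/fedbalance/procedure/TestBalanceProcedureScheduler.class}.
 */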
public class TestBalanceProcedureScheduler {
    // Mini HDFS cluster shared by all tests; built in setup() and shut down in close().
    private static MiniDFSCluster cluster;
    // Configuration handed to the cluster builder and to every scheduler created below.
    private static final Configuration CONF = new Configuration();
    // File system obtained from the cluster in setup().
    private static DistributedFileSystem fs;
    // Same value that setup() writes to "dfs.blocksize".
    private static final int DEFAULT_BLOCK_SIZE = 512;

    /**
     * Builds the shared three-datanode mini cluster and configures the scheduler
     * journal to live under a {@code /procedure} directory on that cluster.
     *
     * @throws IOException propagated from cluster start-up or the file system calls
     */
    @BeforeClass
    public static void setup() throws IOException {
        // Cluster-side settings.
        CONF.set("fs.defaultFS", "hdfs:///");
        CONF.setBoolean("dfs.namenode.delegation.token.always-use", true);
        CONF.setBoolean("dfs.namenode.acls.enabled", true);
        CONF.setLong("dfs.blocksize", 512L);
        CONF.setLong("dfs.namenode.fs-limits.min-block-size", 0L);
        // Scheduler-side setting: a single worker thread.
        CONF.setInt("hdfs.fedbalance.procedure.work.thread.num", 1);

        MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(CONF);
        cluster = clusterBuilder.numDataNodes(3).build();
        cluster.waitClusterUp();
        cluster.waitActive();
        fs = cluster.getFileSystem();

        // The journal URI can only be formed once the name node address is known.
        final String journalUri =
            "hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure";
        CONF.set("hdfs.fedbalance.procedure.scheduler.journal.uri", journalUri);
        fs.mkdirs(new Path(journalUri));
    }

    /** Shuts down the shared mini cluster, if one was created. */
    @AfterClass
    public static void close() {
        if (cluster == null) {
            // Nothing to tear down.
            return;
        }
        cluster.shutdown();
    }

    /**
     * Submits a one-procedure job, shuts the scheduler down one second later and then
     * clears the job through a freshly created {@code BalanceJournalInfoHDFS} journal.
     *
     * @throws Exception if any scheduler, journal or sleep call fails
     */
    @Test(timeout = 60000)
    public void testShutdownScheduler() throws Exception {
        final BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);

        final BalanceJob.Builder jobBuilder = new BalanceJob.Builder();
        jobBuilder.nextProcedure(new WaitProcedure("wait", 1000L, 5000L));
        final BalanceJob job = jobBuilder.build();
        scheduler.submit(job);

        // Presumably gives the scheduler time to pick the job up before shutting down.
        Thread.sleep(1000L);
        scheduler.shutDownAndWait(30000);

        final BalanceJournal journal =
            (BalanceJournal) ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF);
        journal.clear(job);
    }

    /**
     * Runs a job made of five {@code RecordProcedure}s and checks that it finishes
     * without error and that {@code RecordProcedure.getFinishList()} holds the same
     * procedures in submission order.
     *
     * @throws Exception if any scheduler call fails
     */
    @Test(timeout = 60000)
    public void testSuccessfulJob() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            final int procedureCount = 5;
            // Typed list instead of a raw ArrayList: keeps the expected order of procedures.
            List<RecordProcedure> expected = new ArrayList<>(procedureCount);
            BalanceJob.Builder builder = new BalanceJob.Builder();
            for (int i = 0; i < procedureCount; i++) {
                RecordProcedure procedure = new RecordProcedure("record-" + i, 1000L);
                builder.nextProcedure(procedure);
                expected.add(procedure);
            }
            BalanceJob job = builder.build();

            scheduler.submit(job);
            scheduler.waitUntilDone(job);

            Assert.assertNull(job.getError());
            Assert.assertEquals(procedureCount, RecordProcedure.getFinishList().size());
            for (int i = 0; i < RecordProcedure.getFinishList().size(); i++) {
                Assert.assertEquals(expected.get(i), RecordProcedure.getFinishList().get(i));
            }
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Submits a job whose only procedure is a mock that throws from {@code execute()}
     * and checks that the job's error carries the thrown message.
     *
     * @throws Exception if any scheduler call fails
     */
    @Test(timeout = 60000)
    public void testFailedJob() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            BalanceProcedure failingProcedure = Mockito.mock(BalanceProcedure.class);
            Mockito.doThrow(new IOException("Job failed exception."))
                .when(failingProcedure).execute();
            Mockito.doReturn("bad-procedure").when(failingProcedure).name();

            BalanceJob.Builder builder = new BalanceJob.Builder();
            builder.nextProcedure(failingProcedure);
            BalanceJob job = builder.build();

            scheduler.submit(job);
            scheduler.waitUntilDone(job);
            GenericTestUtils.assertExceptionContains("Job failed exception", job.getError());
        } finally {
            // A single shutdown on every path; the catch-and-rethrow form repeated the
            // shutdown whenever the in-try shutdown call itself threw.
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Interrupts a five-procedure job by shutting the scheduler down after a random
     * delay, starts a new scheduler, and checks the job found by the new scheduler.
     *
     * <p>The found job must be error-free, equal to but not the same instance as the
     * submitted job, and its procedures before the interrupted one must report
     * {@code getExecuted() == false} while the interrupted one and all later ones report
     * {@code true}.
     *
     * @throws Exception if any scheduler call or the sleep fails
     */
    @Test(timeout = 60000)
    public void testGetJobAfterRecover() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            BalanceJob.Builder builder = new BalanceJob.Builder();
            WaitProcedure[] procedures = new WaitProcedure[5];
            for (int i = 0; i < procedures.length; i++) {
                WaitProcedure procedure = new WaitProcedure("wait" + i, 1000L, 1000L);
                builder.nextProcedure(procedure).removeAfterDone(false);
                procedures[i] = procedure;
            }
            BalanceJob job = builder.build();
            scheduler.submit(job);

            // Sleep 1000..5000 ms in 1000 ms steps. nextInt(5) is never negative, unlike
            // Math.abs(nextInt()) % 5, which is negative for Integer.MIN_VALUE and would
            // make Thread.sleep throw IllegalArgumentException.
            Thread.sleep(new Random().nextInt(5) * 1000L + 1000L);
            scheduler.shutDownAndWait(2);

            // Locate the procedure the job was on when the scheduler stopped.
            WaitProcedure interrupted = (WaitProcedure) job.getCurProcedure();
            Assert.assertNotNull("Job has no current procedure after shutdown", interrupted);
            int position = -1;
            for (int i = 0; i < procedures.length; i++) {
                if (procedures[i].name().equals(interrupted.name())) {
                    position = i;
                    break;
                }
            }
            // Fail clearly instead of indexing the recovered list with -1 below.
            Assert.assertTrue("Current procedure " + interrupted.name()
                + " is not one of the submitted procedures", position >= 0);

            // A fresh scheduler on the same configuration.
            scheduler = new BalanceProcedureScheduler(CONF);
            scheduler.init(true);
            scheduler.waitUntilDone(job);

            BalanceJob recovered = scheduler.findJob(job);
            Assert.assertNull(recovered.getError());
            Assert.assertNotSame(job, recovered);
            Assert.assertEquals(job, recovered);

            List<?> recoveredProcedures =
                procedureTableToList(recovered.getProcedureTable(), "wait0");
            for (int i = 0; i < position; i++) {
                Assert.assertFalse(((WaitProcedure) recoveredProcedures.get(i)).getExecuted());
            }
            for (int i = position; i < procedures.length; i++) {
                Assert.assertTrue(((WaitProcedure) recoveredProcedures.get(i)).getExecuted());
            }
        } finally {
            // Shuts down whichever scheduler is current when the test exits.
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Runs a job with a single {@code RetryProcedure("retry", 1000, 3)} and checks that
     * it completes without error, takes more than 3000 ms, and reports three retries.
     *
     * @throws Exception if any scheduler call fails
     */
    @Test(timeout = 60000)
    public void testRetry() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            // Keep a reference to the procedure so its retry counter can be read after
            // the job finishes.
            RetryProcedure retryProcedure = new RetryProcedure("retry", 1000L, 3);
            BalanceJob.Builder builder = new BalanceJob.Builder();
            builder.nextProcedure(retryProcedure);
            BalanceJob job = builder.build();

            long start = Time.monotonicNow();
            scheduler.submit(job);
            scheduler.waitUntilDone(job);
            long elapsedMs = Time.monotonicNow() - start;

            Assert.assertNull(job.getError());
            Assert.assertTrue(elapsedMs > 3000);
            Assert.assertEquals(3L, retryProcedure.getTotalRetry());
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Checks that a job built without any procedure can be submitted and waited on.
     *
     * @throws Exception if any scheduler call fails
     */
    @Test(timeout = 60000)
    public void testEmptyJob() throws Exception {
        final BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            final BalanceJob.Builder emptyBuilder = new BalanceJob.Builder();
            final BalanceJob emptyJob = emptyBuilder.build();
            scheduler.submit(emptyJob);
            scheduler.waitUntilDone(emptyJob);
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Writes a job with five record procedures and one retry procedure to a byte array,
     * reads it back into a newly built job, and checks that the two jobs are equal.
     *
     * @throws Exception if writing or reading the job fails
     */
    @Test(timeout = 60000)
    public void testJobSerializeAndDeserialize() throws Exception {
        final BalanceJob.Builder jobBuilder = new BalanceJob.Builder();
        int index = 0;
        while (index < 5) {
            jobBuilder.nextProcedure(new RecordProcedure("record-" + index, 1000L));
            index++;
        }
        jobBuilder.nextProcedure(new RetryProcedure("retry", 1000L, 3));
        final BalanceJob original = jobBuilder.build();
        original.setId(BalanceProcedureScheduler.allocateJobId());

        // Serialize into memory.
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        final DataOutputStream dataOut = new DataOutputStream(sink);
        original.write(dataOut);
        sink.flush();

        // Deserialize into a job that starts out empty.
        final byte[] serialized = sink.toByteArray();
        final DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(serialized));
        final BalanceJob restored = new BalanceJob.Builder().build();
        restored.readFields(dataIn);

        Assert.assertEquals(original, restored);
    }

    /**
     * Stops the scheduler in the middle of a ten-phase procedure, then starts a new
     * scheduler and checks that the job completes.
     *
     * <p>After the first shutdown the job must not be done and the work directory must
     * hold between 1 and 9 entries. After the new scheduler finishes the job, the
     * directory must hold {@code phase-0} .. {@code phase-9} and the found job must be
     * error-free and equal to, but not the same instance as, the submitted job.
     *
     * @throws Exception if any scheduler or file system call fails
     */
    @Test(timeout = 180000)
    public void testSchedulerDownAndRecoverJob() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        Path workDir = new Path("/testSchedulerDownAndRecoverJob");
        try {
            BalanceJob.Builder builder = new BalanceJob.Builder();
            builder.nextProcedure(
                new MultiPhaseProcedure("retry", 1000L, 10, CONF, workDir.toString()));
            BalanceJob job = builder.build();
            scheduler.submit(job);

            // Stop the scheduler while the procedure is still part-way through.
            Thread.sleep(500L);
            scheduler.shutDownAndWait(2);
            Assert.assertFalse(job.isJobDone());
            int finishedPhases = fs.listStatus(workDir).length;
            Assert.assertTrue(finishedPhases > 0 && finishedPhases < 10);

            // A fresh scheduler on the same configuration.
            scheduler = new BalanceProcedureScheduler(CONF);
            scheduler.init(true);
            scheduler.waitUntilDone(job);

            Assert.assertEquals(10L, fs.listStatus(workDir).length);
            for (int i = 0; i < 10; i++) {
                Assert.assertTrue(fs.exists(new Path(workDir, "phase-" + i)));
            }
            BalanceJob recovered = scheduler.findJob(job);
            Assert.assertNull(recovered.getError());
            Assert.assertNotSame(job, recovered);
            Assert.assertEquals(job, recovered);
        } finally {
            try {
                if (fs.exists(workDir)) {
                    fs.delete(workDir, true);
                }
            } finally {
                // Runs even when the directory cleanup above throws.
                scheduler.shutDownAndWait(2);
            }
        }
    }

    /**
     * Saves a two-procedure job directly to the journal with its current procedure set
     * to the second one, then starts a scheduler and waits for that job.
     *
     * <p>The elapsed time must fall in [1000, 5000) ms. Presumably this shows that only
     * {@code wait1} ran and {@code wait0} was skipped; that depends on the meaning of
     * the {@code WaitProcedure} constructor arguments, which is not visible here.
     *
     * @throws Exception if any journal or scheduler call fails
     */
    @Test(timeout = 60000)
    public void testRecoverJobFromJournal() throws Exception {
        BalanceJournal journal =
            (BalanceJournal) ReflectionUtils.newInstance(BalanceJournalInfoHDFS.class, CONF);

        BalanceJob.Builder builder = new BalanceJob.Builder();
        WaitProcedure first = new WaitProcedure("wait0", 1000L, 5000L);
        WaitProcedure second = new WaitProcedure("wait1", 1000L, 1000L);
        builder.nextProcedure(first).nextProcedure(second);
        BalanceJob job = builder.build();
        job.setId(BalanceProcedureScheduler.allocateJobId());
        // Persist the job as if it had already moved on to the second procedure.
        job.setCurrentProcedure(second);
        job.setLastProcedure((BalanceProcedure) null);
        journal.saveJob(job);

        long start = Time.monotonicNow();
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            scheduler.waitUntilDone(job);
            long elapsedMs = Time.monotonicNow() - start;
            Assert.assertTrue(elapsedMs >= 1000 && elapsedMs < 5000);
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Installs a mock journal whose first {@code clear} call throws and whose later
     * calls succeed, runs a one-procedure job, and checks that {@code clear} was called
     * exactly twice by the time the job is done.
     *
     * @throws Exception if any scheduler call fails
     */
    @Test(timeout = 60000)
    public void testClearJournalFail() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);

        BalanceJournal journal = Mockito.mock(BalanceJournal.class);
        AtomicInteger clearCalls = new AtomicInteger(0);
        Mockito.doAnswer(invocation -> {
            // Only the very first clear attempt fails.
            if (clearCalls.incrementAndGet() == 1) {
                throw new IOException("Mock clear failure");
            }
            return null;
        }).when(journal).clear(ArgumentMatchers.any(BalanceJob.class));
        scheduler.setJournal(journal);

        try {
            BalanceJob.Builder builder = new BalanceJob.Builder();
            builder.nextProcedure(new WaitProcedure("wait", 1000L, 1000L));
            BalanceJob job = builder.build();
            scheduler.submit(job);
            scheduler.waitUntilDone(job);
            Assert.assertEquals(2L, clearCalls.get());
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Runs a four-procedure job in which the second procedure's handler restarts the
     * name node (without waiting for it to become active) and the third procedure's
     * handler clears a flag.
     *
     * <p>The job must end done and error-free with the flag still set, i.e. the
     * {@code recoverFlag} handler must never have run. Presumably this follows from how
     * {@code UnrecoverableProcedure} and the scheduler treat a journal write failure;
     * neither is visible here.
     *
     * @throws Exception if any scheduler call fails
     */
    @Test(timeout = 60000)
    public void testJobRecoveryWhenWriteJournalFail() throws Exception {
        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(CONF);
        scheduler.init(true);
        try {
            AtomicBoolean flagUntouched = new AtomicBoolean(true);
            BalanceJob.Builder builder = new BalanceJob.Builder();
            builder.nextProcedure(new WaitProcedure("wait", 1000L, 1000L))
                .nextProcedure(new UnrecoverableProcedure("shutdown", 1000L, () -> {
                    cluster.restartNameNode(false);
                    return true;
                }))
                .nextProcedure(new UnrecoverableProcedure("recoverFlag", 1000L, () -> {
                    flagUntouched.set(false);
                    return true;
                }))
                .nextProcedure(new WaitProcedure("wait", 1000L, 1000L));
            BalanceJob job = builder.build();

            scheduler.submit(job);
            scheduler.waitUntilDone(job);

            Assert.assertTrue(job.isJobDone());
            Assert.assertNull(job.getError());
            Assert.assertTrue(flagUntouched.get());
        } finally {
            scheduler.shutDownAndWait(2);
        }
    }

    /**
     * Flattens a procedure table into a list by following {@code nextProcedure()} links.
     *
     * <p>Starts at the entry stored under {@code str}, appends it, then looks up the
     * entry keyed by that procedure's {@code nextProcedure()} value, and so on until a
     * lookup returns {@code null}.
     *
     * @param map procedure table keyed by the value that {@code nextProcedure()} refers to
     * @param str key of the first procedure in the chain
     * @param <T> procedure type stored in the table
     * @return the procedures in chain order; empty when {@code str} is not in the table
     */
    <T extends BalanceProcedure> List<T> procedureTableToList(Map<String, T> map, String str) {
        List<T> ordered = new ArrayList<>();
        for (T current = map.get(str); current != null;
             current = map.get(current.nextProcedure())) {
            ordered.add(current);
        }
        return ordered;
    }
}
