package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.class */
/**
 * Tests for the {@link Scheduler} in which every task is wrapped in its own
 * {@link ScheduledUnit} and scheduled individually.
 *
 * <p>The tests cover registering and removing instances, immediate and queued slot
 * allocation, the reaction to instances being marked dead, and the way location
 * preferences of a vertex influence which instance serves the slot.
 */
public class SchedulerIsolatedTasksTest {

    /**
     * Checks that the instance and slot counters reported by the scheduler follow
     * instances being added and removed, and that registering an instance that is
     * already known (or already dead) is rejected with an {@link IllegalArgumentException}.
     */
    @Test
    public void testAddAndRemoveInstance() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());

        Instance first = SchedulerTestUtils.getRandomInstance(2);
        Instance second = SchedulerTestUtils.getRandomInstance(2);
        Instance third = SchedulerTestUtils.getRandomInstance(2);

        Assert.assertEquals(0, scheduler.getNumberOfAvailableInstances());
        Assert.assertEquals(0, scheduler.getNumberOfAvailableSlots());

        scheduler.newInstanceAvailable(first);
        Assert.assertEquals(1, scheduler.getNumberOfAvailableInstances());
        Assert.assertEquals(2, scheduler.getNumberOfAvailableSlots());

        scheduler.newInstanceAvailable(second);
        Assert.assertEquals(2, scheduler.getNumberOfAvailableInstances());
        Assert.assertEquals(4, scheduler.getNumberOfAvailableSlots());

        scheduler.newInstanceAvailable(third);
        Assert.assertEquals(3, scheduler.getNumberOfAvailableInstances());
        Assert.assertEquals(6, scheduler.getNumberOfAvailableSlots());

        // registering the same instance a second time must be refused
        try {
            scheduler.newInstanceAvailable(second);
            Assert.fail("Scheduler accepted instance twice");
        } catch (IllegalArgumentException expected) {
            // expected: the duplicate registration is rejected
        }
        Assert.assertEquals(3, scheduler.getNumberOfAvailableInstances());
        Assert.assertEquals(6, scheduler.getNumberOfAvailableSlots());

        scheduler.instanceDied(second);
        Assert.assertEquals(2, scheduler.getNumberOfAvailableInstances());
        Assert.assertEquals(4, scheduler.getNumberOfAvailableSlots());

        // an instance that was reported dead must not be accepted again
        try {
            scheduler.newInstanceAvailable(second);
            Assert.fail("Scheduler accepted dead instance");
        } catch (IllegalArgumentException expected) {
            // expected: the dead instance is rejected
        }

        scheduler.instanceDied(first);
        Assert.assertEquals(1, scheduler.getNumberOfAvailableInstances());
        Assert.assertEquals(2, scheduler.getNumberOfAvailableSlots());

        scheduler.instanceDied(third);
        Assert.assertEquals(0, scheduler.getNumberOfAvailableInstances());
        Assert.assertEquals(0, scheduler.getNumberOfAvailableSlots());

        Assert.assertFalse(first.isAlive());
        Assert.assertFalse(second.isAlive());
        Assert.assertFalse(third.isAlive());
    }

    /**
     * Allocates slots without queueing until the scheduler runs out of resources,
     * then releases and re-allocates slots and checks the available-slot counter.
     */
    @Test
    public void testScheduleImmediately() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        Assert.assertEquals(0, scheduler.getNumberOfAvailableSlots());

        scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
        scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
        scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
        Assert.assertEquals(5, scheduler.getNumberOfAvailableSlots());

        // occupy all five slots
        SimpleSlot s1 = allocateDummySlot(scheduler);
        SimpleSlot s2 = allocateDummySlot(scheduler);
        SimpleSlot s3 = allocateDummySlot(scheduler);
        SimpleSlot s4 = allocateDummySlot(scheduler);
        SimpleSlot s5 = allocateDummySlot(scheduler);
        Assert.assertTrue(SchedulerTestUtils.areAllDistinct(s1, s2, s3, s4, s5));

        // a sixth, non-queued request has nothing left to be served from
        try {
            scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false);
            Assert.fail("Scheduler accepted scheduling request without available resource.");
        } catch (NoResourceAvailableException expected) {
            // expected: no free slot is left
        }

        // give two slots back and take two new ones
        s3.releaseSlot();
        s4.releaseSlot();
        Assert.assertEquals(2, scheduler.getNumberOfAvailableSlots());

        SimpleSlot s6 = allocateDummySlot(scheduler);
        SimpleSlot s7 = allocateDummySlot(scheduler);
        Assert.assertTrue(SchedulerTestUtils.areAllDistinct(s1, s2, s3, s4, s5, s6, s7));

        s1.releaseSlot();
        s2.releaseSlot();
        s5.releaseSlot();
        s6.releaseSlot();
        s7.releaseSlot();
        Assert.assertEquals(5, scheduler.getNumberOfAvailableSlots());

        // releasing the same slots a second time must not change the counter
        s1.releaseSlot();
        s2.releaseSlot();
        s5.releaseSlot();
        s6.releaseSlot();
        s7.releaseSlot();
        Assert.assertEquals(5, scheduler.getNumberOfAvailableSlots());
    }

    /**
     * Issues many queued allocation requests while a background thread releases each
     * slot as soon as its future completes, then checks that all slots were distinct
     * and that every slot is available again at the end.
     */
    @Test
    public void testScheduleQueueing() throws Exception {
        final int numInstances = 50;
        final int numTasksToSchedule = 2000;

        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        for (int i = 0; i < numInstances; i++) {
            // each instance gets between one and three slots
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance((int) (Math.random() * 3) + 1));
        }
        Assert.assertEquals(numInstances, scheduler.getNumberOfAvailableInstances());
        final int totalSlots = scheduler.getNumberOfAvailableSlots();

        ArrayList<Future<SimpleSlot>> allocationFutures = new ArrayList<Future<SimpleSlot>>();
        // slots handed out by the scheduler and waiting to be released; also used as the monitor
        final HashSet<SimpleSlot> toRelease = new HashSet<SimpleSlot>();
        final AtomicBoolean releaserFailed = new AtomicBoolean(false);

        Thread releaser = new Thread(new Runnable() {
            @Override
            public void run() {
                int released = 0;
                while (released < numTasksToSchedule) {
                    try {
                        synchronized (toRelease) {
                            while (toRelease.isEmpty()) {
                                toRelease.wait();
                            }
                            Iterator<SimpleSlot> pending = toRelease.iterator();
                            SimpleSlot slot = pending.next();
                            pending.remove();
                            slot.releaseSlot();
                            released++;
                        }
                    } catch (Throwable t) {
                        // report the failure to the test thread and stop releasing
                        releaserFailed.set(true);
                        return;
                    }
                }
            }
        });
        releaser.start();

        for (int i = 0; i < numTasksToSchedule; i++) {
            Future<SimpleSlot> future =
                    scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), true);
            future.thenAcceptAsync(new AcceptFunction<SimpleSlot>() {
                @Override
                public void accept(SimpleSlot slot) {
                    // hand the slot over to the releasing thread
                    synchronized (toRelease) {
                        toRelease.add(slot);
                        toRelease.notifyAll();
                    }
                }
            }, TestingUtils.defaultExecutionContext());
            allocationFutures.add(future);
        }

        releaser.join();
        Assert.assertFalse("The slot releasing thread caused an error.", releaserFailed.get());

        ArrayList<SimpleSlot> allocatedSlots = new ArrayList<SimpleSlot>();
        for (Future<SimpleSlot> future : allocationFutures) {
            allocatedSlots.add(future.get());
        }

        Assert.assertEquals("All instances should have available slots.",
                numInstances, scheduler.getNumberOfInstancesWithAvailableSlots());
        Assert.assertTrue(SchedulerTestUtils.areAllDistinct(allocatedSlots.toArray()));
        Assert.assertEquals("All slots should be available.", totalSlots, scheduler.getNumberOfAvailableSlots());
    }

    /**
     * Marks instances dead while their slots are in use and checks that exactly the
     * slots of the dead instance are canceled, and that no slot can be allocated once
     * all instances are dead.
     */
    @Test
    public void testScheduleWithDyingInstances() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());

        Instance first = SchedulerTestUtils.getRandomInstance(2);
        Instance second = SchedulerTestUtils.getRandomInstance(2);
        Instance third = SchedulerTestUtils.getRandomInstance(1);

        scheduler.newInstanceAvailable(first);
        scheduler.newInstanceAvailable(second);
        scheduler.newInstanceAvailable(third);

        // occupy all five slots
        ArrayList<SimpleSlot> slots = new ArrayList<SimpleSlot>();
        for (int i = 0; i < 5; i++) {
            slots.add(allocateDummySlot(scheduler));
        }

        second.markDead();

        for (SimpleSlot slot : slots) {
            if (slot.getOwner() == second) {
                Assert.assertTrue(slot.isCanceled());
            } else {
                Assert.assertFalse(slot.isCanceled());
            }
            slot.releaseSlot();
        }

        // only the slots of the two surviving instances come back
        Assert.assertEquals(3, scheduler.getNumberOfAvailableSlots());

        first.markDead();
        third.markDead();

        try {
            allocateDummySlot(scheduler);
            Assert.fail("Scheduler served a slot from a dead instance");
        } catch (NoResourceAvailableException expected) {
            // expected: every instance is dead, so nothing can be allocated.
            // This clause must precede the generic one below, otherwise it is unreachable.
        } catch (Exception e) {
            Assert.fail("Wrong exception type.");
        }

        Assert.assertEquals(0, scheduler.getNumberOfInstancesWithAvailableSlots());
        Assert.assertEquals(0, scheduler.getNumberOfAvailableSlots());
    }

    /**
     * Schedules vertices with different location preferences and checks which instance
     * owns the resulting slot, as well as the scheduler's counters for unconstrained,
     * localized and non-localized assignments.
     */
    @Test
    public void testSchedulingLocation() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());

        Instance i1 = SchedulerTestUtils.getRandomInstance(2);
        Instance i2 = SchedulerTestUtils.getRandomInstance(2);
        Instance i3 = SchedulerTestUtils.getRandomInstance(2);

        scheduler.newInstanceAvailable(i1);
        scheduler.newInstanceAvailable(i2);
        scheduler.newInstanceAvailable(i3);

        // schedule something without any location preference
        SimpleSlot s1 = scheduler.allocateSlot(
                new ScheduledUnit(SchedulerTestUtils.getTestVertex(new Instance[0])), false).get();

        // name the three instances relative to wherever the first slot landed
        Instance first = (Instance) s1.getOwner();
        Instance second = first != i1 ? i1 : i2;
        Instance third = first == i3 ? i2 : i3;

        // preferring the first instance's location uses its remaining slot
        SimpleSlot s2 = scheduler.allocateSlot(
                new ScheduledUnit(SchedulerTestUtils.getTestVertex(s1.getTaskManagerLocation())), false).get();
        Assert.assertEquals(first, s2.getOwner());

        // first is now full, so a (first, second) preference goes to second
        SimpleSlot s3 = scheduler.allocateSlot(
                new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, second)), false).get();
        Assert.assertEquals(second, s3.getOwner());

        // a (first, third) preference goes to third for both of its slots
        SimpleSlot s4 = scheduler.allocateSlot(
                new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false).get();
        SimpleSlot s5 = scheduler.allocateSlot(
                new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false).get();
        Assert.assertEquals(third, s4.getOwner());
        Assert.assertEquals(third, s5.getOwner());

        // neither preferred instance has a free slot left, so second serves the request
        SimpleSlot s6 = scheduler.allocateSlot(
                new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false).get();
        Assert.assertEquals(second, s6.getOwner());

        // free one slot on first and one on second; the preference picks first again
        s2.releaseSlot();
        s6.releaseSlot();

        SimpleSlot s7 = scheduler.allocateSlot(
                new ScheduledUnit(SchedulerTestUtils.getTestVertex(first, third)), false).get();
        Assert.assertEquals(first, s7.getOwner());

        Assert.assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments());
        Assert.assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
        Assert.assertEquals(5, scheduler.getNumberOfLocalizedAssignments());
    }

    /**
     * Allocates a slot for a fresh dummy task without queueing and waits for the result.
     *
     * @param scheduler the scheduler to allocate from
     * @return the allocated slot
     * @throws Exception if the allocation is refused or the returned future fails
     */
    private static SimpleSlot allocateDummySlot(Scheduler scheduler) throws Exception {
        return scheduler.allocateSlot(new ScheduledUnit(SchedulerTestUtils.getDummyTask()), false).get();
    }
}
