package org.apache.flink.runtime.jobmanager.scheduler;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.class */
public class SchedulerIsolatedTasksTest {
    private static ActorSystem system;

    @BeforeClass
    public static void setup() {
        system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
        TestingUtils.setCallingThreadDispatcher(system);
    }

    @AfterClass
    public static void teardown() {
        TestingUtils.setGlobalExecutionContext();
        JavaTestKit.shutdownActorSystem(system);
    }

    @Test
    public void testAddAndRemoveInstance() {
        try {
            Scheduler scheduler = new Scheduler();
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(2);
            Instance randomInstance3 = SchedulerTestUtils.getRandomInstance(2);
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());
            scheduler.newInstanceAvailable(randomInstance);
            Assert.assertEquals(1L, scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            scheduler.newInstanceAvailable(randomInstance2);
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            scheduler.newInstanceAvailable(randomInstance3);
            Assert.assertEquals(3L, scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals(6L, scheduler.getNumberOfAvailableSlots());
            try {
                scheduler.newInstanceAvailable(randomInstance2);
                Assert.fail("Scheduler accepted instance twice");
            } catch (IllegalArgumentException e) {
            }
            Assert.assertEquals(3L, scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals(6L, scheduler.getNumberOfAvailableSlots());
            scheduler.instanceDied(randomInstance2);
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals(4L, scheduler.getNumberOfAvailableSlots());
            try {
                scheduler.newInstanceAvailable(randomInstance2);
                Assert.fail("Scheduler accepted dead instance");
            } catch (IllegalArgumentException e2) {
            }
            scheduler.instanceDied(randomInstance);
            Assert.assertEquals(1L, scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            scheduler.instanceDied(randomInstance3);
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableInstances());
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());
            Assert.assertFalse(randomInstance.isAlive());
            Assert.assertFalse(randomInstance2.isAlive());
            Assert.assertFalse(randomInstance3.isAlive());
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    @Test
    public void testScheduleImmediately() {
        try {
            Scheduler scheduler = new Scheduler();
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(1));
            scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(2));
            Assert.assertEquals(5L, scheduler.getNumberOfAvailableSlots());
            AllocatedSlot scheduleImmediately = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
            AllocatedSlot scheduleImmediately2 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
            AllocatedSlot scheduleImmediately3 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
            AllocatedSlot scheduleImmediately4 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
            AllocatedSlot scheduleImmediately5 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(scheduleImmediately, scheduleImmediately2, scheduleImmediately3, scheduleImmediately4, scheduleImmediately5));
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
                Assert.fail("Scheduler accepted scheduling request without available resource.");
            } catch (NoResourceAvailableException e) {
            }
            scheduleImmediately3.releaseSlot();
            scheduleImmediately4.releaseSlot();
            Assert.assertEquals(2L, scheduler.getNumberOfAvailableSlots());
            AllocatedSlot scheduleImmediately6 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
            AllocatedSlot scheduleImmediately7 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
            Assert.assertTrue(SchedulerTestUtils.areAllDistinct(scheduleImmediately, scheduleImmediately2, scheduleImmediately3, scheduleImmediately4, scheduleImmediately5, scheduleImmediately6, scheduleImmediately7));
            scheduleImmediately.releaseSlot();
            scheduleImmediately2.releaseSlot();
            scheduleImmediately5.releaseSlot();
            scheduleImmediately6.releaseSlot();
            scheduleImmediately7.releaseSlot();
            Assert.assertEquals(5L, scheduler.getNumberOfAvailableSlots());
            scheduleImmediately.releaseSlot();
            scheduleImmediately2.releaseSlot();
            scheduleImmediately5.releaseSlot();
            scheduleImmediately6.releaseSlot();
            scheduleImmediately7.releaseSlot();
            Assert.assertEquals(5L, scheduler.getNumberOfAvailableSlots());
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testScheduleQueueing() {
        TestingUtils.setGlobalExecutionContext();
        try {
            try {
                Scheduler scheduler = new Scheduler();
                for (int i = 0; i < 50; i++) {
                    scheduler.newInstanceAvailable(SchedulerTestUtils.getRandomInstance(((int) (Math.random() * 3.0d)) + 1));
                }
                Assert.assertEquals(50L, scheduler.getNumberOfAvailableInstances());
                int numberOfAvailableSlots = scheduler.getNumberOfAvailableSlots();
                ArrayList arrayList = new ArrayList();
                final HashSet hashSet = new HashSet();
                final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                SlotAllocationFutureAction slotAllocationFutureAction = new SlotAllocationFutureAction() { // from class: org.apache.flink.runtime.jobmanager.scheduler.SchedulerIsolatedTasksTest.1
                    public void slotAllocated(AllocatedSlot allocatedSlot) {
                        synchronized (hashSet) {
                            hashSet.add(allocatedSlot);
                            hashSet.notifyAll();
                        }
                    }
                };
                Thread thread = new Thread(new Runnable() { // from class: org.apache.flink.runtime.jobmanager.scheduler.SchedulerIsolatedTasksTest.2
                    @Override // java.lang.Runnable
                    public void run() {
                        int i2 = 0;
                        while (i2 < 2000) {
                            try {
                                synchronized (hashSet) {
                                    while (hashSet.isEmpty()) {
                                        hashSet.wait();
                                    }
                                    Iterator it = hashSet.iterator();
                                    AllocatedSlot allocatedSlot = (AllocatedSlot) it.next();
                                    it.remove();
                                    allocatedSlot.releaseSlot();
                                    i2++;
                                }
                            } catch (Throwable th) {
                                atomicBoolean.set(true);
                                return;
                            }
                        }
                    }
                });
                thread.start();
                for (int i2 = 0; i2 < 2000; i2++) {
                    SlotAllocationFuture scheduleQueued = scheduler.scheduleQueued(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
                    scheduleQueued.setFutureAction(slotAllocationFutureAction);
                    arrayList.add(scheduleQueued);
                }
                thread.join();
                Assert.assertFalse("The slot releasing thread caused an error.", atomicBoolean.get());
                ArrayList arrayList2 = new ArrayList();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    arrayList2.add(((SlotAllocationFuture) it.next()).waitTillAllocated());
                }
                Assert.assertTrue(SchedulerTestUtils.areAllDistinct(arrayList2.toArray()));
                Assert.assertEquals(numberOfAvailableSlots, scheduler.getNumberOfAvailableSlots());
                TestingUtils.setCallingThreadDispatcher(system);
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                TestingUtils.setCallingThreadDispatcher(system);
            }
        } catch (Throwable th) {
            TestingUtils.setCallingThreadDispatcher(system);
            throw th;
        }
    }

    @Test
    public void testScheduleWithDyingInstances() {
        try {
            Scheduler scheduler = new Scheduler();
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(2);
            Instance randomInstance3 = SchedulerTestUtils.getRandomInstance(1);
            scheduler.newInstanceAvailable(randomInstance);
            scheduler.newInstanceAvailable(randomInstance2);
            scheduler.newInstanceAvailable(randomInstance3);
            ArrayList<AllocatedSlot> arrayList = new ArrayList();
            arrayList.add(scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask())));
            arrayList.add(scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask())));
            arrayList.add(scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask())));
            arrayList.add(scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask())));
            arrayList.add(scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask())));
            randomInstance2.markDead();
            for (AllocatedSlot allocatedSlot : arrayList) {
                if (allocatedSlot.getInstance() == randomInstance2) {
                    Assert.assertTrue(allocatedSlot.isCanceled());
                } else {
                    Assert.assertFalse(allocatedSlot.isCanceled());
                }
                allocatedSlot.releaseSlot();
            }
            Assert.assertEquals(3L, scheduler.getNumberOfAvailableSlots());
            randomInstance.markDead();
            randomInstance3.markDead();
            try {
                scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getDummyTask()));
                Assert.fail("Scheduler served a slot from a dead instance");
            } catch (Exception e) {
                Assert.fail("Wrong exception type.");
            } catch (NoResourceAvailableException e2) {
            }
            Assert.assertEquals(0L, scheduler.getNumberOfInstancesWithAvailableSlots());
            Assert.assertEquals(0L, scheduler.getNumberOfAvailableSlots());
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    @Test
    public void testSchedulingLocation() {
        try {
            Scheduler scheduler = new Scheduler();
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(2);
            Instance randomInstance2 = SchedulerTestUtils.getRandomInstance(2);
            Instance randomInstance3 = SchedulerTestUtils.getRandomInstance(2);
            scheduler.newInstanceAvailable(randomInstance);
            scheduler.newInstanceAvailable(randomInstance2);
            scheduler.newInstanceAvailable(randomInstance3);
            AllocatedSlot scheduleImmediately = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(Collections.emptyList())));
            Instance allocatedSlot = scheduleImmediately.getInstance();
            Instance instance = allocatedSlot != randomInstance ? randomInstance : randomInstance2;
            Instance instance2 = allocatedSlot == randomInstance3 ? randomInstance2 : randomInstance3;
            AllocatedSlot scheduleImmediately2 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(Collections.singletonList(scheduleImmediately.getInstance()))));
            Assert.assertEquals(allocatedSlot, scheduleImmediately2.getInstance());
            Assert.assertEquals(instance, scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(Arrays.asList(allocatedSlot, instance)))).getInstance());
            AllocatedSlot scheduleImmediately3 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(Arrays.asList(allocatedSlot, instance2))));
            AllocatedSlot scheduleImmediately4 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(Arrays.asList(allocatedSlot, instance2))));
            Assert.assertEquals(instance2, scheduleImmediately3.getInstance());
            Assert.assertEquals(instance2, scheduleImmediately4.getInstance());
            AllocatedSlot scheduleImmediately5 = scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(Arrays.asList(allocatedSlot, instance2))));
            Assert.assertEquals(instance, scheduleImmediately5.getInstance());
            scheduleImmediately2.releaseSlot();
            scheduleImmediately5.releaseSlot();
            Assert.assertEquals(allocatedSlot, scheduler.scheduleImmediately(new ScheduledUnit(SchedulerTestUtils.getTestVertex(Arrays.asList(allocatedSlot, instance2)))).getInstance());
            Assert.assertEquals(1L, scheduler.getNumberOfUnconstrainedAssignments());
            Assert.assertEquals(1L, scheduler.getNumberOfNonLocalizedAssignments());
            Assert.assertEquals(5L, scheduler.getNumberOfLocalizedAssignments());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
