/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeTaskManagerSlot;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.LeastUtilizationSlotMatchingStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotMatchingStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotState;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActionsBuilder;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class DeclarativeSlotManagerTest {
    // JUnit 5 extension exposing a ScheduledExecutorService; testSlotAllocationTimeout obtains its
    // executor via getExecutor() and hands it to the slot manager as the main-thread executor.
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    // Shared failure cause; passed to unregisterTaskManager in testTaskManagerUnregistration.
    private static final FlinkException TEST_EXCEPTION = new FlinkException("Test exception");
    // Worker spec with 100 CPU cores and 10000 MB for each memory pool.
    // NOTE(review): not referenced by the tests visible in this part of the file - presumably used further down.
    private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = new WorkerResourceSpec.Builder().setCpuCores(100.0).setTaskHeapMemoryMB(10000).setTaskOffHeapMemoryMB(10000).setNetworkMemoryMB(10000).setManagedMemoryMB(10000).build();

    // Explicit package-private no-arg constructor with an empty body.
    // NOTE(review): probably just the default constructor spelled out by the decompiler - confirm before removing.
    DeclarativeSlotManagerTest() {
    }

    /**
     * Suspends a freshly started slot manager and then closes it; the test passes as long as
     * neither call throws.
     */
    @Test
    void testCloseAfterSuspendDoesNotThrowException() throws Exception {
        try (DeclarativeSlotManager manager = createDeclarativeSlotManagerBuilder().buildAndStartWithDirectExec()) {
            // close() is invoked implicitly when this block ends, i.e. after the suspend.
            manager.suspend();
        }
    }

    /**
     * Registers a task manager that reports two free slots and checks that the slot manager counts
     * both and that the slot tracker knows each of them.
     */
    @Test
    void testTaskManagerRegistration() throws Exception {
        TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        ResourceID taskManagerId = ResourceID.generate();
        TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        SlotID firstSlotId = new SlotID(taskManagerId, 0);
        SlotID secondSlotId = new SlotID(taskManagerId, 1);
        SlotReport report = new SlotReport(Arrays.asList(createFreeSlotStatus(firstSlotId), createFreeSlotStatus(secondSlotId)));

        DefaultSlotTracker tracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager manager = createDeclarativeSlotManagerBuilder().setSlotTracker(tracker).buildAndStartWithDirectExec()) {
            manager.registerTaskManager(connection, report, ResourceProfile.ANY, ResourceProfile.ANY);

            Assertions.assertThat(manager.getNumberRegisteredSlots())
                    .as("The number registered slots does not equal the expected number.")
                    .isEqualTo(2);
            Assertions.assertThat(tracker.getSlot(firstSlotId)).isNotNull();
            Assertions.assertThat(tracker.getSlot(secondSlotId)).isNotNull();
        }
    }

    /**
     * Registers a task manager with one allocated and one free slot, declares a single-slot
     * requirement, then unregisters the task manager and expects no registered slots to remain.
     */
    @Test
    void testTaskManagerUnregistration() throws Exception {
        // The gateway answers every slot request with a future that is never completed.
        TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
                .setRequestSlotFunction(tuple6 -> new CompletableFuture<>())
                .createTestingTaskExecutorGateway();
        TaskExecutorConnection connection = createTaskExecutorConnection(gateway);
        ResourceID taskManagerId = connection.getResourceID();

        SlotID allocatedSlotId = new SlotID(taskManagerId, 0);
        SlotID freeSlotId = new SlotID(taskManagerId, 1);
        SlotReport report = new SlotReport(Arrays.asList(createAllocatedSlotStatus(allocatedSlotId), createFreeSlotStatus(freeSlotId)));
        ResourceRequirements requirements = createResourceRequirementsForSingleSlot();

        DefaultSlotTracker tracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager manager = createDeclarativeSlotManagerBuilder().setSlotTracker(tracker).buildAndStartWithDirectExec()) {
            manager.registerTaskManager(connection, report, ResourceProfile.ANY, ResourceProfile.ANY);
            Assertions.assertThat(manager.getNumberRegisteredSlots())
                    .as("The number registered slots does not equal the expected number.")
                    .isEqualTo(2);

            manager.processResourceRequirements(requirements);
            manager.unregisterTaskManager(connection.getInstanceID(), TEST_EXCEPTION);

            Assertions.assertThat(manager.getNumberRegisteredSlots()).isEqualTo(0);
        }
    }

    /**
     * Declares a single-slot requirement while no task manager is registered and waits for the
     * resource actions' allocate-resource consumer to be invoked.
     */
    @Test
    void testRequirementDeclarationWithoutFreeSlotsTriggersWorkerAllocation() throws Exception {
        ResourceManagerId rmId = ResourceManagerId.generate();
        ResourceRequirements requirements = createResourceRequirementsForSingleSlot();

        // Completed with whatever argument the allocate-resource consumer receives.
        CompletableFuture<Object> allocationRequested = new CompletableFuture<>();
        TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
                .setAllocateResourceConsumer(allocationRequested::complete)
                .build();

        try (DeclarativeSlotManager manager = createSlotManager(rmId, resourceActions)) {
            manager.processResourceRequirements(requirements);

            // Blocks until an allocation was requested; returning at all is the assertion.
            allocationRequested.get();
        }
    }

    /**
     * Registers a free slot on a task manager that the predicate handed to {@code buildAndStart}
     * matches, declares a single-slot requirement, and waits for the allocate-resource consumer
     * to be invoked.
     */
    @Test
    void testRequirementDeclarationWithBlockedSlotsTriggersWorkerAllocation() throws Exception {
        ResourceManagerId rmId = ResourceManagerId.generate();
        ResourceRequirements requirements = createResourceRequirementsForSingleSlot();

        // Completed with whatever argument the allocate-resource consumer receives.
        CompletableFuture<Object> allocationRequested = new CompletableFuture<>();
        TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
                .setAllocateResourceConsumer(allocationRequested::complete)
                .build();

        ResourceID blockedTaskManager = ResourceID.generate();
        try (DeclarativeSlotManager manager = createDeclarativeSlotManagerBuilder()
                .buildAndStart(rmId, Executors.directExecutor(), resourceActions, blockedTaskManager::equals)) {
            TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
            TaskExecutorConnection connection = new TaskExecutorConnection(blockedTaskManager, gateway);
            SlotID blockedSlotId = new SlotID(blockedTaskManager, 0);
            SlotReport report = new SlotReport(createFreeSlotStatus(blockedSlotId));

            manager.registerTaskManager(connection, report, ResourceProfile.ANY, ResourceProfile.ANY);
            manager.processResourceRequirements(requirements);

            // Blocks until an allocation was requested; returning at all is the assertion.
            allocationRequested.get();
        }
    }

    /**
     * Declares a single-slot requirement while the allocate-resource function always returns
     * {@code false}, and expects the resource tracker to report one missing resource for the job.
     */
    @Test
    void testRequirementDeclarationWithResourceAllocationFailure() throws Exception {
        ResourceRequirements requirements = createResourceRequirementsForSingleSlot();
        TestingResourceActions failingResourceActions = new TestingResourceActionsBuilder()
                .setAllocateResourceFunction(value -> false)
                .build();

        DefaultResourceTracker tracker = new DefaultResourceTracker();
        try (DeclarativeSlotManager manager = createDeclarativeSlotManagerBuilder()
                .setResourceTracker(tracker)
                .buildAndStartWithDirectExec(ResourceManagerId.generate(), failingResourceActions)) {
            manager.processResourceRequirements(requirements);

            JobID jobId = requirements.getJobId();
            int missingForJob = getTotalResourceCount(tracker.getMissingResources().get(jobId));
            Assertions.assertThat(missingForJob).isEqualTo(1);
        }
    }

    /** Runs the shared requirement-declaration scenario with the task executor registered first. */
    @Test
    void testRequirementDeclarationWithFreeSlot() throws Exception {
        RequirementDeclarationScenario scenario = RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION;
        testRequirementDeclaration(scenario);
    }

    /** Runs the shared requirement-declaration scenario with the requirement declared first. */
    @Test
    void testRequirementDeclarationWithPendingSlot() throws Exception {
        RequirementDeclarationScenario scenario = RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION;
        testRequirementDeclaration(scenario);
    }

    /**
     * Shared body of the requirement-declaration tests. Declares a requirement for one slot of a
     * specific profile and registers a task executor offering exactly that slot, either before or
     * after the declaration, then checks that the gateway received the expected slot request and
     * that the tracked slot is assigned to the job.
     *
     * @param scenario whether the task executor is registered before or after the requirement
     *     declaration
     * @throws Exception if building the slot manager, waiting for the request, or closing fails
     */
    private void testRequirementDeclaration(RequirementDeclarationScenario scenario) throws Exception {
        ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        ResourceID resourceID = ResourceID.generate();
        JobID jobId = new JobID();
        SlotID slotId = new SlotID(resourceID, 0);
        String targetAddress = "localhost";
        ResourceProfile resourceProfile = ResourceProfile.fromResources(42.0, 1337);

        // Captures a copy of the arguments of the slot request sent to the task executor.
        CompletableFuture<Tuple6<?, ?, ?, ?, ?, ?>> requestFuture = new CompletableFuture<>();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            requestFuture.complete(Tuple6.of(tuple6.f0, tuple6.f1, tuple6.f2, tuple6.f3, tuple6.f4, tuple6.f5));
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();

        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
        SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
        SlotReport slotReport = new SlotReport(slotStatus);

        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = createDeclarativeSlotManagerBuilder()
                .setSlotTracker(slotTracker)
                .buildAndStartWithDirectExec(resourceManagerId, new TestingResourceActionsBuilder().build())) {
            if (scenario == RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION) {
                slotManager.registerTaskManager(taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            }

            ResourceRequirements requirements = ResourceRequirements.create(
                    jobId, targetAddress, Collections.singleton(ResourceRequirement.create(resourceProfile, 1)));
            slotManager.processResourceRequirements(requirements);

            if (scenario == RequirementDeclarationScenario.TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION) {
                slotManager.registerTaskManager(taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            }

            // Field f2 is copied from the request itself, so it always matches; every other field
            // is compared against a value fixed by this test.
            Tuple6<?, ?, ?, ?, ?, ?> request = requestFuture.get();
            Assertions.assertThat(request)
                    .isEqualTo(Tuple6.of(slotId, jobId, request.f2, resourceProfile, targetAddress, resourceManagerId));

            DeclarativeTaskManagerSlot slot = slotTracker.getSlot(slotId);
            Assertions.assertThat(slot.getJobId())
                    .as("The slot has not been allocated to the expected job id.")
                    .isEqualTo(jobId);
        }
    }

    /**
     * Registers a task manager whose only slot is reported as allocated, frees that slot through
     * the slot manager, and expects the slot to be FREE and counted as a free slot.
     */
    @Test
    void testFreeSlot() throws Exception {
        TaskExecutorConnection connection = createTaskExecutorConnection();
        SlotID slotId = new SlotID(connection.getResourceID(), 0);
        SlotReport report = new SlotReport(createAllocatedSlotStatus(slotId));

        DefaultSlotTracker tracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager manager = createDeclarativeSlotManagerBuilder().setSlotTracker(tracker).buildAndStartWithDirectExec()) {
            manager.registerTaskManager(connection, report, ResourceProfile.ANY, ResourceProfile.ANY);

            DeclarativeTaskManagerSlot trackedSlot = tracker.getSlot(slotId);
            Assertions.assertThat(trackedSlot.getState()).isSameAs(SlotState.ALLOCATED);

            // A fresh AllocationID is passed here, not one taken from the slot report.
            manager.freeSlot(slotId, new AllocationID());

            Assertions.assertThat(trackedSlot.getState()).isSameAs(SlotState.FREE);
            Assertions.assertThat(manager.getNumberFreeSlots()).isEqualTo(1);
        }
    }

    /**
     * Declares the same single-slot requirement twice after a free slot has been registered and
     * expects the allocate-resource consumer never to be invoked.
     */
    @Test
    void testDuplicateResourceRequirementDeclarationAfterSuccessfulAllocation() throws Exception {
        ResourceManagerId rmId = ResourceManagerId.generate();

        // Counts how often the slot manager asks for a new resource.
        AtomicInteger allocationRequests = new AtomicInteger(0);
        TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
                .setAllocateResourceConsumer(ignored -> allocationRequests.incrementAndGet())
                .build();
        ResourceRequirements requirements = createResourceRequirementsForSingleSlot();

        TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        ResourceID taskManagerId = ResourceID.generate();
        TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);
        SlotID slotId = new SlotID(taskManagerId, 0);
        SlotReport report = new SlotReport(createFreeSlotStatus(slotId));

        DefaultSlotTracker tracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager manager = createDeclarativeSlotManagerBuilder()
                .setSlotTracker(tracker)
                .buildAndStartWithDirectExec(rmId, resourceActions)) {
            manager.registerTaskManager(connection, report, ResourceProfile.ANY, ResourceProfile.ANY);
            manager.processResourceRequirements(requirements);

            DeclarativeTaskManagerSlot trackedSlot = tracker.getSlot(slotId);
            Assertions.assertThat(trackedSlot.getState()).isEqualTo(SlotState.ALLOCATED);

            // Second, identical declaration.
            manager.processResourceRequirements(requirements);
        }

        // Checked after the slot manager has been closed, as in the original ordering.
        Assertions.assertThat(allocationRequests.get()).isEqualTo(0);
    }

    /**
     * Runs the slot-reuse scenario twice: first with the second job's requirement declared before
     * the slot is freed, then with it declared afterwards.
     */
    @Test
    void testSlotCanBeAllocatedForDifferentJobAfterFree() throws Exception {
        SecondRequirementDeclarationTime[] declarationTimes = {
            SecondRequirementDeclarationTime.BEFORE_FREE,
            SecondRequirementDeclarationTime.AFTER_FREE
        };
        for (SecondRequirementDeclarationTime declarationTime : declarationTimes) {
            testSlotCanBeAllocatedForDifferentJobAfterFree(declarationTime);
        }
    }

    /**
     * Allocates the single registered slot to a first job, clears that job's requirements, frees
     * the slot, and checks that the slot ends up assigned to a second job.
     *
     * @param secondRequirementDeclarationTime whether the second job's requirement is declared
     *     before or after the slot is freed
     * @throws Exception if building or closing the slot manager fails
     */
    private void testSlotCanBeAllocatedForDifferentJobAfterFree(SecondRequirementDeclarationTime secondRequirementDeclarationTime) throws Exception {
        AllocationID allocationId = new AllocationID();
        ResourceRequirements resourceRequirements1 = createResourceRequirementsForSingleSlot();
        ResourceRequirements resourceRequirements2 = createResourceRequirementsForSingleSlot();

        TaskExecutorConnection taskManagerConnection = createTaskExecutorConnection();
        ResourceID resourceID = taskManagerConnection.getResourceID();
        SlotID slotId = new SlotID(resourceID, 0);
        SlotReport slotReport = new SlotReport(createFreeSlotStatus(slotId));

        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = createDeclarativeSlotManagerBuilder().setSlotTracker(slotTracker).buildAndStartWithDirectExec()) {
            slotManager.registerTaskManager(taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.processResourceRequirements(resourceRequirements1);

            DeclarativeTaskManagerSlot slot = slotTracker.getSlot(slotId);
            // The slot's job id is the value under test; the requirement's job id is the expectation.
            Assertions.assertThat(slot.getJobId())
                    .as("The slot has not been allocated to the expected job id.")
                    .isEqualTo(resourceRequirements1.getJobId());

            if (secondRequirementDeclarationTime == SecondRequirementDeclarationTime.BEFORE_FREE) {
                slotManager.processResourceRequirements(resourceRequirements2);
            }

            // Replace the first job's requirements with an empty list before freeing its slot.
            slotManager.processResourceRequirements(ResourceRequirements.create(
                    resourceRequirements1.getJobId(), resourceRequirements1.getTargetAddress(), Collections.emptyList()));
            slotManager.freeSlot(slotId, allocationId);

            if (secondRequirementDeclarationTime == SecondRequirementDeclarationTime.AFTER_FREE) {
                slotManager.processResourceRequirements(resourceRequirements2);
            }

            Assertions.assertThat(slot.getJobId())
                    .as("The slot has not been allocated to the expected job id.")
                    .isEqualTo(resourceRequirements2.getJobId());
        }
    }

    /**
     * Reports slot status for an instance id that was never registered and expects the report to
     * be rejected ({@code false}) without changing the number of registered slots.
     */
    @Test
    void testReceivingUnknownSlotReport() throws Exception {
        ResourceManagerId rmId = ResourceManagerId.generate();
        TestingResourceActions resourceActions = new TestingResourceActionsBuilder().build();

        InstanceID unregisteredInstance = new InstanceID();
        SlotID unregisteredSlotId = new SlotID(ResourceID.generate(), 0);
        SlotReport unregisteredReport = new SlotReport(createFreeSlotStatus(unregisteredSlotId));

        try (DeclarativeSlotManager manager = createSlotManager(rmId, resourceActions)) {
            Assertions.assertThat(manager.getNumberRegisteredSlots()).isEqualTo(0);

            boolean accepted = manager.reportSlotStatus(unregisteredInstance, unregisteredReport);
            Assertions.assertThat(accepted).isFalse();

            Assertions.assertThat(manager.getNumberRegisteredSlots()).isEqualTo(0);
        }
    }

    /**
     * Registers a task manager with two free slots, then sends a second report in which the
     * second slot is allocated. Expects the report to be accepted, the slot count to stay at two,
     * the first slot to remain FREE, and the second slot to carry the job id from the new report.
     */
    @Test
    void testUpdateSlotReport() throws Exception {
        TaskExecutorConnection taskManagerConnection = createTaskExecutorConnection();
        ResourceID resourceId = taskManagerConnection.getResourceID();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);

        SlotStatus slotStatus1 = createFreeSlotStatus(slotId1);
        SlotStatus slotStatus2 = createFreeSlotStatus(slotId2);
        SlotStatus newSlotStatus2 = createAllocatedSlotStatus(slotId2);
        JobID jobId = newSlotStatus2.getJobID();

        SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));

        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = createDeclarativeSlotManagerBuilder().setSlotTracker(slotTracker).buildAndStartWithDirectExec()) {
            Assertions.assertThat(slotManager.getNumberRegisteredSlots()).isEqualTo(0);

            slotManager.registerTaskManager(taskManagerConnection, slotReport1, ResourceProfile.ANY, ResourceProfile.ANY);

            DeclarativeTaskManagerSlot slot1 = slotTracker.getSlot(slotId1);
            DeclarativeTaskManagerSlot slot2 = slotTracker.getSlot(slotId2);
            Assertions.assertThat(slotManager.getNumberRegisteredSlots()).isEqualTo(2);
            Assertions.assertThat(slot1.getState()).isSameAs(SlotState.FREE);
            Assertions.assertThat(slot2.getState()).isSameAs(SlotState.FREE);

            Assertions.assertThat(slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), slotReport2)).isTrue();

            Assertions.assertThat(slotManager.getNumberRegisteredSlots()).isEqualTo(2);
            Assertions.assertThat(slotTracker.getSlot(slotId1)).isNotNull();
            Assertions.assertThat(slotTracker.getSlot(slotId2)).isNotNull();
            Assertions.assertThat(slot1.getState()).isSameAs(SlotState.FREE);

            // The tracked slot's job id is the value under test; the reported job id is the expectation.
            Assertions.assertThat(slotTracker.getSlot(slotId2).getJobId()).isEqualTo(jobId);
        }
    }

    /**
     * Lets the first slot request fail with a {@link TimeoutException} and waits until the task
     * executor gateway receives a second slot request.
     */
    @Test
    void testSlotAllocationTimeout() throws Exception {
        CompletableFuture<Void> secondRequestSeen = new CompletableFuture<>();

        // Scripted gateway answers, consumed in order: the first request fails with a timeout,
        // the second one is recorded and then left pending.
        ArrayBlockingQueue<Supplier<CompletableFuture>> scriptedResponses = new ArrayBlockingQueue<>(2);
        scriptedResponses.add(() -> FutureUtils.completedExceptionally(new TimeoutException("timeout")));
        scriptedResponses.add(() -> {
            secondRequestSeen.complete(null);
            return new CompletableFuture<>();
        });

        TaskExecutorConnection connection = createTaskExecutorConnection(
                new TestingTaskExecutorGatewayBuilder()
                        .setRequestSlotFunction(ignored -> scriptedResponses.remove().get())
                        .createTestingTaskExecutorGateway());
        SlotReport report = createSlotReport(connection.getResourceID(), 2);

        ExecutorService mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();
        try (DeclarativeSlotManager slotManager = createDeclarativeSlotManagerBuilder()
                .buildAndStart(ResourceManagerId.generate(), mainThreadExecutor, new TestingResourceActionsBuilder().build())) {
            // Registration is submitted to the executor; the requirement declaration is chained after it.
            CompletableFuture<Void> registrationThenDeclaration = CompletableFuture
                    .runAsync(() -> slotManager.registerTaskManager(connection, report, ResourceProfile.ANY, ResourceProfile.ANY), mainThreadExecutor)
                    .thenRun(() -> slotManager.processResourceRequirements(createResourceRequirementsForSingleSlot()));
            registrationThenDeclaration.get(5L, TimeUnit.SECONDS);

            // Returns only once the second scripted answer has been requested.
            secondRequestSeen.get();
        }
    }

    /**
     * Fails the first slot request with a {@link SlotAllocationException} and acknowledges the
     * second one. Expects the job to still hold one acquired resource after the failure, the
     * slot of the second request to end up ALLOCATED for the job, and - if the two requests
     * targeted different slots - the slot of the failed request to be FREE again.
     */
    @Test
    void testTaskExecutorSlotAllocationTimeoutHandling() throws Exception {
        JobID jobId = new JobID();
        ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot(jobId);

        // The gateway hands out these futures in order, one per incoming slot request.
        CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
        CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<>();
        Iterator<CompletableFuture<Acknowledge>> slotRequestFutureIterator = Arrays.asList(slotRequestFuture1, slotRequestFuture2).iterator();
        // Records the slot id of every request the gateway receives.
        ArrayBlockingQueue<SlotID> slotIds = new ArrayBlockingQueue<>(2);

        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setRequestSlotFunction(FunctionUtils.uncheckedFunction(requestSlotParameters -> {
                    slotIds.put(requestSlotParameters.f0);
                    return slotRequestFutureIterator.next();
                }))
                .createTestingTaskExecutorGateway();

        ResourceID resourceId = ResourceID.generate();
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotReport slotReport = new SlotReport(Arrays.asList(createFreeSlotStatus(slotId1), createFreeSlotStatus(slotId2)));

        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = createDeclarativeSlotManagerBuilder()
                .setResourceTracker(resourceTracker)
                .setSlotTracker(slotTracker)
                .buildAndStartWithDirectExec()) {
            slotManager.registerTaskManager(taskManagerConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.processResourceRequirements(resourceRequirements);

            SlotID firstSlotId = slotIds.take();
            Assertions.assertThat(slotIds).isEmpty();
            DeclarativeTaskManagerSlot failedSlot = slotTracker.getSlot(firstSlotId);

            // Fail the first request.
            slotRequestFuture1.completeExceptionally(new SlotAllocationException("Test exception."));
            Assertions.assertThat(getTotalResourceCount(resourceTracker.getAcquiredResources(jobId))).isEqualTo(1);

            // Acknowledge the second request.
            slotRequestFuture2.complete(Acknowledge.get());
            SlotID secondSlotId = slotIds.take();
            Assertions.assertThat(slotIds).isEmpty();

            DeclarativeTaskManagerSlot slot = slotTracker.getSlot(secondSlotId);
            Assertions.assertThat(slot.getState()).isEqualTo(SlotState.ALLOCATED);
            // The slot's job id is the value under test; the declared job id is the expectation.
            Assertions.assertThat(slot.getJobId()).isEqualTo(jobId);

            // The retry may target the same slot again; only a different slot must be FREE.
            if (!failedSlot.getSlotId().equals(slot.getSlotId())) {
                Assertions.assertThat(failedSlot.getState()).isEqualTo(SlotState.FREE);
            }
        }
    }

    /**
     * While a slot request is still pending, sends a slot report stating that the requested slot
     * is already allocated (with the job id produced by {@code createAllocatedSlotStatus}).
     * Expects a second slot request that targets the other, still free slot.
     */
    @Test
    void testSlotReportWithConflictingJobIdDuringSlotAllocation() throws Exception {
        ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();

        // Records the slot id of every request; the gateway never completes the returned futures.
        ArrayBlockingQueue<SlotID> requestedSlotIds = new ArrayBlockingQueue<>(2);
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setRequestSlotFunction(FunctionUtils.uncheckedFunction(requestSlotParameters -> {
                    requestedSlotIds.put(requestSlotParameters.f0);
                    return new CompletableFuture<>();
                }))
                .createTestingTaskExecutorGateway();

        TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(taskExecutorGateway);
        ResourceID resourceId = taskExecutorConnection.getResourceID();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotReport slotReport = new SlotReport(Arrays.asList(createFreeSlotStatus(slotId1), createFreeSlotStatus(slotId2)));

        ManuallyTriggeredScheduledExecutor mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
        try (DeclarativeSlotManager slotManager = createDeclarativeSlotManagerBuilder(mainThreadExecutor)
                .buildAndStart(ResourceManagerId.generate(), mainThreadExecutor, new TestingResourceActionsBuilder().build())) {
            slotManager.registerTaskManager(taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.processResourceRequirements(resourceRequirements);

            SlotID firstRequestedSlotId = requestedSlotIds.take();
            SlotID freeSlotId = firstRequestedSlotId.equals(slotId1) ? slotId2 : slotId1;

            // Report the pending slot as allocated and the other one as free.
            SlotReport newSlotReport = new SlotReport(Arrays.asList(createAllocatedSlotStatus(firstRequestedSlotId), createFreeSlotStatus(freeSlotId)));
            slotManager.reportSlotStatus(taskExecutorConnection.getInstanceID(), newSlotReport);

            // The second request is the value under test; the remaining free slot is the expectation.
            SlotID secondRequestedSlotId = requestedSlotIds.take();
            Assertions.assertThat(secondRequestedSlotId).isEqualTo(freeSlotId);
        }
    }

    /**
     * Reports the only registered slot as allocated and then declares a single-slot requirement
     * for a new job. Expects the slot to keep the job id from the slot report and the new job to
     * have one missing resource.
     */
    @Test
    void testReportAllocatedSlot() throws Exception {
        ResourceID taskManagerId = ResourceID.generate();
        TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        TaskExecutorConnection connection = new TaskExecutorConnection(taskManagerId, gateway);

        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager manager = createDeclarativeSlotManagerBuilder()
                .setResourceTracker(resourceTracker)
                .setSlotTracker(slotTracker)
                .buildAndStartWithDirectExec()) {
            // Start with one free slot.
            SlotID slotId = new SlotID(taskManagerId, 0);
            SlotReport initialReport = new SlotReport(createFreeSlotStatus(slotId));
            manager.registerTaskManager(connection, initialReport, ResourceProfile.ANY, ResourceProfile.ANY);
            Assertions.assertThat(manager.getNumberRegisteredSlots()).isEqualTo(1);

            // The task executor now reports that slot as allocated.
            SlotStatus allocatedStatus = createAllocatedSlotStatus(slotId);
            SlotReport allocatedReport = new SlotReport(allocatedStatus);
            manager.reportSlotStatus(connection.getInstanceID(), allocatedReport);

            // A requirement declared for a fresh job id.
            JobID requestingJob = new JobID();
            ResourceRequirements requirements = createResourceRequirementsForSingleSlot(requestingJob);
            manager.processResourceRequirements(requirements);

            Assertions.assertThat(slotTracker.getSlot(slotId).getJobId()).isEqualTo(allocatedStatus.getJobID());
            int missingForRequestingJob = getTotalResourceCount(resourceTracker.getMissingResources().get(requestingJob));
            Assertions.assertThat(missingForRequestingJob).isEqualTo(1);
        }
    }

    /**
     * Checks that a failed slot request is retried and that the slot ends up allocated once the
     * retry is acknowledged.
     *
     * <p>The task executor gateway hands out two manually controlled response futures. The first
     * one is failed with a {@code SlotAllocationException}; the slot manager is then expected to
     * issue a second request for the same slot and the same job. After the second response is
     * acknowledged, the tracked slot must be {@code ALLOCATED} for that job.
     */
    @Test
    void testSlotRequestFailure() throws Exception {
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            ResourceRequirements requirements = DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot();
            slotManager.processResourceRequirements(requirements);

            // Captures the parameters of each slot request sent to the gateway.
            ArrayBlockingQueue requestSlotQueue = new ArrayBlockingQueue(1);
            // Pre-filled with the responses handed out for the first and second request.
            ArrayBlockingQueue responseQueue = new ArrayBlockingQueue(2);
            CompletableFuture firstManualSlotRequestResponse = new CompletableFuture();
            responseQueue.offer(firstManualSlotRequestResponse);
            CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<Acknowledge>();
            responseQueue.offer(secondManualSlotRequestResponse);
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple6 -> {
                requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple6);
                try {
                    return (CompletableFuture)responseQueue.take();
                }
                catch (InterruptedException interrupted) {
                    // Restore the interrupt status so the interruption stays visible to the
                    // calling thread instead of being silently consumed here.
                    Thread.currentThread().interrupt();
                    return FutureUtils.completedExceptionally((Throwable)new FlinkException("Response queue was interrupted."));
                }
            }).createTestingTaskExecutorGateway();
            ResourceID taskExecutorResourceId = ResourceID.generate();
            TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, (TaskExecutorGateway)testingTaskExecutorGateway);
            SlotReport slotReport = new SlotReport(DeclarativeSlotManagerTest.createFreeSlotStatus(new SlotID(taskExecutorResourceId, 0)));
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);

            // Fail the first request; a second request is expected to follow.
            Tuple6 firstRequest = (Tuple6)requestSlotQueue.take();
            firstManualSlotRequestResponse.completeExceptionally((Throwable)new SlotAllocationException("Test exception"));
            Tuple6 secondRequest = (Tuple6)requestSlotQueue.take();

            // The retry must target the same job (f1) and the same slot (f0).
            Assertions.assertThat((Comparable)((Comparable)secondRequest.f1)).isEqualTo(firstRequest.f1);
            Assertions.assertThat((Object)secondRequest.f0).isEqualTo(firstRequest.f0);

            // Acknowledging the retry must leave the slot allocated for that job.
            secondManualSlotRequestResponse.complete(Acknowledge.get());
            DeclarativeTaskManagerSlot slot = slotTracker.getSlot((SlotID)secondRequest.f0);
            Assertions.assertThat((Comparable)slot.getState()).isEqualTo((Object)SlotState.ALLOCATED);
            Assertions.assertThat((Comparable)slot.getJobId()).isEqualTo(secondRequest.f1);
        }
    }

    /**
     * Checks that a pending slot request is resolved when the task executor answers that the slot
     * is already occupied by the requesting job.
     *
     * <p>The first slot request is failed with a {@code TimeoutException}, which is expected to
     * trigger a second request. The second request is failed with a {@code SlotOccupiedException}
     * naming the same job. Afterwards the slot must be tracked as {@code ALLOCATED} for that job,
     * exactly one slot must be registered, and the job must have one acquired resource.
     */
    @Test
    void testSlotRequestRemovedIfTMReportsAllocation() throws Exception {
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).setSlotTracker((SlotTracker)slotTracker).buildAndStartWithDirectExec();){
            JobID jobID = new JobID();
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot(jobID));

            // Captures the parameters of each slot request sent to the gateway.
            ArrayBlockingQueue requestSlotQueue = new ArrayBlockingQueue(1);
            // Pre-filled with the responses handed out for the first and second request.
            ArrayBlockingQueue responseQueue = new ArrayBlockingQueue(2);
            CompletableFuture firstManualSlotRequestResponse = new CompletableFuture();
            responseQueue.offer(firstManualSlotRequestResponse);
            CompletableFuture secondManualSlotRequestResponse = new CompletableFuture();
            responseQueue.offer(secondManualSlotRequestResponse);
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple6 -> {
                requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple6);
                try {
                    return (CompletableFuture)responseQueue.take();
                }
                catch (InterruptedException interrupted) {
                    // Restore the interrupt status so the interruption stays visible to the
                    // calling thread instead of being silently consumed here.
                    Thread.currentThread().interrupt();
                    return FutureUtils.completedExceptionally((Throwable)new FlinkException("Response queue was interrupted."));
                }
            }).createTestingTaskExecutorGateway();
            ResourceID taskExecutorResourceId = ResourceID.generate();
            TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, (TaskExecutorGateway)testingTaskExecutorGateway);
            SlotReport slotReport = new SlotReport(DeclarativeSlotManagerTest.createFreeSlotStatus(new SlotID(taskExecutorResourceId, 0)));
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);

            // Fail the first request with a timeout; a retry is expected.
            Tuple6 firstRequest = (Tuple6)requestSlotQueue.take();
            firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));

            // The retry is rejected because the slot is reported as occupied by the same job.
            Tuple6 secondRequest = (Tuple6)requestSlotQueue.take();
            secondManualSlotRequestResponse.completeExceptionally((Throwable)new SlotOccupiedException("Test exception", new AllocationID(), jobID));

            // Both requests were issued for the same job (f1) and the same slot (f0).
            Assertions.assertThat((Comparable)((Comparable)firstRequest.f1)).isEqualTo((Object)jobID);
            Assertions.assertThat((Comparable)((Comparable)secondRequest.f1)).isEqualTo((Object)jobID);
            Assertions.assertThat((Object)secondRequest.f0).isEqualTo(firstRequest.f0);

            DeclarativeTaskManagerSlot slot = slotTracker.getSlot((SlotID)secondRequest.f0);
            Assertions.assertThat((Comparable)slot.getState()).isEqualTo((Object)SlotState.ALLOCATED);
            Assertions.assertThat((Comparable)slot.getJobId()).isEqualTo(firstRequest.f1);
            Assertions.assertThat((int)slotManager.getNumberRegisteredSlots()).isEqualTo(1);
            Assertions.assertThat((int)DeclarativeSlotManagerTest.getTotalResourceCount(resourceTracker.getAcquiredResources(jobID))).isEqualTo(1);
        }
    }

    /**
     * Checks that the requirements of a job are reported as missing again once the task executor
     * serving them is unregistered.
     *
     * <p>A job requires two slots, a task executor with two slots is registered and then
     * unregistered; afterwards the resource tracker must list two missing resources for the job.
     */
    @Test
    void testTaskExecutorFailedHandling() throws Exception {
        JobID jobId = new JobID();
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).buildAndStartWithDirectExec();){
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 2));

            TaskExecutorConnection connection = this.createTaskExecutorConnection();
            SlotReport twoSlotReport = DeclarativeSlotManagerTest.createSlotReport(connection.getResourceID(), 2);
            slotManager.registerTaskManager(connection, twoSlotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.unregisterTaskManager(connection.getInstanceID(), (Exception)TEST_EXCEPTION);

            int missingSlots = DeclarativeSlotManagerTest.getTotalResourceCount((Collection)resourceTracker.getMissingResources().get(jobId));
            Assertions.assertThat((int)missingSlots).isEqualTo(2);
        }
    }

    /**
     * Checks that a new resource is only requested when the declared requirements exceed what
     * the already requested workers can provide.
     *
     * <p>The slot manager is created with {@code numberSlots} slots per worker. Requirements of
     * one and two slots must lead to a single allocation request; a requirement of three slots
     * must lead to a second one.
     */
    @Test
    void testRequestNewResources() throws Exception {
        // Number of slots each requested worker is configured to provide.
        int numberSlots = 2;
        AtomicInteger resourceRequests = new AtomicInteger(0);
        TestingResourceActions testingResourceActions = new TestingResourceActionsBuilder().setAllocateResourceFunction(ignored -> {
            resourceRequests.incrementAndGet();
            return true;
        }).build();
        try (DeclarativeSlotManager slotManager = this.createSlotManager(ResourceManagerId.generate(), testingResourceActions, numberSlots);){
            JobID jobId = new JobID();

            // The first requirement triggers the first worker request.
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 1));
            Assertions.assertThat((int)resourceRequests.get()).isEqualTo(1);

            // A second slot still fits into the worker that was already requested.
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 2));
            Assertions.assertThat((int)resourceRequests.get()).isEqualTo(1);

            // A third slot exceeds one worker, so another one must be requested.
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 3));
            Assertions.assertThat((int)resourceRequests.get()).isEqualTo(2);
        }
    }

    /**
     * Creates a connection to a task executor backed by a default testing gateway.
     *
     * @return a new connection with a generated resource id
     */
    private TaskExecutorConnection createTaskExecutorConnection() {
        return this.createTaskExecutorConnection(new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
    }

    /**
     * Creates a connection to a task executor that uses the given gateway.
     *
     * @param taskExecutorGateway gateway the connection talks to
     * @return a new connection with a generated resource id
     */
    private TaskExecutorConnection createTaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
        ResourceID generatedResourceId = ResourceID.generate();
        return new TaskExecutorConnection(generatedResourceId, taskExecutorGateway);
    }

    /**
     * Checks that, with the least-utilization slot matching strategy, slot requests are spread
     * over all registered task executors.
     *
     * <p>{@code numberTaskExecutors} task executors with two slots each are registered, and a job
     * then requires {@code numberTaskExecutors} slots. Every task executor completes its future on
     * its first slot request, so waiting on all futures only succeeds if each task executor
     * received at least one request. All requests must belong to the single declared job.
     */
    @Test
    void testSpreadOutSlotAllocationStrategy() throws Exception {
        int numberTaskExecutors = 5;
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotMatchingStrategy((SlotMatchingStrategy)LeastUtilizationSlotMatchingStrategy.INSTANCE).buildAndStartWithDirectExec();){
            ArrayList<CompletableFuture<JobID>> requestSlotFutures = new ArrayList<CompletableFuture<JobID>>(numberTaskExecutors);
            for (int i = 0; i < numberTaskExecutors; ++i) {
                CompletableFuture<JobID> requestSlotFuture = new CompletableFuture<JobID>();
                requestSlotFutures.add(requestSlotFuture);
                this.registerTaskExecutorWithTwoSlots(slotManager, requestSlotFuture);
            }
            JobID jobId = new JobID();
            // One required slot per task executor, so an even spread uses every task executor once.
            ResourceRequirements resourceRequirements = DeclarativeSlotManagerTest.createResourceRequirements(jobId, numberTaskExecutors);
            slotManager.processResourceRequirements(resourceRequirements);

            // Fails with a timeout if any task executor never received a slot request.
            HashSet<JobID> jobIds = new HashSet<JobID>(FutureUtils.combineAll(requestSlotFutures).get(10L, TimeUnit.SECONDS));
            Assertions.assertThat(jobIds).hasSize(1);
            Assertions.assertThat(jobIds).containsExactlyInAnyOrder(jobId);
        }
    }

    /**
     * Registers a task executor with two free slots at the given slot manager.
     *
     * <p>The task executor's gateway acknowledges every slot request immediately and completes
     * {@code firstRequestSlotFuture} with the job id ({@code f1}) of the request it receives;
     * later requests leave the already completed future unchanged.
     *
     * @param slotManager slot manager to register the task executor at
     * @param firstRequestSlotFuture future completed with the job id of the first slot request
     */
    private void registerTaskExecutorWithTwoSlots(DeclarativeSlotManager slotManager, CompletableFuture<JobID> firstRequestSlotFuture) {
        TestingTaskExecutorGateway acknowledgingGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(requestSlotParameters -> {
            firstRequestSlotFuture.complete((JobID)requestSlotParameters.f1);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();
        TaskExecutorConnection connection = this.createTaskExecutorConnection(acknowledgingGateway);
        SlotReport twoFreeSlots = DeclarativeSlotManagerTest.createSlotReport(connection.getResourceID(), 2);
        slotManager.registerTaskManager(connection, twoFreeSlots, ResourceProfile.ANY, ResourceProfile.ANY);
    }

    /** Runs the not-enough-resources notification scenario without a notification grace period. */
    @Test
    void testNotificationAboutNotEnoughResources() throws Exception {
        boolean withNotificationGracePeriod = false;
        DeclarativeSlotManagerTest.testNotificationAboutNotEnoughResources(withNotificationGracePeriod);
    }

    /** Runs the not-enough-resources notification scenario with a notification grace period. */
    @Test
    void testGracePeriodForNotificationAboutNotEnoughResources() throws Exception {
        boolean withNotificationGracePeriod = true;
        DeclarativeSlotManagerTest.testNotificationAboutNotEnoughResources(withNotificationGracePeriod);
    }

    /**
     * Shared scenario checking that the resource actions are notified when a job's requirements
     * cannot be fulfilled.
     *
     * <p>A task executor with {@code numExistingSlots} slots is registered and a job declares
     * {@code numRequiredSlots} slots, while resource allocation always fails. Exactly one
     * notification is expected, naming the job and the resources it acquired. Reporting the slot
     * status again must not produce a further notification.
     *
     * @param withNotificationGracePeriod if {@code true}, failing of unfulfillable requests is
     *     disabled first; no notification may arrive until it is enabled again
     */
    private static void testNotificationAboutNotEnoughResources(boolean withNotificationGracePeriod) throws Exception {
        JobID jobId = new JobID();
        int numRequiredSlots = 3;
        int numExistingSlots = 1;
        ArrayList notEnoughResourceNotifications = new ArrayList();
        // Allocation always fails, so the missing slots can never be provided.
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().setAllocateResourceFunction(ignored -> false).setNotEnoughResourcesConsumer((jobId1, acquiredResources) -> notEnoughResourceNotifications.add(Tuple2.of((Object)jobId1, (Object)acquiredResources))).build();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().buildAndStart(ResourceManagerId.generate(), (Executor)new ManuallyTriggeredScheduledExecutor(), resourceManagerActions);){
            if (withNotificationGracePeriod) {
                slotManager.setFailUnfulfillableRequest(false);
            }
            ResourceID taskExecutorResourceId = ResourceID.generate();
            TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, (TaskExecutorGateway)new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReport(taskExecutorResourceId, numExistingSlots);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            ResourceRequirements resourceRequirements = DeclarativeSlotManagerTest.createResourceRequirements(jobId, numRequiredSlots);
            slotManager.processResourceRequirements(resourceRequirements);
            if (withNotificationGracePeriod) {
                // Nothing may be reported while failing of unfulfillable requests is disabled.
                Assertions.assertThat(notEnoughResourceNotifications).isEmpty();
                slotManager.setFailUnfulfillableRequest(true);
            }
            Assertions.assertThat(notEnoughResourceNotifications).hasSize(1);
            Tuple2 notification = (Tuple2)notEnoughResourceNotifications.get(0);
            Assertions.assertThat((Comparable)((Comparable)notification.f0)).isEqualTo((Object)jobId);
            // The acquired resources are the slots of the one registered task executor.
            Assertions.assertThat((Collection)((Collection)notification.f1)).contains((Object[])new ResourceRequirement[]{ResourceRequirement.create((ResourceProfile)ResourceProfile.ANY, (int)numExistingSlots)});

            // Another slot report must not trigger a second notification.
            slotManager.reportSlotStatus(taskExecutionConnection.getInstanceID(), slotReport);
            Assertions.assertThat(notEnoughResourceNotifications).hasSize(1);
        }
    }

    /**
     * Checks that the outcome of a slot allocation is ignored without a {@code System.exit} call
     * when the task executor was unregistered before the allocation result is processed.
     *
     * <p>The slot manager runs on a manually triggered executor, so the (already completed)
     * allocation acknowledgement is only handled by {@code executor.triggerAll()}, after the task
     * executor has been unregistered. A tracking security manager is installed for the duration
     * of the test; its system-exit future must not be done afterwards.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    void testAllocationUpdatesIgnoredIfTaskExecutorUnregistered() throws Exception {
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(ignored -> CompletableFuture.completedFuture(Acknowledge.get())).createTestingTaskExecutorGateway();
        // Remember the current security manager so it can be restored instead of being dropped.
        SecurityManager previousSecurityManager = System.getSecurityManager();
        SystemExitTrackingSecurityManager trackingSecurityManager = new SystemExitTrackingSecurityManager();
        System.setSecurityManager(trackingSecurityManager);
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).buildAndStart(ResourceManagerId.generate(), (Executor)executor, new TestingResourceActionsBuilder().build());){
            JobID jobId = new JobID();
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 1));
            TaskExecutorConnection taskExecutionConnection = this.createTaskExecutorConnection(taskExecutorGateway);
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReport(taskExecutionConnection.getResourceID(), 1);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.unregisterTaskManager(taskExecutionConnection.getInstanceID(), (Exception)TEST_EXCEPTION);
            // Run the queued main-thread actions only now, after the unregistration.
            executor.triggerAll();
            Assertions.assertThat(trackingSecurityManager.getSystemExitFuture()).isNotDone();
        }
        finally {
            System.setSecurityManager(previousSecurityManager);
        }
    }

    /**
     * Checks that the outcome of a slot allocation is ignored without a {@code System.exit} call
     * when a slot report has already marked the slot as allocated before the allocation result
     * is processed.
     *
     * <p>The slot manager runs on a manually triggered executor, so the (already completed)
     * allocation acknowledgement is only handled by {@code executor.triggerAll()}, after the task
     * executor has reported the slot as allocated for the job. A tracking security manager is
     * installed for the duration of the test; its system-exit future must not be done afterwards.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    void testAllocationUpdatesIgnoredIfSlotMarkedAsAllocatedAfterSlotReport() throws Exception {
        ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(ignored -> CompletableFuture.completedFuture(Acknowledge.get())).createTestingTaskExecutorGateway();
        // Remember the current security manager so it can be restored instead of being dropped.
        SecurityManager previousSecurityManager = System.getSecurityManager();
        SystemExitTrackingSecurityManager trackingSecurityManager = new SystemExitTrackingSecurityManager();
        System.setSecurityManager(trackingSecurityManager);
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).buildAndStart(ResourceManagerId.generate(), (Executor)executor, new TestingResourceActionsBuilder().build());){
            JobID jobId = new JobID();
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 1));
            TaskExecutorConnection taskExecutionConnection = this.createTaskExecutorConnection(taskExecutorGateway);
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReport(taskExecutionConnection.getResourceID(), 1);
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            slotManager.reportSlotStatus(taskExecutionConnection.getInstanceID(), DeclarativeSlotManagerTest.createSlotReportWithAllocatedSlots(taskExecutionConnection.getResourceID(), jobId, 1));
            // Run the queued main-thread actions only now, after the slot report.
            executor.triggerAll();
            Assertions.assertThat(trackingSecurityManager.getSystemExitFuture()).isNotDone();
        }
        finally {
            System.setSecurityManager(previousSecurityManager);
        }
    }

    /**
     * Checks that a late acknowledgement of a slot request for one job does not assign the slot
     * to that job when the requirements have changed in the meantime.
     *
     * <p>Sequence: a single-slot requirement is declared for a first job, then cleared again, and
     * a single-slot requirement is declared for a second job. The slot is freed using the
     * allocation id of the first request, and only then is the first request acknowledged. The
     * tracked slot must not carry the first job's id afterwards.
     */
    @Test
    void testAllocationUpdatesIgnoredIfSlotMarkedAsPendingForOtherJob() throws Exception {
        DefaultSlotTracker slotTracker = new DefaultSlotTracker();
        // Completed with field f2 of the first slot request; it is read back as an AllocationID below.
        CompletableFuture firstSlotAllocationIdFuture = new CompletableFuture();
        CompletableFuture<Acknowledge> firstSlotRequestAcknowledgeFuture = new CompletableFuture<Acknowledge>();
        // The first request gets the manually controlled future; the second gets one that this test never completes.
        Iterator<CompletableFuture> slotRequestAcknowledgeFutures = Arrays.asList(firstSlotRequestAcknowledgeFuture, new CompletableFuture()).iterator();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(requestSlotParameters -> {
            // complete() has no effect on an already completed future, so only the first request's value is kept.
            firstSlotAllocationIdFuture.complete(requestSlotParameters.f2);
            return (CompletableFuture)slotRequestAcknowledgeFutures.next();
        }).createTestingTaskExecutorGateway();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotTracker((SlotTracker)slotTracker).buildAndStart(ResourceManagerId.generate(), (Executor)ComponentMainThreadExecutorServiceAdapter.forMainThread(), new TestingResourceActionsBuilder().build());){
            TaskExecutorConnection taskExecutionConnection = this.createTaskExecutorConnection(taskExecutorGateway);
            SlotReport slotReport = DeclarativeSlotManagerTest.createSlotReport(taskExecutionConnection.getResourceID(), 1);
            SlotID slotId = ((SlotStatus)Iterators.getOnlyElement((Iterator)slotReport.iterator())).getSlotID();
            slotManager.registerTaskManager(taskExecutionConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            // Report the single slot as free once more.
            slotManager.reportSlotStatus(taskExecutionConnection.getInstanceID(), DeclarativeSlotManagerTest.createSlotReport(taskExecutionConnection.getResourceID(), 1));
            JobID firstJobId = new JobID();
            // Declare a requirement for the first job, then withdraw it.
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(firstJobId, 1));
            slotManager.processResourceRequirements(ResourceRequirements.empty((JobID)firstJobId, (String)"foobar"));
            // Declare a requirement for a different job.
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(new JobID(), 1));
            // Free the slot under the first allocation id before the first request is acknowledged.
            slotManager.freeSlot(slotId, (AllocationID)firstSlotAllocationIdFuture.get());
            // The late acknowledgement of the first request must not hand the slot to the first job.
            firstSlotRequestAcknowledgeFuture.complete(Acknowledge.get());
            Assertions.assertThat((Comparable)slotTracker.getSlot(slotId).getJobId()).isNotEqualTo((Object)firstJobId);
        }
    }

    /**
     * Checks that the task executor is asked to free inactive slots of a job only when the job's
     * requirements are cleared, not when they are merely changed or set to empty.
     *
     * <p>The gateway completes a future with the job id passed to its free-inactive-slots
     * consumer. The future must stay incomplete after declaring requirements and after declaring
     * empty requirements, and must hold the job id after {@code clearResourceRequirements}.
     */
    @Test
    void testReclaimInactiveSlotsOnClearRequirements() throws Exception {
        CompletableFuture freedJobIdFuture = new CompletableFuture();
        TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().setFreeInactiveSlotsConsumer(freedJobIdFuture::complete).createTestingTaskExecutorGateway();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().buildAndStart(ResourceManagerId.generate(), (Executor)ComponentMainThreadExecutorServiceAdapter.forMainThread(), new TestingResourceActionsBuilder().build());){
            JobID jobId = new JobID();
            TaskExecutorConnection connection = this.createTaskExecutorConnection(gateway);
            SlotReport allocatedSlotReport = DeclarativeSlotManagerTest.createSlotReportWithAllocatedSlots(connection.getResourceID(), jobId, 1);
            slotManager.registerTaskManager(connection, allocatedSlotReport, ResourceProfile.ANY, ResourceProfile.ANY);

            // Declaring requirements must not free anything.
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 2));
            Assertions.assertThat(freedJobIdFuture).isNotDone();

            // Neither must declaring empty requirements.
            slotManager.processResourceRequirements(ResourceRequirements.empty((JobID)jobId, (String)"foobar"));
            Assertions.assertThat(freedJobIdFuture).isNotDone();

            // Clearing the requirements must trigger the request for this job.
            slotManager.clearResourceRequirements(jobId);
            Assertions.assertThat((Comparable)((Comparable)freedJobIdFuture.get())).isEqualTo((Object)jobId);
        }
    }

    /**
     * Checks that resource requirements are processed after the configured requirement check
     * delay rather than immediately.
     *
     * <p>After declaring requirements, no resource may have been allocated and exactly one
     * non-periodic task must be scheduled with the configured delay. Declaring requirements again
     * must not schedule a second task. Triggering the scheduled task must lead to one allocation.
     */
    @Test
    void testProcessResourceRequirementsWithDelay() throws Exception {
        Duration requirementCheckDelay = Duration.ofMillis(500L);
        AtomicInteger allocationCount = new AtomicInteger(0);
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        ManuallyTriggeredScheduledExecutor scheduledExecutor = new ManuallyTriggeredScheduledExecutor();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder((ScheduledExecutor)scheduledExecutor).setResourceTracker((ResourceTracker)resourceTracker).setRequirementCheckDelay(requirementCheckDelay).buildAndStartWithDirectExec(ResourceManagerId.generate(), new TestingResourceActionsBuilder().setAllocateResourceConsumer(workerResourceSpec -> allocationCount.getAndIncrement()).build());){
            JobID jobId = new JobID();

            // The first declaration only schedules the delayed check.
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 1));
            Assertions.assertThat((int)allocationCount.get()).isEqualTo(0);
            Assertions.assertThat((Collection)scheduledExecutor.getActiveNonPeriodicScheduledTask()).hasSize(1);
            ScheduledFuture scheduledCheck = (ScheduledFuture)scheduledExecutor.getActiveNonPeriodicScheduledTask().iterator().next();
            Assertions.assertThat((long)scheduledCheck.getDelay(TimeUnit.MILLISECONDS)).isEqualTo(requirementCheckDelay.toMillis());

            // A second declaration must not schedule another check.
            slotManager.processResourceRequirements(DeclarativeSlotManagerTest.createResourceRequirements(jobId, 1));
            Assertions.assertThat((Collection)scheduledExecutor.getActiveNonPeriodicScheduledTask()).hasSize(1);

            // Running the scheduled check performs the allocation.
            scheduledExecutor.triggerNonPeriodicScheduledTask();
            Assertions.assertThat((int)allocationCount.get()).isEqualTo(1);
        }
    }

    /**
     * Checks that clearing a job's requirements removes the job from the resource tracker's
     * missing resources.
     *
     * <p>A task executor with one slot allocated to the job is registered and the job declares
     * two slots; after {@code clearResourceRequirements} the tracker must list no missing
     * resources for any job.
     */
    @Test
    void testClearRequirementsClearsResourceTracker() throws Exception {
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        CompletableFuture freedJobIdFuture = new CompletableFuture();
        TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder().setFreeInactiveSlotsConsumer(freedJobIdFuture::complete).createTestingTaskExecutorGateway();
        try (DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setResourceTracker((ResourceTracker)resourceTracker).buildAndStart(ResourceManagerId.generate(), (Executor)ComponentMainThreadExecutorServiceAdapter.forMainThread(), new TestingResourceActionsBuilder().build());){
            JobID jobId = new JobID();
            TaskExecutorConnection connection = this.createTaskExecutorConnection(gateway);
            SlotReport allocatedSlotReport = DeclarativeSlotManagerTest.createSlotReportWithAllocatedSlots(connection.getResourceID(), jobId, 1);
            slotManager.registerTaskManager(connection, allocatedSlotReport, ResourceProfile.ANY, ResourceProfile.ANY);

            ResourceRequirements twoSlotRequirement = DeclarativeSlotManagerTest.createResourceRequirements(jobId, 2);
            slotManager.processResourceRequirements(twoSlotRequirement);
            slotManager.clearResourceRequirements(jobId);

            Assertions.assertThat(resourceTracker.getMissingResources().keySet()).isEmpty();
        }
    }

    /** Checks that all registered metrics are unregistered when the slot manager is suspended. */
    @Test
    void testMetricsUnregisteredWhenSuspending() throws Exception {
        ThrowingConsumer<SlotManager, Exception> suspendAction = (ThrowingConsumer<SlotManager, Exception>)((ThrowingConsumer)SlotManager::suspend);
        this.testAccessMetricValueDuringItsUnregister(suspendAction);
    }

    /** Checks that all registered metrics are unregistered when the slot manager is closed. */
    @Test
    void testMetricsUnregisteredWhenClosing() throws Exception {
        ThrowingConsumer<SlotManager, Exception> closeAction = (ThrowingConsumer<SlotManager, Exception>)((ThrowingConsumer)AutoCloseable::close);
        this.testAccessMetricValueDuringItsUnregister(closeAction);
    }

    /**
     * Shared scenario checking that every metric registered by the slot manager is unregistered
     * again by the given shutdown action.
     *
     * <p>A counter is incremented on each metric registration and decremented on each
     * unregistration; it must be positive after start and zero after {@code closeFn} ran.
     *
     * <p>NOTE(review): the slot manager is not closed here when {@code closeFn} only suspends it
     * or when an assertion fails; confirm whether that is intended.
     *
     * @param closeFn action that shuts the slot manager down (for example suspend or close)
     */
    private void testAccessMetricValueDuringItsUnregister(ThrowingConsumer<SlotManager, Exception> closeFn) throws Exception {
        AtomicInteger liveMetrics = new AtomicInteger();
        TestingMetricRegistry countingRegistry = TestingMetricRegistry.builder().setRegisterConsumer((a, b, c) -> liveMetrics.incrementAndGet()).setUnregisterConsumer((a, b, c) -> liveMetrics.decrementAndGet()).build();
        DeclarativeSlotManager slotManager = DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder().setSlotManagerMetricGroup(SlotManagerMetricGroup.create((MetricRegistry)countingRegistry, (String)"localhost")).buildAndStartWithDirectExec();

        // Sanity check: starting the slot manager must have registered at least one metric.
        Assertions.assertThat((int)liveMetrics.get()).isGreaterThan(0);

        closeFn.accept((Object)slotManager);
        Assertions.assertThat((int)liveMetrics.get()).isEqualTo(0);
    }

    /**
     * Builds a slot report containing {@code numberSlots} free slots of the given task executor,
     * with slot indices {@code 0} to {@code numberSlots - 1}.
     *
     * @param taskExecutorResourceId resource id the slots belong to
     * @param numberSlots number of free slots to report
     * @return slot report with the free slot statuses
     */
    private static SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
        HashSet<SlotStatus> freeSlots = new HashSet<SlotStatus>(numberSlots);
        int slotIndex = 0;
        while (slotIndex < numberSlots) {
            SlotID slotId = new SlotID(taskExecutorResourceId, slotIndex);
            freeSlots.add(DeclarativeSlotManagerTest.createFreeSlotStatus(slotId));
            ++slotIndex;
        }
        return new SlotReport(freeSlots);
    }

    /**
     * Builds a slot report containing {@code numberSlots} slots of the given task executor that
     * are all allocated to {@code jobId}, with slot indices {@code 0} to {@code numberSlots - 1}.
     *
     * @param taskExecutorResourceId resource id the slots belong to
     * @param jobId job the slots are reported as allocated to
     * @param numberSlots number of allocated slots to report
     * @return slot report with the allocated slot statuses
     */
    private static SlotReport createSlotReportWithAllocatedSlots(ResourceID taskExecutorResourceId, JobID jobId, int numberSlots) {
        HashSet<SlotStatus> allocatedSlots = new HashSet<SlotStatus>(numberSlots);
        int slotIndex = 0;
        while (slotIndex < numberSlots) {
            SlotID slotId = new SlotID(taskExecutorResourceId, slotIndex);
            allocatedSlots.add(DeclarativeSlotManagerTest.createAllocatedSlotStatus(slotId, jobId));
            ++slotIndex;
        }
        return new SlotReport(allocatedSlots);
    }

    /**
     * Creates a slot status for the given slot without job or allocation id, using
     * {@code ResourceProfile.ANY}.
     *
     * @param slotId slot the status describes
     * @return the slot status
     */
    private static SlotStatus createFreeSlotStatus(SlotID slotId) {
        ResourceProfile profile = ResourceProfile.ANY;
        return new SlotStatus(slotId, profile);
    }

    /**
     * Creates a slot status that reports the given slot as allocated to a newly generated job.
     *
     * @param slotId slot the status describes
     * @return the slot status
     */
    private static SlotStatus createAllocatedSlotStatus(SlotID slotId) {
        JobID generatedJobId = JobID.generate();
        return DeclarativeSlotManagerTest.createAllocatedSlotStatus(slotId, generatedJobId);
    }

    /**
     * Creates a slot status that reports the given slot as allocated to {@code jobId} under a
     * new allocation id, using {@code ResourceProfile.ANY}.
     *
     * @param slotId slot the status describes
     * @param jobId job the slot is reported as allocated to
     * @return the slot status
     */
    private static SlotStatus createAllocatedSlotStatus(SlotID slotId, JobID jobId) {
        AllocationID allocationId = new AllocationID();
        return new SlotStatus(slotId, ResourceProfile.ANY, jobId, allocationId);
    }

    /**
     * Creates and starts a slot manager configured with one slot per worker.
     *
     * @param resourceManagerId id of the resource manager the slot manager is started for
     * @param resourceManagerActions resource actions the slot manager calls back into
     * @return the started slot manager
     */
    private DeclarativeSlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
        int singleSlotPerWorker = 1;
        return this.createSlotManager(resourceManagerId, resourceManagerActions, singleSlotPerWorker);
    }

    /**
     * Creates and starts a slot manager with the given number of slots per worker and no
     * redundant task managers, scheduling on the shared test executor and running main-thread
     * actions directly.
     *
     * @param resourceManagerId id of the resource manager the slot manager is started for
     * @param resourceManagerActions resource actions the slot manager calls back into
     * @param numSlotsPerWorker number of slots each worker is configured to provide
     * @return the started slot manager
     */
    private DeclarativeSlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions, int numSlotsPerWorker) {
        ScheduledExecutor sharedExecutor = (ScheduledExecutor)new ScheduledExecutorServiceAdapter((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        return DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder(sharedExecutor).setNumSlotsPerWorker(numSlotsPerWorker).setRedundantTaskManagerNum(0).buildAndStartWithDirectExec(resourceManagerId, resourceManagerActions);
    }

    /**
     * Creates a slot manager builder that schedules on the shared test executor.
     *
     * @return a builder preconfigured with the default worker resource spec
     */
    private static DeclarativeSlotManagerBuilder createDeclarativeSlotManagerBuilder() {
        ScheduledExecutor sharedExecutor = (ScheduledExecutor)new ScheduledExecutorServiceAdapter((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        return DeclarativeSlotManagerTest.createDeclarativeSlotManagerBuilder(sharedExecutor);
    }

    /**
     * Creates a slot manager builder for the given executor with the test's default worker
     * resource spec applied.
     *
     * @param executor scheduled executor handed to the builder
     * @return the preconfigured builder
     */
    private static DeclarativeSlotManagerBuilder createDeclarativeSlotManagerBuilder(ScheduledExecutor executor) {
        DeclarativeSlotManagerBuilder builder = DeclarativeSlotManagerBuilder.newBuilder(executor);
        return builder.setDefaultWorkerResourceSpec(WORKER_RESOURCE_SPEC);
    }

    /**
     * Creates requirements for a single slot on behalf of a newly created job.
     *
     * @return the resource requirements
     */
    private static ResourceRequirements createResourceRequirementsForSingleSlot() {
        JobID freshJobId = new JobID();
        return DeclarativeSlotManagerTest.createResourceRequirementsForSingleSlot(freshJobId);
    }

    /**
     * Creates requirements for a single slot on behalf of the given job.
     *
     * @param jobId job the requirements are declared for
     * @return the resource requirements
     */
    private static ResourceRequirements createResourceRequirementsForSingleSlot(JobID jobId) {
        int singleSlot = 1;
        return DeclarativeSlotManagerTest.createResourceRequirements(jobId, singleSlot);
    }

    /**
     * Creates requirements for {@code numRequiredSlots} slots of profile
     * {@code ResourceProfile.UNKNOWN} for the given job, using the fixed target address
     * {@code "foobar"}.
     *
     * @param jobId job the requirements are declared for
     * @param numRequiredSlots number of slots the job requires
     * @return the resource requirements
     */
    private static ResourceRequirements createResourceRequirements(JobID jobId, int numRequiredSlots) {
        ResourceRequirement requirement = ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)numRequiredSlots);
        return ResourceRequirements.create((JobID)jobId, (String)"foobar", Collections.singleton(requirement));
    }

    /**
     * Sums the number of required slots over all given requirements.
     *
     * @param resources requirements to sum up; may be {@code null}
     * @return the total number of required slots, or {@code 0} if {@code resources} is
     *     {@code null} or empty
     */
    private static int getTotalResourceCount(Collection<ResourceRequirement> resources) {
        int totalSlots = 0;
        if (resources != null) {
            for (ResourceRequirement requirement : resources) {
                totalSlots += requirement.getNumberOfRequiredSlots();
            }
        }
        return totalSlots;
    }

    private static enum SecondRequirementDeclarationTime {
        BEFORE_FREE,
        AFTER_FREE;

    }

    private static enum RequirementDeclarationScenario {
        TASK_EXECUTOR_REGISTRATION_BEFORE_REQUIREMENT_DECLARATION,
        TASK_EXECUTOR_REGISTRATION_AFTER_REQUIREMENT_DECLARATION;

    }
}

