/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class SlotManagerTest
extends TestLogger {
    @Test
    public void testTaskManagerRegistration() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        ResourceID resourceId = ResourceID.generate();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerTaskManager(taskManagerConnection, slotReport);
            Assert.assertTrue((String)"The number registered slots does not equal the expected number.", (2 == slotManager.getNumberRegisteredSlots() ? 1 : 0) != 0);
            Assert.assertNotNull((Object)slotManager.getSlot(slotId1));
            Assert.assertNotNull((Object)slotManager.getSlot(slotId2));
        }
    }

    @Test
    public void testTaskManagerUnregistration() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        JobID jobId = new JobID();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.any(SlotID.class), (JobID)Matchers.any(JobID.class), (AllocationID)Matchers.any(AllocationID.class), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class))).thenReturn((Object)new FlinkCompletableFuture());
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        ResourceID resourceId = ResourceID.generate();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        AllocationID allocationId1 = new AllocationID();
        AllocationID allocationId2 = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile, jobId, allocationId1);
        SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        SlotRequest slotRequest = new SlotRequest(new JobID(), allocationId2, resourceProfile, "foobar");
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerTaskManager(taskManagerConnection, slotReport);
            Assert.assertTrue((String)"The number registered slots does not equal the expected number.", (2 == slotManager.getNumberRegisteredSlots() ? 1 : 0) != 0);
            TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
            TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
            Assert.assertTrue((boolean)slot1.isAllocated());
            Assert.assertTrue((boolean)slot2.isFree());
            Assert.assertTrue((boolean)slotManager.registerSlotRequest(slotRequest));
            Assert.assertFalse((boolean)slot2.isFree());
            Assert.assertTrue((boolean)slot2.hasPendingSlotRequest());
            PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId2);
            Assert.assertTrue((String)"The pending slot request should have been assigned to slot 2", (boolean)pendingSlotRequest.isAssigned());
            slotManager.unregisterTaskManager(taskManagerConnection.getInstanceID());
            Assert.assertTrue((0 == slotManager.getNumberRegisteredSlots() ? 1 : 0) != 0);
            Assert.assertFalse((boolean)pendingSlotRequest.isAssigned());
        }
    }

    @Test
    public void testSlotRequestWithoutFreeSlots() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), resourceProfile, "localhost");
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerSlotRequest(slotRequest);
            ((ResourceManagerActions)Mockito.verify((Object)resourceManagerActions)).allocateResource((ResourceProfile)Matchers.eq((Object)resourceProfile));
        }
    }

    @Test
    public void testSlotRequestWithResourceAllocationFailure() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), resourceProfile, "localhost");
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        ((ResourceManagerActions)Mockito.doThrow((Throwable)new ResourceManagerException("Test exception")).when((Object)resourceManagerActions)).allocateResource((ResourceProfile)Matchers.any(ResourceProfile.class));
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerSlotRequest(slotRequest);
            Assert.fail((String)"The slot request should have failed with a ResourceManagerException.");
        }
        catch (ResourceManagerException resourceManagerException) {
            // empty catch block
        }
    }

    @Test
    public void testSlotRequestWithFreeSlot() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceID resourceID = ResourceID.generate();
        JobID jobId = new JobID();
        SlotID slotId = new SlotID(resourceID, 0);
        String targetAddress = "localhost";
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "localhost");
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
            Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.eq((Object)slotId), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)Acknowledge.get()));
            TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
            SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
            SlotReport slotReport = new SlotReport(slotStatus);
            slotManager.registerTaskManager(taskExecutorConnection, slotReport);
            Assert.assertTrue((String)"The slot request should be accepted", (boolean)slotManager.registerSlotRequest(slotRequest));
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway)).requestSlot((SlotID)Matchers.eq((Object)slotId), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), (String)Matchers.eq((Object)"localhost"), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class));
            TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals((String)"The slot has not been allocated to the expected allocation id.", (Object)allocationId, (Object)slot.getAllocationId());
        }
    }

    @Test
    public void testUnregisterPendingSlotRequest() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        SlotID slotId = new SlotID(ResourceID.generate(), 0);
        AllocationID allocationId = new AllocationID();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.any(SlotID.class), (JobID)Matchers.any(JobID.class), (AllocationID)Matchers.any(AllocationID.class), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class))).thenReturn((Object)new FlinkCompletableFuture());
        ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
        SlotReport slotReport = new SlotReport(slotStatus);
        SlotRequest slotRequest = new SlotRequest(new JobID(), allocationId, resourceProfile, "foobar");
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerTaskManager(taskManagerConnection, slotReport);
            TaskManagerSlot slot = slotManager.getSlot(slotId);
            slotManager.registerSlotRequest(slotRequest);
            Assert.assertNotNull((Object)slotManager.getSlotRequest(allocationId));
            Assert.assertTrue((boolean)slot.hasPendingSlotRequest());
            slotManager.unregisterSlotRequest(allocationId);
            Assert.assertNull((Object)slotManager.getSlotRequest(allocationId));
            slot = slotManager.getSlot(slotId);
            Assert.assertTrue((boolean)slot.isFree());
        }
    }

    @Test
    public void testFulfillingPendingSlotRequest() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceID resourceID = ResourceID.generate();
        JobID jobId = new JobID();
        SlotID slotId = new SlotID(resourceID, 0);
        String targetAddress = "localhost";
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "localhost");
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.eq((Object)slotId), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)Acknowledge.get()));
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
        SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
        SlotReport slotReport = new SlotReport(slotStatus);
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            Assert.assertTrue((String)"The slot request should be accepted", (boolean)slotManager.registerSlotRequest(slotRequest));
            ((ResourceManagerActions)Mockito.verify((Object)resourceManagerActions, (VerificationMode)Mockito.times((int)1))).allocateResource((ResourceProfile)Matchers.eq((Object)resourceProfile));
            slotManager.registerTaskManager(taskExecutorConnection, slotReport);
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway)).requestSlot((SlotID)Matchers.eq((Object)slotId), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), (String)Matchers.eq((Object)"localhost"), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class));
            TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals((String)"The slot has not been allocated to the expected allocation id.", (Object)allocationId, (Object)slot.getAllocationId());
        }
    }

    @Test
    public void testFreeSlot() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceID resourceID = ResourceID.generate();
        JobID jobId = new JobID();
        SlotID slotId = new SlotID(resourceID, 0);
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
        SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
        SlotReport slotReport = new SlotReport(slotStatus);
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerTaskManager(taskExecutorConnection, slotReport);
            TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals((String)"The slot has not been allocated to the expected allocation id.", (Object)allocationId, (Object)slot.getAllocationId());
            slotManager.freeSlot(slotId, new AllocationID());
            Assert.assertTrue((boolean)slot.isAllocated());
            Assert.assertEquals((String)"The slot has not been allocated to the expected allocation id.", (Object)allocationId, (Object)slot.getAllocationId());
            slotManager.freeSlot(slotId, allocationId);
            Assert.assertTrue((boolean)slot.isFree());
            Assert.assertNull((Object)slot.getAllocationId());
        }
    }

    @Test
    public void testDuplicatePendingSlotRequest() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
        ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
        SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
        SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            Assert.assertTrue((boolean)slotManager.registerSlotRequest(slotRequest1));
            Assert.assertFalse((boolean)slotManager.registerSlotRequest(slotRequest2));
        }
        ((ResourceManagerActions)Mockito.verify((Object)resourceManagerActions, (VerificationMode)Mockito.times((int)1))).allocateResource((ResourceProfile)Matchers.any(ResourceProfile.class));
    }

    @Test
    public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        SlotID slotId = new SlotID(ResourceID.generate(), 0);
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
        SlotReport slotReport = new SlotReport(slotStatus);
        SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerTaskManager(taskManagerConnection, slotReport);
            Assert.assertFalse((boolean)slotManager.registerSlotRequest(slotRequest));
        }
    }

    @Test
    public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
        ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
        SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
        SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.any(SlotID.class), (JobID)Matchers.any(JobID.class), (AllocationID)Matchers.any(AllocationID.class), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)Acknowledge.get()));
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        SlotID slotId = new SlotID(ResourceID.generate(), 0);
        SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1);
        SlotReport slotReport = new SlotReport(slotStatus);
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerTaskManager(taskManagerConnection, slotReport);
            Assert.assertTrue((boolean)slotManager.registerSlotRequest(slotRequest1));
            TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals((String)"The slot has not been allocated to the expected allocation id.", (Object)allocationId, (Object)slot.getAllocationId());
            Assert.assertFalse((boolean)slotManager.registerSlotRequest(slotRequest2));
        }
        ((ResourceManagerActions)Mockito.verify((Object)resourceManagerActions, (VerificationMode)Mockito.never())).allocateResource((ResourceProfile)Matchers.any(ResourceProfile.class));
    }

    @Test
    public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
        ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
        SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
        SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.any(SlotID.class), (JobID)Matchers.any(JobID.class), (AllocationID)Matchers.any(AllocationID.class), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)Acknowledge.get()));
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        SlotID slotId = new SlotID(ResourceID.generate(), 0);
        SlotStatus slotStatus = new SlotStatus(slotId, new ResourceProfile(2.0, 2));
        SlotReport slotReport = new SlotReport(slotStatus);
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerTaskManager(taskManagerConnection, slotReport);
            Assert.assertTrue((boolean)slotManager.registerSlotRequest(slotRequest1));
            TaskManagerSlot slot = slotManager.getSlot(slotId);
            Assert.assertEquals((String)"The slot has not been allocated to the expected allocation id.", (Object)allocationId, (Object)slot.getAllocationId());
            slotManager.freeSlot(slotId, allocationId);
            Assert.assertTrue((boolean)slot.isFree());
            Assert.assertNull((Object)slot.getAllocationId());
            Assert.assertTrue((boolean)slotManager.registerSlotRequest(slotRequest2));
            Assert.assertEquals((String)"The slot has not been allocated to the expected allocation id.", (Object)allocationId, (Object)slot.getAllocationId());
        }
        ((ResourceManagerActions)Mockito.verify((Object)resourceManagerActions, (VerificationMode)Mockito.never())).allocateResource((ResourceProfile)Matchers.any(ResourceProfile.class));
    }

    @Test
    public void testReceivingUnknownSlotReport() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        InstanceID unknownInstanceID = new InstanceID();
        SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0);
        ResourceProfile unknownResourceProfile = new ResourceProfile(1.0, 1);
        SlotStatus unknownSlotStatus = new SlotStatus(unknownSlotId, unknownResourceProfile);
        SlotReport unknownSlotReport = new SlotReport(unknownSlotStatus);
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            Assert.assertTrue((0 == slotManager.getNumberRegisteredSlots() ? 1 : 0) != 0);
            Assert.assertFalse((boolean)slotManager.reportSlotStatus(unknownInstanceID, unknownSlotReport));
            Assert.assertTrue((0 == slotManager.getNumberRegisteredSlots() ? 1 : 0) != 0);
        }
    }

    @Test
    public void testUpdateSlotReport() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        ResourceID resourceId = ResourceID.generate();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        SlotStatus newSlotStatus2 = new SlotStatus(slotId2, resourceProfile, jobId, allocationId);
        SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            Assert.assertTrue((0 == slotManager.getNumberRegisteredSlots() ? 1 : 0) != 0);
            slotManager.registerTaskManager(taskManagerConnection, slotReport1);
            TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
            TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
            Assert.assertTrue((2 == slotManager.getNumberRegisteredSlots() ? 1 : 0) != 0);
            Assert.assertTrue((boolean)slot1.isFree());
            Assert.assertTrue((boolean)slot2.isFree());
            Assert.assertTrue((boolean)slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), slotReport2));
            Assert.assertTrue((2 == slotManager.getNumberRegisteredSlots() ? 1 : 0) != 0);
            Assert.assertNotNull((Object)slotManager.getSlot(slotId1));
            Assert.assertNotNull((Object)slotManager.getSlot(slotId2));
            Assert.assertEquals((Object)allocationId, (Object)slotManager.getSlot(slotId2).getAllocationId());
        }
    }

    @Test
    public void testTaskManagerTimeout() throws Exception {
        long tmTimeout = 500L;
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        UUID leaderId = UUID.randomUUID();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        SlotID slotId = new SlotID(ResourceID.generate(), 0);
        ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
        final SlotReport slotReport = new SlotReport(slotStatus);
        ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();
        try (final SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), Time.milliseconds((long)500L));){
            slotManager.start(leaderId, (Executor)mainThreadExecutor, resourceManagerActions);
            mainThreadExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    slotManager.registerTaskManager(taskManagerConnection, slotReport);
                }
            });
            ((ResourceManagerActions)Mockito.verify((Object)resourceManagerActions, (VerificationMode)Mockito.timeout((long)50000L).times(1))).releaseResource((InstanceID)Matchers.eq((Object)taskManagerConnection.getInstanceID()));
        }
    }

    @Test
    public void testSlotRequestTimeout() throws Exception {
        long allocationTimeout = 50L;
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        UUID leaderId = UUID.randomUUID();
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();
        try (final SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), Time.milliseconds((long)50L), TestingUtils.infiniteTime());){
            slotManager.start(leaderId, (Executor)mainThreadExecutor, resourceManagerActions);
            final AtomicReference<Object> atomicException = new AtomicReference<Object>(null);
            mainThreadExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        Assert.assertTrue((boolean)slotManager.registerSlotRequest(slotRequest));
                    }
                    catch (Exception e) {
                        atomicException.compareAndSet(null, e);
                    }
                }
            });
            ((ResourceManagerActions)Mockito.verify((Object)resourceManagerActions, (VerificationMode)Mockito.timeout((long)5000L).times(1))).notifyAllocationFailure((JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), (Exception)Matchers.any(TimeoutException.class));
            if (atomicException.get() != null) {
                throw (Exception)atomicException.get();
            }
        }
    }

    @Test
    public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        FlinkCompletableFuture slotRequestFuture1 = new FlinkCompletableFuture();
        FlinkCompletableFuture slotRequestFuture2 = new FlinkCompletableFuture();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.any(SlotID.class), (JobID)Matchers.any(JobID.class), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.any(UUID.class), (Time)Matchers.any(Time.class))).thenReturn((Object)slotRequestFuture1, (Object[])new Future[]{slotRequestFuture2});
        TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        ResourceID resourceId = ResourceID.generate();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        try (SlotManager slotManager = this.createSlotManager(leaderId, resourceManagerActions);){
            slotManager.registerTaskManager(taskManagerConnection, slotReport);
            slotManager.registerSlotRequest(slotRequest);
            ArgumentCaptor slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway, (VerificationMode)Mockito.times((int)1))).requestSlot((SlotID)slotIdCaptor.capture(), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class));
            TaskManagerSlot failedSlot = slotManager.getSlot((SlotID)slotIdCaptor.getValue());
            slotRequestFuture1.completeExceptionally((Throwable)new SlotAllocationException("Test exception."));
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway, (VerificationMode)Mockito.times((int)2))).requestSlot((SlotID)slotIdCaptor.capture(), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class));
            slotRequestFuture2.complete((Object)Acknowledge.get());
            TaskManagerSlot slot = slotManager.getSlot((SlotID)slotIdCaptor.getValue());
            Assert.assertTrue((boolean)slot.isAllocated());
            Assert.assertEquals((Object)allocationId, (Object)slot.getAllocationId());
            if (!failedSlot.getSlotId().equals((Object)slot.getSlotId())) {
                Assert.assertTrue((boolean)failedSlot.isFree());
            }
        }
    }

    @Test
    public void testSlotReportWhileActiveSlotRequest() throws Exception {
        long verifyTimeout = 1000L;
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
        final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        FlinkCompletableFuture slotRequestFuture1 = new FlinkCompletableFuture();
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.any(SlotID.class), (JobID)Matchers.any(JobID.class), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.any(UUID.class), (Time)Matchers.any(Time.class))).thenReturn((Object)slotRequestFuture1, (Object[])new Future[]{FlinkCompletableFuture.completed((Object)Acknowledge.get())});
        final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        ResourceID resourceId = ResourceID.generate();
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();
        try (final SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());){
            slotManager.start(leaderId, (Executor)mainThreadExecutor, resourceManagerActions);
            Future registrationFuture = FlinkFuture.supplyAsync((Callable)new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    slotManager.registerTaskManager(taskManagerConnection, slotReport);
                    return null;
                }
            }, (Executor)mainThreadExecutor).thenAccept((AcceptFunction)new AcceptFunction<Void>(){

                public void accept(Void value) {
                    try {
                        slotManager.registerSlotRequest(slotRequest);
                    }
                    catch (SlotManagerException e) {
                        throw new RuntimeException("Could not register slots.", e);
                    }
                }
            });
            registrationFuture.get();
            ArgumentCaptor slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway, (VerificationMode)Mockito.times((int)1))).requestSlot((SlotID)slotIdCaptor.capture(), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class));
            SlotID requestedSlotId = (SlotID)slotIdCaptor.getValue();
            final SlotID freeSlotId = requestedSlotId.equals((Object)slotId1) ? slotId2 : slotId1;
            Future freeSlotFuture = FlinkFuture.supplyAsync((Callable)new Callable<Boolean>(){

                @Override
                public Boolean call() throws Exception {
                    return slotManager.getSlot(freeSlotId).isFree();
                }
            }, (Executor)mainThreadExecutor);
            Assert.assertTrue((boolean)((Boolean)freeSlotFuture.get()));
            SlotStatus newSlotStatus1 = new SlotStatus((SlotID)slotIdCaptor.getValue(), resourceProfile, new JobID(), new AllocationID());
            SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
            final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
            FlinkFuture.supplyAsync((Callable)new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
                    return null;
                }
            }, (Executor)mainThreadExecutor);
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway, (VerificationMode)Mockito.timeout((long)1000L).times(2))).requestSlot((SlotID)slotIdCaptor.capture(), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class));
            final SlotID requestedSlotId2 = (SlotID)slotIdCaptor.getValue();
            Assert.assertEquals((Object)slotId2, (Object)requestedSlotId2);
            Future requestedSlotFuture = FlinkFuture.supplyAsync((Callable)new Callable<TaskManagerSlot>(){

                @Override
                public TaskManagerSlot call() throws Exception {
                    return slotManager.getSlot(requestedSlotId2);
                }
            }, (Executor)mainThreadExecutor);
            TaskManagerSlot slot = (TaskManagerSlot)requestedSlotFuture.get();
            Assert.assertTrue((boolean)slot.isAllocated());
            Assert.assertEquals((Object)allocationId, (Object)slot.getAllocationId());
        }
    }

    @Test
    public void testTimeoutForUnusedTaskManager() throws Exception {
        long taskManagerTimeout = 50L;
        long verifyTimeout = 500L;
        UUID leaderId = UUID.randomUUID();
        ResourceManagerActions resourceManagerActions = (ResourceManagerActions)Mockito.mock(ResourceManagerActions.class);
        ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
        ResourceID resourceId = ResourceID.generate();
        JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
        final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
        TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)Mockito.mock(TaskExecutorGateway.class);
        Mockito.when((Object)taskExecutorGateway.requestSlot((SlotID)Matchers.any(SlotID.class), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)Acknowledge.get()));
        final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
        SlotID slotId1 = new SlotID(resourceId, 0);
        SlotID slotId2 = new SlotID(resourceId, 1);
        SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
        SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
        final SlotReport initialSlotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
        ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();
        try (final SlotManager slotManager = new SlotManager(scheduledExecutor, TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), Time.of((long)50L, (TimeUnit)TimeUnit.MILLISECONDS));){
            slotManager.start(leaderId, (Executor)mainThreadExecutor, resourceManagerActions);
            FlinkFuture.supplyAsync((Callable)new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    slotManager.registerSlotRequest(slotRequest);
                    return null;
                }
            }, (Executor)mainThreadExecutor).thenAccept((AcceptFunction)new AcceptFunction<Void>(){

                public void accept(Void value) {
                    slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
                }
            });
            ArgumentCaptor slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
            ((TaskExecutorGateway)Mockito.verify((Object)taskExecutorGateway, (VerificationMode)Mockito.timeout((long)500L))).requestSlot((SlotID)slotIdArgumentCaptor.capture(), (JobID)Matchers.eq((Object)jobId), (AllocationID)Matchers.eq((Object)allocationId), Matchers.anyString(), (UUID)Matchers.eq((Object)leaderId), (Time)Matchers.any(Time.class));
            Future idleFuture = FlinkFuture.supplyAsync((Callable)new Callable<Boolean>(){

                @Override
                public Boolean call() throws Exception {
                    return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
                }
            }, (Executor)mainThreadExecutor);
            Assert.assertFalse((boolean)((Boolean)idleFuture.get()));
            final SlotID slotId = (SlotID)slotIdArgumentCaptor.getValue();
            Future slotFuture = FlinkFuture.supplyAsync((Callable)new Callable<TaskManagerSlot>(){

                @Override
                public TaskManagerSlot call() throws Exception {
                    return slotManager.getSlot(slotId);
                }
            }, (Executor)mainThreadExecutor);
            TaskManagerSlot slot = (TaskManagerSlot)slotFuture.get();
            Assert.assertTrue((boolean)slot.isAllocated());
            Assert.assertEquals((Object)allocationId, (Object)slot.getAllocationId());
            Future idleFuture2 = FlinkFuture.supplyAsync((Callable)new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    slotManager.freeSlot(slotId, allocationId);
                    return null;
                }
            }, (Executor)mainThreadExecutor).thenApply((ApplyFunction)new ApplyFunction<Void, Boolean>(){

                public Boolean apply(Void value) {
                    return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
                }
            });
            Assert.assertTrue((boolean)((Boolean)idleFuture2.get()));
            ((ResourceManagerActions)Mockito.verify((Object)resourceManagerActions, (VerificationMode)Mockito.timeout((long)500L).times(1))).releaseResource((InstanceID)Matchers.eq((Object)taskManagerConnection.getInstanceID()));
        }
    }

    private SlotManager createSlotManager(UUID leaderId, ResourceManagerActions resourceManagerActions) {
        SlotManager slotManager = new SlotManager(TestingUtils.defaultScheduledExecutor(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime());
        slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions);
        return slotManager;
    }
}

