/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.instance;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.AvailableSlotsTest;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link SlotPool}.
 *
 * <p>Every test runs against a pool that is backed by a {@link TestingSerialRpcService} and is
 * connected to a mocked {@link ResourceManagerGateway}. The slot requests the pool sends to that
 * gateway are captured so that the tests can offer slots carrying the matching allocation IDs.
 */
public class SlotPoolTest extends TestLogger {
    private RpcService rpcService;
    private JobID jobId;
    private MainThreadValidatorUtil mainThreadValidatorUtil;
    private SlotPool slotPool;
    private ResourceManagerGateway resourceManagerGateway;

    /**
     * Creates and starts a fresh slot pool and connects it to a mocked resource manager whose
     * {@code requestSlot} call returns a mocked future.
     */
    @Before
    public void setUp() throws Exception {
        this.rpcService = new TestingSerialRpcService();
        this.jobId = new JobID();
        this.slotPool = new SlotPool(this.rpcService, this.jobId);

        // The validator is entered here and exited again in tearDown(), so it stays active for
        // the whole test. NOTE(review): presumably this lets the test thread pass the pool's
        // main-thread checks - confirm against MainThreadValidatorUtil.
        this.mainThreadValidatorUtil = new MainThreadValidatorUtil(this.slotPool);
        this.mainThreadValidatorUtil.enterMainThread();

        final String jobManagerAddress = "foobar";
        this.slotPool.start(UUID.randomUUID(), jobManagerAddress);

        this.resourceManagerGateway = Mockito.mock(ResourceManagerGateway.class);
        // A raw Future mock is sufficient: the tests only inspect the request that was sent.
        Mockito.when(this.resourceManagerGateway.requestSlot(
                Matchers.any(UUID.class),
                Matchers.any(UUID.class),
                Matchers.any(SlotRequest.class),
                Matchers.any(Time.class)))
            .thenReturn(Mockito.mock(Future.class, Mockito.RETURNS_MOCKS));

        this.slotPool.connectToResourceManager(UUID.randomUUID(), this.resourceManagerGateway);
    }

    /** Leaves the main-thread scope that was entered in {@link #setUp()}. */
    @After
    public void tearDown() throws Exception {
        this.mainThreadValidatorUtil.exitMainThread();
    }

    /**
     * A pending allocation is completed with a live slot once a slot carrying the requested
     * allocation ID is offered by a registered task manager.
     */
    @Test
    public void testAllocateSimpleSlot() throws Exception {
        ResourceID resourceID = new ResourceID("resource");
        this.slotPool.registerTaskManager(resourceID);

        ScheduledUnit task = Mockito.mock(ScheduledUnit.class);
        Future<SimpleSlot> future = this.slotPool.allocateSlot(task, AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null);
        Assert.assertFalse(future.isDone());

        SlotRequest slotRequest = this.captureSlotRequests(1).get(0);

        AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(
            resourceID, slotRequest.getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
        Assert.assertTrue(this.slotPool.offerSlot(allocatedSlot));

        SimpleSlot slot = future.get(1L, TimeUnit.SECONDS);
        Assert.assertTrue(future.isDone());
        Assert.assertTrue(slot.isAlive());
        Assert.assertEquals(resourceID, slot.getTaskManagerID());
        Assert.assertEquals(this.jobId, slot.getJobID());
        Assert.assertEquals(this.slotPool.getSlotOwner(), slot.getOwner());
        Assert.assertEquals(this.slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot);
    }

    /**
     * With two pending allocations and only one offered slot, the second allocation is completed
     * as soon as the slot handed out for the first one is released.
     */
    @Test
    public void testAllocationFulfilledByReturnedSlot() throws Exception {
        ResourceID resourceID = new ResourceID("resource");
        this.slotPool.registerTaskManager(resourceID);

        Future<SimpleSlot> future1 = this.slotPool.allocateSlot(
            Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null);
        Future<SimpleSlot> future2 = this.slotPool.allocateSlot(
            Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null);
        Assert.assertFalse(future1.isDone());
        Assert.assertFalse(future2.isDone());

        List<SlotRequest> slotRequests = this.captureSlotRequests(2);

        // Only the first request is answered with a slot.
        AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(
            resourceID, slotRequests.get(0).getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
        Assert.assertTrue(this.slotPool.offerSlot(allocatedSlot));

        SimpleSlot slot1 = future1.get(1L, TimeUnit.SECONDS);
        Assert.assertTrue(future1.isDone());
        Assert.assertFalse(future2.isDone());

        // Returning the first slot must complete the still-pending second allocation.
        slot1.releaseSlot();

        SimpleSlot slot2 = future2.get(1L, TimeUnit.SECONDS);
        Assert.assertTrue(future2.isDone());
        Assert.assertNotEquals(slot1, slot2);
        Assert.assertTrue(slot1.isReleased());
        Assert.assertTrue(slot2.isAlive());
        Assert.assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
        Assert.assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
        Assert.assertEquals(this.slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2);
    }

    /**
     * An allocation issued after a slot has been released is completed with a new live slot on
     * the same task manager and slot number.
     */
    @Test
    public void testAllocateWithFreeSlot() throws Exception {
        ResourceID resourceID = new ResourceID("resource");
        this.slotPool.registerTaskManager(resourceID);

        Future<SimpleSlot> future1 = this.slotPool.allocateSlot(
            Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null);
        Assert.assertFalse(future1.isDone());

        SlotRequest slotRequest = this.captureSlotRequests(1).get(0);

        AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(
            resourceID, slotRequest.getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
        Assert.assertTrue(this.slotPool.offerSlot(allocatedSlot));

        SimpleSlot slot1 = future1.get(1L, TimeUnit.SECONDS);
        Assert.assertTrue(future1.isDone());

        // Return the slot before the second allocation is requested.
        slot1.releaseSlot();

        Future<SimpleSlot> future2 = this.slotPool.allocateSlot(
            Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null);

        SimpleSlot slot2 = future2.get(1L, TimeUnit.SECONDS);
        Assert.assertTrue(future2.isDone());
        Assert.assertNotEquals(slot1, slot2);
        Assert.assertTrue(slot1.isReleased());
        Assert.assertTrue(slot2.isAlive());
        Assert.assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
        Assert.assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
    }

    /**
     * Checks which slot offers the pool accepts: an offer from an unregistered task manager is
     * rejected, while an unrequested slot, the requested slot, and repeated offers of the
     * requested slot (both while it is in use and after it was released) are accepted.
     */
    @Test
    public void testOfferSlot() throws Exception {
        ResourceID resourceID = new ResourceID("resource");
        this.slotPool.registerTaskManager(resourceID);

        Future<SimpleSlot> future = this.slotPool.allocateSlot(
            Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null);
        Assert.assertFalse(future.isDone());

        SlotRequest slotRequest = this.captureSlotRequests(1).get(0);

        // Slot from a task manager that was never registered: rejected.
        AllocatedSlot invalid = SlotPoolTest.createAllocatedSlot(
            new ResourceID("unregistered"), slotRequest.getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
        Assert.assertFalse(this.slotPool.offerSlot(invalid));

        // Slot with an allocation ID the pool never asked for: accepted.
        AllocatedSlot notRequested = SlotPoolTest.createAllocatedSlot(
            resourceID, new AllocationID(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
        Assert.assertTrue(this.slotPool.offerSlot(notRequested));

        // Slot matching the pending request: accepted and completes the allocation.
        AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(
            resourceID, slotRequest.getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
        Assert.assertTrue(this.slotPool.offerSlot(allocatedSlot));

        SimpleSlot slot = future.get(1L, TimeUnit.SECONDS);
        Assert.assertTrue(future.isDone());
        Assert.assertTrue(slot.isAlive());

        // Offering the same slot again while it is in use: accepted, slot stays alive.
        Assert.assertTrue(this.slotPool.offerSlot(allocatedSlot));
        Assert.assertTrue(future.isDone());
        Assert.assertTrue(slot.isAlive());

        // Offering the same slot again after it was released: still accepted.
        slot.releaseSlot();
        Assert.assertTrue(slot.isReleased());
        Assert.assertTrue(this.slotPool.offerSlot(allocatedSlot));
    }

    /**
     * Releasing a task manager releases the slot that was handed out from it and does not
     * complete another allocation that is still pending.
     */
    @Test
    public void testReleaseResource() throws Exception {
        ResourceID resourceID = new ResourceID("resource");
        this.slotPool.registerTaskManager(resourceID);

        Future<SimpleSlot> future1 = this.slotPool.allocateSlot(
            Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null);

        // Capture before the second allocation so that exactly one request has been sent.
        SlotRequest slotRequest = this.captureSlotRequests(1).get(0);

        Future<SimpleSlot> future2 = this.slotPool.allocateSlot(
            Mockito.mock(ScheduledUnit.class), AvailableSlotsTest.DEFAULT_TESTING_PROFILE, null);

        AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(
            resourceID, slotRequest.getAllocationId(), this.jobId, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
        Assert.assertTrue(this.slotPool.offerSlot(allocatedSlot));

        SimpleSlot slot1 = future1.get(1L, TimeUnit.SECONDS);
        Assert.assertTrue(future1.isDone());
        Assert.assertFalse(future2.isDone());

        this.slotPool.releaseTaskManager(resourceID);
        Assert.assertTrue(slot1.isReleased());

        // Brief pause before re-checking that the pending allocation was not completed.
        Thread.sleep(10L);
        Assert.assertFalse(future2.isDone());
    }

    /**
     * Verifies that the mocked resource manager received exactly {@code expectedRequests}
     * {@code requestSlot} calls and returns the captured requests in invocation order.
     *
     * @param expectedRequests number of {@code requestSlot} invocations expected so far
     * @return the {@link SlotRequest} arguments of those invocations
     * @throws Exception declared so the helper compiles whether or not {@code requestSlot}
     *     declares checked exceptions
     */
    private List<SlotRequest> captureSlotRequests(int expectedRequests) throws Exception {
        ArgumentCaptor<SlotRequest> slotRequestCaptor = ArgumentCaptor.forClass(SlotRequest.class);
        Mockito.verify(this.resourceManagerGateway, Mockito.times(expectedRequests)).requestSlot(
            Matchers.any(UUID.class),
            Matchers.any(UUID.class),
            slotRequestCaptor.capture(),
            Matchers.any(Time.class));
        return slotRequestCaptor.getAllValues();
    }

    /**
     * Builds an {@link AllocatedSlot} with slot number 0 that is backed by a mocked task manager
     * location reporting {@code resourceId} and by a mocked task manager gateway.
     *
     * @param resourceId resource ID the mocked task manager location returns
     * @param allocationId allocation ID of the slot
     * @param jobId job the slot is allocated for
     * @param resourceProfile resource profile of the slot
     * @return the newly created slot
     */
    static AllocatedSlot createAllocatedSlot(ResourceID resourceId, AllocationID allocationId, JobID jobId, ResourceProfile resourceProfile) {
        TaskManagerLocation mockTaskManagerLocation = Mockito.mock(TaskManagerLocation.class);
        Mockito.when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
        TaskManagerGateway mockTaskManagerGateway = Mockito.mock(TaskManagerGateway.class);
        return new AllocatedSlot(allocationId, jobId, mockTaskManagerLocation, 0, resourceProfile, mockTaskManagerGateway);
    }
}

