package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncerTest.class */
public class DefaultSlotStatusSyncerTest extends TestLogger {
    private static final Time TASK_MANAGER_REQUEST_TIMEOUT = Time.seconds(10);
    private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION = new TaskExecutorConnection(ResourceID.generate(), new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    @Test
    public void testAllocateSlot() throws Exception {
        FineGrainedTaskManagerTracker fineGrainedTaskManagerTracker = new FineGrainedTaskManagerTracker();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(ResourceID.generate(), new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            completableFuture.complete(tuple6);
            return completableFuture2;
        }).createTestingTaskExecutorGateway());
        fineGrainedTaskManagerTracker.addTaskManager(taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
        DefaultResourceTracker defaultResourceTracker = new DefaultResourceTracker();
        JobID jobID = new JobID();
        DefaultSlotStatusSyncer defaultSlotStatusSyncer = new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
        defaultSlotStatusSyncer.initialize(fineGrainedTaskManagerTracker, defaultResourceTracker, ResourceManagerId.generate(), EXECUTOR_RESOURCE.getExecutor());
        CompletableFuture allocateSlot = defaultSlotStatusSyncer.allocateSlot(taskExecutorConnection.getInstanceID(), jobID, "address", ResourceProfile.ANY);
        AllocationID allocationID = (AllocationID) ((Tuple6) completableFuture.get()).f2;
        Assert.assertThat(defaultResourceTracker.getAcquiredResources(jobID), Matchers.contains(new ResourceRequirement[]{ResourceRequirement.create(ResourceProfile.ANY, 1)}));
        Assert.assertTrue(fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID).isPresent());
        Assert.assertThat(((TaskManagerSlotInformation) fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID).get()).getJobId(), Is.is(jobID));
        Assert.assertThat(((TaskManagerSlotInformation) fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID).get()).getState(), Is.is(SlotState.PENDING));
        completableFuture2.complete(Acknowledge.get());
        Assert.assertFalse(allocateSlot.isCompletedExceptionally());
    }

    @Test
    public void testAllocateSlotFailsWithException() {
        FineGrainedTaskManagerTracker fineGrainedTaskManagerTracker = new FineGrainedTaskManagerTracker();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(ResourceID.generate(), new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            return FutureUtils.completedExceptionally(new TimeoutException("timeout"));
        }).createTestingTaskExecutorGateway());
        fineGrainedTaskManagerTracker.addTaskManager(taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
        DefaultResourceTracker defaultResourceTracker = new DefaultResourceTracker();
        JobID jobID = new JobID();
        DefaultSlotStatusSyncer defaultSlotStatusSyncer = new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
        defaultSlotStatusSyncer.initialize(fineGrainedTaskManagerTracker, defaultResourceTracker, ResourceManagerId.generate(), EXECUTOR_RESOURCE.getExecutor());
        try {
            defaultSlotStatusSyncer.allocateSlot(taskExecutorConnection.getInstanceID(), jobID, "address", ResourceProfile.ANY).get();
        } catch (Exception e) {
            Assert.assertThat(e.getCause(), Matchers.instanceOf(TimeoutException.class));
        }
        Assert.assertThat(defaultResourceTracker.getAcquiredResources(jobID), Is.is(Matchers.empty()));
        Assert.assertThat(((TaskManagerInfo) fineGrainedTaskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAllocatedSlots().keySet(), Is.is(Matchers.empty()));
    }

    @Test
    public void testFreeSlot() {
        FineGrainedTaskManagerTracker fineGrainedTaskManagerTracker = new FineGrainedTaskManagerTracker();
        DefaultResourceTracker defaultResourceTracker = new DefaultResourceTracker();
        JobID jobID = new JobID();
        AllocationID allocationID = new AllocationID();
        DefaultSlotStatusSyncer defaultSlotStatusSyncer = new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
        defaultSlotStatusSyncer.initialize(fineGrainedTaskManagerTracker, defaultResourceTracker, ResourceManagerId.generate(), EXECUTOR_RESOURCE.getExecutor());
        fineGrainedTaskManagerTracker.addTaskManager(TASK_EXECUTOR_CONNECTION, ResourceProfile.ANY, ResourceProfile.ANY);
        fineGrainedTaskManagerTracker.notifySlotStatus(allocationID, jobID, TASK_EXECUTOR_CONNECTION.getInstanceID(), ResourceProfile.ANY, SlotState.ALLOCATED);
        defaultResourceTracker.notifyAcquiredResource(jobID, ResourceProfile.ANY);
        defaultSlotStatusSyncer.freeSlot(allocationID);
        Assert.assertThat(defaultResourceTracker.getAcquiredResources(jobID), Is.is(Matchers.empty()));
        Assert.assertThat(((TaskManagerInfo) fineGrainedTaskManagerTracker.getRegisteredTaskManager(TASK_EXECUTOR_CONNECTION.getInstanceID()).get()).getAllocatedSlots().keySet(), Is.is(Matchers.empty()));
    }

    @Test
    public void testSlotStatusProcessing() {
        FineGrainedTaskManagerTracker fineGrainedTaskManagerTracker = new FineGrainedTaskManagerTracker();
        DefaultResourceTracker defaultResourceTracker = new DefaultResourceTracker();
        DefaultSlotStatusSyncer defaultSlotStatusSyncer = new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
        defaultSlotStatusSyncer.initialize(fineGrainedTaskManagerTracker, defaultResourceTracker, ResourceManagerId.generate(), EXECUTOR_RESOURCE.getExecutor());
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(ResourceID.generate(), new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            return new CompletableFuture();
        }).createTestingTaskExecutorGateway());
        JobID jobID = new JobID();
        AllocationID allocationID = new AllocationID();
        AllocationID allocationID2 = new AllocationID();
        SlotID slotID = new SlotID(taskExecutorConnection.getResourceID(), 0);
        SlotID slotID2 = new SlotID(taskExecutorConnection.getResourceID(), 1);
        SlotID slotID3 = new SlotID(taskExecutorConnection.getResourceID(), 2);
        ResourceProfile fromResources = ResourceProfile.fromResources(5.0d, 20);
        ResourceProfile fromResources2 = ResourceProfile.fromResources(1.0d, 4);
        SlotReport slotReport = new SlotReport(Arrays.asList(new SlotStatus(slotID, fromResources), new SlotStatus(slotID2, fromResources2, jobID, allocationID), new SlotStatus(slotID3, fromResources2, jobID, allocationID2)));
        SlotReport slotReport2 = new SlotReport(Arrays.asList(new SlotStatus(slotID3, fromResources2), new SlotStatus(slotID2, fromResources2, jobID, allocationID)));
        fineGrainedTaskManagerTracker.addTaskManager(taskExecutorConnection, fromResources, fromResources);
        defaultSlotStatusSyncer.reportSlotStatus(taskExecutorConnection.getInstanceID(), slotReport);
        Assert.assertThat(defaultResourceTracker.getAcquiredResources(jobID), Matchers.contains(new ResourceRequirement[]{ResourceRequirement.create(fromResources2, 2)}));
        Assert.assertThat(((TaskManagerInfo) fineGrainedTaskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAvailableResource(), IsEqual.equalTo(ResourceProfile.fromResources(3.0d, 12)));
        Assert.assertTrue(fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID).isPresent());
        Assert.assertTrue(fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID2).isPresent());
        defaultSlotStatusSyncer.allocateSlot(taskExecutorConnection.getInstanceID(), jobID, "address", fromResources2);
        Assert.assertThat(defaultResourceTracker.getAcquiredResources(jobID), Matchers.contains(new ResourceRequirement[]{ResourceRequirement.create(fromResources2, 3)}));
        Assert.assertThat(((TaskManagerInfo) fineGrainedTaskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAvailableResource(), IsEqual.equalTo(ResourceProfile.fromResources(2.0d, 8)));
        AllocationID allocationID3 = (AllocationID) ((TaskManagerInfo) fineGrainedTaskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAllocatedSlots().keySet().stream().filter(allocationID4 -> {
            return (allocationID4.equals(allocationID) || allocationID4.equals(allocationID2)) ? false : true;
        }).findAny().get();
        defaultSlotStatusSyncer.reportSlotStatus(taskExecutorConnection.getInstanceID(), slotReport2);
        Assert.assertThat(defaultResourceTracker.getAcquiredResources(jobID), Matchers.contains(new ResourceRequirement[]{ResourceRequirement.create(fromResources2, 2)}));
        Assert.assertThat(((TaskManagerInfo) fineGrainedTaskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAvailableResource(), IsEqual.equalTo(ResourceProfile.fromResources(3.0d, 12)));
        Assert.assertTrue(fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID).isPresent());
        Assert.assertFalse(fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID2).isPresent());
        Assert.assertTrue(fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID3).isPresent());
        Assert.assertThat(((TaskManagerSlotInformation) fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID).get()).getState(), Is.is(SlotState.ALLOCATED));
        Assert.assertThat(((TaskManagerSlotInformation) fineGrainedTaskManagerTracker.getAllocatedOrPendingSlot(allocationID3).get()).getState(), Is.is(SlotState.PENDING));
    }
}
