/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.Collections;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingPayload;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.scheduler.ExecutionSlotAssignment;
import org.apache.flink.runtime.scheduler.SimpleExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.BiConsumerWithException;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link SimpleExecutionSlotAllocator}.
 *
 * <p>Every test drives the allocator through an {@link AllocationContext}, which pairs it with a
 * {@link TestingPhysicalSlotProvider} so that the physical slot requests, responses and
 * cancellations triggered by the allocator can be inspected.
 */
class SimpleExecutionSlotAllocatorTest {
    /** Resource profile handed out by the allocator's profile retriever for every execution. */
    private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.fromResources(3.0, 5);

    /** The single execution attempt all tests allocate slots for. */
    private static final ExecutionAttemptID EXECUTION_ATTEMPT_ID =
            ExecutionGraphTestUtils.createExecutionAttemptId();

    /**
     * Allocating a slot must produce a completed logical slot future and exactly one physical
     * slot request that carries {@link #RESOURCE_PROFILE}.
     */
    @Test
    void testSlotAllocation() {
        final AllocationContext context = new AllocationContext();
        final CompletableFuture<LogicalSlot> slotFuture =
                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);

        Assertions.assertThat(slotFuture).isCompleted();
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(1);

        final PhysicalSlotRequest slotRequest =
                context.getSlotProvider().getRequests().values().iterator().next();
        Assertions.assertThat(slotRequest.getSlotProfile().getPhysicalSlotResourceProfile())
                .isEqualTo(RESOURCE_PROFILE);
    }

    /** Allocating twice for the same execution attempt must yield the very same logical slot. */
    @Test
    void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws Exception {
        final AllocationContext context = new AllocationContext();
        final CompletableFuture<LogicalSlot> slotFuture1 =
                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        final CompletableFuture<LogicalSlot> slotFuture2 =
                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);

        Assertions.assertThat(slotFuture1.get()).isSameAs(slotFuture2.get());
    }

    /**
     * Failing a pending physical slot response must fail the logical slot future, and a
     * subsequent allocation must issue a second physical slot request.
     */
    @Test
    void testFailedPhysicalSlotRequestFailsLogicalSlotFuture() {
        final AllocationContext context =
                new AllocationContext(
                        TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation(),
                        false);
        final CompletableFuture<LogicalSlot> slotFuture =
                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        final SlotRequestId slotRequestId =
                context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();

        Assertions.assertThat(slotFuture).isNotDone();
        context.getSlotProvider()
                .getResponses()
                .get(slotRequestId)
                .completeExceptionally(new Throwable());
        Assertions.assertThat(slotFuture).isCompletedExceptionally();

        // The next allocation is expected to request a new physical slot.
        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(2);
    }

    /** Runs the "occupied indefinitely" check with the flag set to {@code false}. */
    @Test
    void testSlotWillBeOccupiedIndefinitelyFalse() throws Exception {
        testSlotWillBeOccupiedIndefinitely(false);
    }

    /** Runs the "occupied indefinitely" check with the flag set to {@code true}. */
    @Test
    void testSlotWillBeOccupiedIndefinitelyTrue() throws Exception {
        testSlotWillBeOccupiedIndefinitely(true);
    }

    /**
     * Verifies that the allocator's {@code slotWillBeOccupiedIndefinitely} setting is reflected
     * both in the physical slot request and in the payload assigned to the physical slot.
     *
     * @param slotWillBeOccupiedIndefinitely flag the allocator is configured with and the value
     *     expected on the request and the payload
     * @throws Exception if retrieving the physical slot from its response future fails
     */
    private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely)
            throws Exception {
        final AllocationContext context =
                new AllocationContext(
                        TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(),
                        slotWillBeOccupiedIndefinitely);
        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);

        final PhysicalSlotRequest slotRequest = context.getSlotProvider().getFirstRequestOrFail();
        Assertions.assertThat(slotRequest.willSlotBeOccupiedIndefinitely())
                .isEqualTo(slotWillBeOccupiedIndefinitely);

        final TestingPhysicalSlot physicalSlot =
                context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get();
        Assertions.assertThat(physicalSlot.getPayload()).isNotNull();
        Assertions.assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely())
                .isEqualTo(slotWillBeOccupiedIndefinitely);
    }

    /** Releasing a completed logical slot must cancel the underlying physical slot request. */
    @Test
    void testLogicalSlotReleasingCancelsPhysicalSlotRequest() throws Exception {
        testLogicalSlotRequestCancellationOrRelease(
                true, true, (context, slotFuture) -> slotFuture.get().releaseSlot(null));
    }

    /**
     * Cancelling while the physical slot is still pending must complete the logical slot future
     * with a {@link CancellationException} and cancel the physical slot request.
     */
    @Test
    void testLogicalSlotCancellationCancelsPhysicalSlotRequest() throws Exception {
        testLogicalSlotRequestCancellationOrRelease(
                false,
                true,
                (context, slotFuture) ->
                        Assertions.assertThatThrownBy(
                                        () -> {
                                            context.getAllocator().cancel(EXECUTION_ATTEMPT_ID);
                                            slotFuture.get();
                                        })
                                .as("The logical future must finish with a cancellation exception.")
                                .isInstanceOf(CancellationException.class));
    }

    /**
     * Cancelling after the logical slot future has already completed must neither fail that
     * future nor cancel the physical slot request.
     */
    @Test
    void testCompletedLogicalSlotCancellationDoesNotCancelPhysicalSlotRequest() throws Exception {
        testLogicalSlotRequestCancellationOrRelease(
                true,
                false,
                (context, slotFuture) -> {
                    context.getAllocator().cancel(EXECUTION_ATTEMPT_ID);
                    // Must not throw: the future is expected to stay successfully completed.
                    slotFuture.get();
                });
    }

    /**
     * Shared scenario for the cancellation and release tests: allocates a slot, applies {@code
     * cancelOrReleaseAction}, then checks whether the first physical slot request was cancelled
     * and how many physical slot requests exist after allocating once more.
     *
     * @param autoCompletePhysicalSlotFuture whether the slot provider creates physical slots
     *     itself ({@code true}) or leaves the responses pending ({@code false})
     * @param expectPhysicalSlotRequestCanceled whether the first physical slot request is
     *     expected to be cancelled; if so, the re-allocation is expected to create a second
     *     request, otherwise the request count must stay at one
     * @param cancelOrReleaseAction action applied to the context and the first logical slot
     *     future
     * @throws Exception if {@code cancelOrReleaseAction} throws
     */
    private static void testLogicalSlotRequestCancellationOrRelease(
            boolean autoCompletePhysicalSlotFuture,
            boolean expectPhysicalSlotRequestCanceled,
            BiConsumerWithException<AllocationContext, CompletableFuture<LogicalSlot>, Exception>
                    cancelOrReleaseAction)
            throws Exception {
        final TestingPhysicalSlotProvider physicalSlotProvider =
                autoCompletePhysicalSlotFuture
                        ? TestingPhysicalSlotProvider.createWithInfiniteSlotCreation()
                        : TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
        final AllocationContext context = new AllocationContext(physicalSlotProvider, false);

        final CompletableFuture<LogicalSlot> slotFuture1 =
                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);

        cancelOrReleaseAction.accept(context, slotFuture1);

        final SlotRequestId slotRequestId =
                context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        Assertions.assertThat(
                        context.getSlotProvider().getCancellations().containsKey(slotRequestId))
                .isEqualTo(expectPhysicalSlotRequestCanceled);

        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        final int expectedNumberOfRequests = expectPhysicalSlotRequestCanceled ? 2 : 1;
        Assertions.assertThat(context.getSlotProvider().getRequests())
                .hasSize(expectedNumberOfRequests);
    }

    /**
     * Releasing the payload of the physical slot must bring the logical slot's payload to a
     * terminal state, cancel the physical slot request, and make the next allocation issue a new
     * physical slot request.
     */
    @Test
    void testPhysicalSlotReleasesLogicalSlots() throws Exception {
        final AllocationContext context = new AllocationContext();
        final CompletableFuture<LogicalSlot> slotFuture =
                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        final TestingPayload payload = new TestingPayload();
        slotFuture.thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload));

        final SlotRequestId slotRequestId =
                context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
        final TestingPhysicalSlot physicalSlot =
                context.getSlotProvider().getFirstResponseOrFail().get();

        Assertions.assertThat(payload.getTerminalStateFuture()).isNotDone();
        Assertions.assertThat(physicalSlot.getPayload()).isNotNull();

        physicalSlot.getPayload().release(new Throwable());

        Assertions.assertThat(payload.getTerminalStateFuture()).isDone();
        Assertions.assertThat(context.getSlotProvider().getCancellations())
                .containsKey(slotRequestId);

        // The next allocation is expected to request a new physical slot.
        context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);
        Assertions.assertThat(context.getSlotProvider().getRequests()).hasSize(2);
    }

    /**
     * With a slot provider whose physical slot creation fails, the logical slot future must
     * complete exceptionally and every issued physical slot request must have been cancelled.
     */
    @Test
    void testFailLogicalSlotIfPhysicalSlotIsFails() {
        final AllocationContext context =
                new AllocationContext(
                        TestingPhysicalSlotProvider.createWithFailingPhysicalSlotCreation(
                                new FlinkException("test failure")),
                        false);
        final CompletableFuture<LogicalSlot> slotFuture =
                context.allocateSlotsFor(EXECUTION_ATTEMPT_ID);

        Assertions.assertThat(slotFuture).isCompletedExceptionally();
        Assertions.assertThat(context.getSlotProvider().getCancellations().keySet())
                .isEqualTo(context.getSlotProvider().getRequests().keySet());
    }

    /**
     * Constructing the allocator must leave the slot provider's batch slot request timeout check
     * enabled.
     */
    @Test
    void testSlotProviderBatchSlotRequestTimeoutCheckIsEnabled() {
        final AllocationContext context = new AllocationContext();
        Assertions.assertThat(context.getSlotProvider().isBatchSlotRequestTimeoutCheckEnabled())
                .isTrue();
    }

    /**
     * Test fixture bundling a {@link SimpleExecutionSlotAllocator} with the {@link
     * TestingPhysicalSlotProvider} it was created on.
     */
    private static class AllocationContext {
        private final TestingPhysicalSlotProvider slotProvider;
        private final boolean slotWillBeOccupiedIndefinitely;
        private final SimpleExecutionSlotAllocator allocator;

        /**
         * Creates a context backed by {@link
         * TestingPhysicalSlotProvider#createWithInfiniteSlotCreation()} whose slots are not
         * occupied indefinitely.
         */
        public AllocationContext() {
            this(TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(), false);
        }

        /**
         * Creates a context whose allocator uses the given slot provider and always resolves
         * {@link #RESOURCE_PROFILE} as the resource profile of an execution.
         *
         * @param slotProvider provider the allocator requests physical slots from
         * @param slotWillBeOccupiedIndefinitely flag forwarded to the allocator
         */
        public AllocationContext(
                TestingPhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
            this.slotProvider = slotProvider;
            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
            this.allocator =
                    new SimpleExecutionSlotAllocator(
                            slotProvider,
                            executionAttemptId -> RESOURCE_PROFILE,
                            slotWillBeOccupiedIndefinitely);
        }

        /**
         * Allocates a slot for a single execution attempt.
         *
         * @param executionAttemptId execution attempt to allocate a slot for
         * @return the logical slot future of the first (and only) returned slot assignment
         */
        private CompletableFuture<LogicalSlot> allocateSlotsFor(
                ExecutionAttemptID executionAttemptId) {
            return allocator
                    .allocateSlotsFor(Collections.singletonList(executionAttemptId))
                    .get(0)
                    .getLogicalSlotFuture();
        }

        /** Returns the slot provider this context's allocator was created with. */
        public TestingPhysicalSlotProvider getSlotProvider() {
            return slotProvider;
        }

        /** Returns the flag this context's allocator was created with. */
        public boolean isSlotWillBeOccupiedIndefinitely() {
            return slotWillBeOccupiedIndefinitely;
        }

        /** Returns the allocator under test. */
        public SimpleExecutionSlotAllocator getAllocator() {
            return allocator;
        }
    }
}

