package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.class */
public class SlotSharingSlotAllocatorTest extends TestLogger {
    private static final FreeSlotFunction TEST_FREE_SLOT_FUNCTION = (allocationID, th, j) -> {
    };
    private static final ReserveSlotFunction TEST_RESERVE_SLOT_FUNCTION = (allocationID, resourceProfile) -> {
        return TestingPhysicalSlot.builder().withAllocationID(allocationID).withResourceProfile(resourceProfile).build();
    };
    private static final IsSlotAvailableAndFreeFunction TEST_IS_SLOT_FREE_FUNCTION = allocationID -> {
        return true;
    };
    private static final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
    private static final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
    private static final JobInformation.VertexInformation vertex1 = new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
    private static final JobInformation.VertexInformation vertex2 = new TestVertexInformation(new JobVertexID(), 2, slotSharingGroup1);
    private static final JobInformation.VertexInformation vertex3 = new TestVertexInformation(new JobVertexID(), 3, slotSharingGroup2);

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest$TestJobInformation.class */
    private static class TestJobInformation implements JobInformation {
        private final Map<JobVertexID, JobInformation.VertexInformation> vertexIdToInformation;
        private final Collection<SlotSharingGroup> slotSharingGroups;

        private TestJobInformation(Collection<JobInformation.VertexInformation> collection) {
            this.vertexIdToInformation = (Map) collection.stream().collect(Collectors.toMap((v0) -> {
                return v0.getJobVertexID();
            }, Function.identity()));
            this.slotSharingGroups = (Collection) collection.stream().map((v0) -> {
                return v0.getSlotSharingGroup();
            }).collect(Collectors.toSet());
        }

        public Collection<SlotSharingGroup> getSlotSharingGroups() {
            return this.slotSharingGroups;
        }

        public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexID) {
            return this.vertexIdToInformation.get(jobVertexID);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest$TestVertexInformation.class */
    private static class TestVertexInformation implements JobInformation.VertexInformation {
        private final JobVertexID jobVertexId;
        private final int parallelism;
        private final SlotSharingGroup slotSharingGroup;

        private TestVertexInformation(JobVertexID jobVertexID, int i, SlotSharingGroup slotSharingGroup) {
            this.jobVertexId = jobVertexID;
            this.parallelism = i;
            this.slotSharingGroup = slotSharingGroup;
            slotSharingGroup.addVertexToGroup(jobVertexID);
        }

        public JobVertexID getJobVertexID() {
            return this.jobVertexId;
        }

        public int getParallelism() {
            return this.parallelism;
        }

        public SlotSharingGroup getSlotSharingGroup() {
            return this.slotSharingGroup;
        }
    }

    @Test
    public void testCalculateRequiredSlots() {
        ResourceCounter calculateRequiredSlots = SlotSharingSlotAllocator.createSlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION).calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3));
        Assert.assertThat(calculateRequiredSlots.getResources(), Matchers.contains(new ResourceProfile[]{ResourceProfile.UNKNOWN}));
        Assert.assertThat(Integer.valueOf(calculateRequiredSlots.getResourceCount(ResourceProfile.UNKNOWN)), CoreMatchers.is(Integer.valueOf(Math.max(vertex1.getParallelism(), vertex2.getParallelism()) + vertex3.getParallelism())));
    }

    @Test
    public void testDetermineParallelismWithMinimumSlots() {
        Map maxParallelismForVertices = ((VertexParallelism) SlotSharingSlotAllocator.createSlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION).determineParallelism(new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)), getSlots(2)).get()).getMaxParallelismForVertices();
        Assert.assertThat(maxParallelismForVertices.get(vertex1.getJobVertexID()), CoreMatchers.is(1));
        Assert.assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID()), CoreMatchers.is(1));
        Assert.assertThat(maxParallelismForVertices.get(vertex3.getJobVertexID()), CoreMatchers.is(1));
    }

    @Test
    public void testDetermineParallelismWithManySlots() {
        Map maxParallelismForVertices = ((VertexParallelism) SlotSharingSlotAllocator.createSlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION).determineParallelism(new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)), getSlots(50)).get()).getMaxParallelismForVertices();
        Assert.assertThat(maxParallelismForVertices.get(vertex1.getJobVertexID()), CoreMatchers.is(Integer.valueOf(vertex1.getParallelism())));
        Assert.assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID()), CoreMatchers.is(Integer.valueOf(vertex2.getParallelism())));
        Assert.assertThat(maxParallelismForVertices.get(vertex3.getJobVertexID()), CoreMatchers.is(Integer.valueOf(vertex3.getParallelism())));
    }

    @Test
    public void testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
        Assert.assertThat(Boolean.valueOf(SlotSharingSlotAllocator.createSlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION).determineParallelism(new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)), getSlots(1)).isPresent()), CoreMatchers.is(false));
    }

    @Test
    public void testReserveAvailableResources() {
        SlotSharingSlotAllocator createSlotSharingSlotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, TEST_IS_SLOT_FREE_FUNCTION);
        VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing = (VertexParallelismWithSlotSharing) createSlotSharingSlotAllocator.determineParallelism(new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)), getSlots(50)).get();
        ReservedSlots reservedSlots = (ReservedSlots) createSlotSharingSlotAllocator.tryReserveResources(vertexParallelismWithSlotSharing).orElseThrow(() -> {
            return new RuntimeException("Expected that reservation succeeds.");
        });
        HashMap hashMap = new HashMap();
        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot executionSlotSharingGroupAndSlot : vertexParallelismWithSlotSharing.getAssignments()) {
            Iterator it = executionSlotSharingGroupAndSlot.getExecutionSlotSharingGroup().getContainedExecutionVertices().iterator();
            while (it.hasNext()) {
                hashMap.put((ExecutionVertexID) it.next(), executionSlotSharingGroupAndSlot.getSlotInfo());
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            Assert.assertThat(reservedSlots.getSlotFor((ExecutionVertexID) entry.getKey()).getAllocationId(), CoreMatchers.is(((SlotInfo) entry.getValue()).getAllocationId()));
        }
    }

    @Test
    public void testReserveUnavailableResources() {
        SlotSharingSlotAllocator createSlotSharingSlotAllocator = SlotSharingSlotAllocator.createSlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, TEST_FREE_SLOT_FUNCTION, allocationID -> {
            return false;
        });
        Assert.assertFalse(createSlotSharingSlotAllocator.tryReserveResources((VertexParallelismWithSlotSharing) createSlotSharingSlotAllocator.determineParallelism(new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)), getSlots(50)).get()).isPresent());
    }

    private static Collection<SlotInfo> getSlots(int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new TestSlotInfo());
        }
        return arrayList;
    }
}
