package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.class */
public class TestCapacitySchedulerLazyPreemption extends CapacitySchedulerPreemptionTestBase {
    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerPreemptionTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.conf.setBoolean("yarn.scheduler.capacity.lazy-preemption-enabled", true);
    }

    @Test(timeout = 60000)
    public void testSimplePreemption() throws Exception {
        MockRM mockRM = new MockRM(this.conf);
        mockRM.getRMContext().setNodeLabelManager(this.mgr);
        mockRM.start();
        MockNM registerNode = mockRM.registerNode("h1:1234", 4096);
        MockNM registerNode2 = mockRM.registerNode("h2:1234", 4096);
        CapacityScheduler resourceScheduler = mockRM.getResourceScheduler();
        RMNode rMNode = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode.getNodeId());
        RMNode rMNode2 = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode2.getNodeId());
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("a").withUnmanagedAM(false).build()), mockRM, registerNode);
        launchAndRegisterAM.allocate("*", 1024, 7, new ArrayList());
        for (int i = 0; i < 3; i++) {
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode));
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        }
        FiCaSchedulerApp applicationAttempt = resourceScheduler.getApplicationAttempt(launchAndRegisterAM.getApplicationAttemptId());
        Assert.assertEquals(7L, applicationAttempt.getLiveContainers().size());
        MockAM launchAndRegisterAM2 = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("c").withUnmanagedAM(false).build()), mockRM, registerNode2);
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode.getNodeId()).getUnallocatedResource().getMemorySize());
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode2.getNodeId()).getUnallocatedResource().getMemorySize());
        launchAndRegisterAM2.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 1)), null);
        ProportionalCapacityPreemptionPolicy schedulingEditPolicy = mockRM.getResourceScheduler().getSchedulingMonitorManager().getAvailableSchedulingMonitor().getSchedulingEditPolicy();
        schedulingEditPolicy.editSchedule();
        schedulingEditPolicy.editSchedule();
        Map<ContainerId, RMContainer> waitKillableContainersSize = waitKillableContainersSize(resourceScheduler.getPreemptionManager(), "root.a", "", 1);
        Assert.assertEquals(1L, waitKillableContainersSize.size());
        Assert.assertEquals(waitKillableContainersSize.entrySet().iterator().next().getKey().getApplicationAttemptId(), launchAndRegisterAM.getApplicationAttemptId());
        resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        FiCaSchedulerApp applicationAttempt2 = resourceScheduler.getApplicationAttempt(launchAndRegisterAM2.getApplicationAttemptId());
        Assert.assertEquals(6L, applicationAttempt.getLiveContainers().size());
        Assert.assertEquals(2L, applicationAttempt2.getLiveContainers().size());
        Assert.assertEquals("Number of preempted containers incorrectly recorded:", 1L, resourceScheduler.getQueue("a").getMetrics().getAggregatePreemptedContainers());
        Assert.assertEquals("Number of preempted containers incorrectly recorded:", 1L, resourceScheduler.getRootQueue().getMetrics().getAggregatePreemptedContainers());
        mockRM.close();
    }

    @Test(timeout = 60000)
    public void testPreemptionConsidersNodeLocalityDelay() throws Exception {
        MockRM mockRM = new MockRM(this.conf);
        mockRM.getRMContext().setNodeLabelManager(this.mgr);
        mockRM.start();
        MockNM registerNode = mockRM.registerNode("h1:1234", 4096);
        MockNM registerNode2 = mockRM.registerNode("h2:1234", 4096);
        CapacityScheduler resourceScheduler = mockRM.getResourceScheduler();
        RMNode rMNode = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode.getNodeId());
        RMNode rMNode2 = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode2.getNodeId());
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("a").withUnmanagedAM(false).build()), mockRM, registerNode);
        launchAndRegisterAM.allocate("*", 1024, 6, new ArrayList());
        for (int i = 0; i < 3; i++) {
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode));
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        }
        FiCaSchedulerApp applicationAttempt = resourceScheduler.getApplicationAttempt(launchAndRegisterAM.getApplicationAttemptId());
        Assert.assertEquals(7L, applicationAttempt.getLiveContainers().size());
        MockAM launchAndRegisterAM2 = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("c").withUnmanagedAM(false).build()), mockRM, registerNode2);
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode.getNodeId()).getUnallocatedResource().getMemorySize());
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode2.getNodeId()).getUnallocatedResource().getMemorySize());
        launchAndRegisterAM2.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 1), ResourceRequest.newInstance(Priority.newInstance(1), "unknownhost", Resources.createResource(1024), 1), ResourceRequest.newInstance(Priority.newInstance(1), "/default-rack", Resources.createResource(1024), 1)), null);
        ProportionalCapacityPreemptionPolicy schedulingEditPolicy = mockRM.getResourceScheduler().getSchedulingMonitorManager().getAvailableSchedulingMonitor().getSchedulingEditPolicy();
        schedulingEditPolicy.editSchedule();
        schedulingEditPolicy.editSchedule();
        Assert.assertEquals(waitKillableContainersSize(resourceScheduler.getPreemptionManager(), "root.a", "", 1).entrySet().iterator().next().getKey().getApplicationAttemptId(), launchAndRegisterAM.getApplicationAttemptId());
        resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        FiCaSchedulerApp applicationAttempt2 = resourceScheduler.getApplicationAttempt(launchAndRegisterAM2.getApplicationAttemptId());
        Assert.assertEquals(7L, applicationAttempt.getLiveContainers().size());
        Assert.assertEquals(1L, applicationAttempt2.getLiveContainers().size());
        resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        Assert.assertEquals(6L, applicationAttempt.getLiveContainers().size());
        Assert.assertEquals(2L, applicationAttempt2.getLiveContainers().size());
        mockRM.close();
    }

    @Test(timeout = 60000)
    public void testPreemptionConsidersHardNodeLocality() throws Exception {
        MockRM mockRM = new MockRM(this.conf);
        mockRM.getRMContext().setNodeLabelManager(this.mgr);
        mockRM.start();
        MockNM registerNode = mockRM.registerNode("h1:1234", 4096);
        MockNM registerNode2 = mockRM.registerNode("h2:1234", 4096);
        CapacityScheduler resourceScheduler = mockRM.getResourceScheduler();
        RMNode rMNode = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode.getNodeId());
        RMNode rMNode2 = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode2.getNodeId());
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("a").withUnmanagedAM(false).build()), mockRM, registerNode);
        launchAndRegisterAM.allocate("*", 1024, 6, new ArrayList());
        for (int i = 0; i < 3; i++) {
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode));
        }
        for (int i2 = 0; i2 < 3; i2++) {
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        }
        FiCaSchedulerApp applicationAttempt = resourceScheduler.getApplicationAttempt(launchAndRegisterAM.getApplicationAttemptId());
        Assert.assertEquals(7L, applicationAttempt.getLiveContainers().size());
        MockAM launchAndRegisterAM2 = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("c").withUnmanagedAM(false).build()), mockRM, registerNode2);
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode.getNodeId()).getUnallocatedResource().getMemorySize());
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode2.getNodeId()).getUnallocatedResource().getMemorySize());
        launchAndRegisterAM2.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 1, true), ResourceRequest.newInstance(Priority.newInstance(1), "h3", Resources.createResource(1024), 1, false), ResourceRequest.newInstance(Priority.newInstance(1), "/default-rack", Resources.createResource(1024), 1, false)), null);
        ProportionalCapacityPreemptionPolicy schedulingEditPolicy = mockRM.getResourceScheduler().getSchedulingMonitorManager().getAvailableSchedulingMonitor().getSchedulingEditPolicy();
        schedulingEditPolicy.editSchedule();
        schedulingEditPolicy.editSchedule();
        Assert.assertEquals(waitKillableContainersSize(resourceScheduler.getPreemptionManager(), "root.a", "", 1).entrySet().iterator().next().getKey().getApplicationAttemptId(), launchAndRegisterAM.getApplicationAttemptId());
        resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        FiCaSchedulerApp applicationAttempt2 = resourceScheduler.getApplicationAttempt(launchAndRegisterAM2.getApplicationAttemptId());
        Assert.assertEquals(7L, applicationAttempt.getLiveContainers().size());
        Assert.assertEquals(1L, applicationAttempt2.getLiveContainers().size());
        resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        Assert.assertEquals(7L, applicationAttempt.getLiveContainers().size());
        Assert.assertEquals(1L, applicationAttempt2.getLiveContainers().size());
        mockRM.close();
    }

    @Test(timeout = 60000)
    public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers() throws Exception {
        MockRM mockRM = new MockRM(this.conf);
        mockRM.getRMContext().setNodeLabelManager(this.mgr);
        mockRM.start();
        MockNM registerNode = mockRM.registerNode("h1:1234", 8192);
        CapacityScheduler resourceScheduler = mockRM.getResourceScheduler();
        RMNode rMNode = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode.getNodeId());
        MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("a").withUnmanagedAM(false).build()), mockRM, registerNode).allocate("*", 1024, 6, new ArrayList());
        for (int i = 0; i < 6; i++) {
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode));
        }
        Assert.assertEquals(7L, resourceScheduler.getApplicationAttempt(r0.getApplicationAttemptId()).getLiveContainers().size());
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("c").withUnmanagedAM(false).build()), mockRM, registerNode);
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode.getNodeId()).getUnallocatedResource().getMemorySize());
        launchAndRegisterAM.allocate("*", 1024, 1, new ArrayList());
        ProportionalCapacityPreemptionPolicy schedulingEditPolicy = mockRM.getResourceScheduler().getSchedulingMonitorManager().getAvailableSchedulingMonitor().getSchedulingEditPolicy();
        schedulingEditPolicy.editSchedule();
        schedulingEditPolicy.editSchedule();
        PreemptionManager preemptionManager = resourceScheduler.getPreemptionManager();
        waitKillableContainersSize(preemptionManager, "root.a", "", 1);
        Assert.assertEquals(0L, schedulingEditPolicy.getToPreemptContainers().size());
        schedulingEditPolicy.editSchedule();
        Assert.assertEquals(0L, schedulingEditPolicy.getToPreemptContainers().size());
        HashSet hashSet = new HashSet(preemptionManager.getKillableContainersMap("a", "").keySet());
        launchAndRegisterAM.allocate("*", 1024, 2, new ArrayList());
        schedulingEditPolicy.editSchedule();
        Assert.assertEquals(1L, schedulingEditPolicy.getToPreemptContainers().size());
        schedulingEditPolicy.editSchedule();
        Assert.assertEquals(0L, schedulingEditPolicy.getToPreemptContainers().size());
        Assert.assertTrue(Sets.difference(hashSet, waitKillableContainersSize(preemptionManager, "root.a", "", 2).keySet()).isEmpty());
    }

    @Test(timeout = 60000)
    @Ignore
    public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded() throws Exception {
        MockRM mockRM = new MockRM(this.conf);
        mockRM.getRMContext().setNodeLabelManager(this.mgr);
        mockRM.start();
        MockNM registerNode = mockRM.registerNode("h1:1234", 8192);
        CapacityScheduler resourceScheduler = mockRM.getResourceScheduler();
        RMNode rMNode = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode.getNodeId());
        MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("a").withUnmanagedAM(false).build()), mockRM, registerNode).allocate("*", 1024, 6, new ArrayList());
        for (int i = 0; i < 6; i++) {
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode));
        }
        Assert.assertEquals(7L, resourceScheduler.getApplicationAttempt(r0.getApplicationAttemptId()).getLiveContainers().size());
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("c").withUnmanagedAM(false).build()), mockRM, registerNode);
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode.getNodeId()).getUnallocatedResource().getMemorySize());
        launchAndRegisterAM.allocate("*", 3072, 1, new ArrayList());
        ProportionalCapacityPreemptionPolicy schedulingEditPolicy = mockRM.getResourceScheduler().getSchedulingMonitorManager().getAvailableSchedulingMonitor().getSchedulingEditPolicy();
        schedulingEditPolicy.editSchedule();
        schedulingEditPolicy.editSchedule();
        PreemptionManager preemptionManager = resourceScheduler.getPreemptionManager();
        waitKillableContainersSize(preemptionManager, "a", "", 3);
        launchAndRegisterAM.allocate("*", 2048, 1, new ArrayList());
        schedulingEditPolicy.editSchedule();
        Assert.assertEquals(0L, schedulingEditPolicy.getToPreemptContainers().size());
        waitKillableContainersSize(preemptionManager, "a", "", 2);
        schedulingEditPolicy.editSchedule();
        Assert.assertEquals(0L, schedulingEditPolicy.getToPreemptContainers().size());
        waitKillableContainersSize(preemptionManager, "a", "", 2);
    }

    @Test(timeout = 60000)
    public void testPreemptionConsidersUserLimit() throws Exception {
        CapacitySchedulerConfiguration capacitySchedulerConfiguration = new CapacitySchedulerConfiguration(this.conf);
        capacitySchedulerConfiguration.setUserLimitFactor(TestCapacitySchedulerAutoCreatedQueueBase.C, 0.1f);
        MockRM mockRM = new MockRM(capacitySchedulerConfiguration);
        mockRM.getRMContext().setNodeLabelManager(this.mgr);
        mockRM.start();
        MockNM registerNode = mockRM.registerNode("h1:1234", 4096);
        MockNM registerNode2 = mockRM.registerNode("h2:1234", 4096);
        CapacityScheduler resourceScheduler = mockRM.getResourceScheduler();
        RMNode rMNode = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode.getNodeId());
        RMNode rMNode2 = (RMNode) mockRM.getRMContext().getRMNodes().get(registerNode2.getNodeId());
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("a").withUnmanagedAM(false).build()), mockRM, registerNode);
        launchAndRegisterAM.allocate("*", 1024, 6, new ArrayList());
        for (int i = 0; i < 3; i++) {
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode));
            resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        }
        FiCaSchedulerApp applicationAttempt = resourceScheduler.getApplicationAttempt(launchAndRegisterAM.getApplicationAttemptId());
        Assert.assertEquals(7L, applicationAttempt.getLiveContainers().size());
        MockAM launchAndRegisterAM2 = MockRM.launchAndRegisterAM(MockRMAppSubmitter.submit(mockRM, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, mockRM).withAppName("app").withUser("user").withAcls(null).withQueue("c").withUnmanagedAM(false).build()), mockRM, registerNode2);
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode.getNodeId()).getUnallocatedResource().getMemorySize());
        Assert.assertEquals(0L, resourceScheduler.getNode(registerNode2.getNodeId()).getUnallocatedResource().getMemorySize());
        launchAndRegisterAM2.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 1)), null);
        ProportionalCapacityPreemptionPolicy schedulingEditPolicy = mockRM.getResourceScheduler().getSchedulingMonitorManager().getAvailableSchedulingMonitor().getSchedulingEditPolicy();
        schedulingEditPolicy.editSchedule();
        schedulingEditPolicy.editSchedule();
        Assert.assertEquals(0L, waitKillableContainersSize(resourceScheduler.getPreemptionManager(), "a", "", 0).size());
        resourceScheduler.handle(new NodeUpdateSchedulerEvent(rMNode2));
        FiCaSchedulerApp applicationAttempt2 = resourceScheduler.getApplicationAttempt(launchAndRegisterAM2.getApplicationAttemptId());
        Assert.assertEquals(7L, applicationAttempt.getLiveContainers().size());
        Assert.assertEquals(1L, applicationAttempt2.getLiveContainers().size());
        mockRM.close();
    }

    private Map<ContainerId, RMContainer> waitKillableContainersSize(PreemptionManager preemptionManager, String str, String str2, int i) throws InterruptedException {
        Map<ContainerId, RMContainer> killableContainersMap = preemptionManager.getKillableContainersMap(str, str2);
        for (int i2 = 0; i != killableContainersMap.size() && i2 < 500; i2++) {
            killableContainersMap = preemptionManager.getKillableContainersMap(str, str2);
            Thread.sleep(10L);
        }
        Assert.assertEquals(i, killableContainersMap.size());
        return killableContainersMap;
    }
}
