package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.placement.TestPlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.class */
/**
 * Tests the {@link CapacityScheduler} with multi-node placement and the
 * {@link ProportionalCapacityPreemptionPolicy} monitor both enabled.
 *
 * <p>The single scenario checks that, after capacity is preempted on one node,
 * a pending application attempt ends up with a reservation on a different node,
 * and that the reservation is later replaced by a live container.
 */
public class TestCapacitySchedulerMultiNodesWithPreemption {
    private static final Log LOG = LogFactory.getLog(TestCapacitySchedulerMultiNodesWithPreemption.class);

    /** Multi-node lookup policy class registered under the "resource-based" policy name. */
    private static final String POLICY_CLASS_NAME = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";

    /** Scheduler configuration rebuilt before every test by {@link #setUp()}. */
    private CapacitySchedulerConfiguration conf;

    /**
     * Builds the configuration shared by the tests: CapacityScheduler with the
     * dominant resource calculator, multi-node placement using
     * {@link #POLICY_CLASS_NAME}, two queues ("A" and "default") at 50% capacity
     * each with 100% maximum capacity, and the proportional capacity preemption
     * monitor enabled.
     */
    @Before
    public void setUp() {
        CapacitySchedulerConfiguration baseConf = new CapacitySchedulerConfiguration();
        baseConf.set("yarn.scheduler.capacity.resource-calculator", DominantResourceCalculator.class.getName());
        this.conf = new CapacitySchedulerConfiguration(baseConf);
        this.conf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);

        // Multi-node placement with a single sorting policy named "resource-based".
        this.conf.set("yarn.scheduler.capacity.multi-node-sorting.policy.names", "resource-based");
        this.conf.set("yarn.scheduler.capacity.multi-node-sorting.policy", "resource-based");
        this.conf.set("yarn.scheduler.capacity.multi-node-sorting.policy.resource-based.class", POLICY_CLASS_NAME);
        this.conf.setBoolean("yarn.scheduler.capacity.multi-node-placement-enabled", true);

        // NOTE(review): the value comes from TestPlacementManager.APP_ID1, which looks like a
        // decompiler-substituted constant rather than an intentional dependency - confirm the literal.
        this.conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", TestPlacementManager.APP_ID1);
        this.conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
        this.conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
        this.conf.setInt("yarn.scheduler.maximum-allocation-mb", 102400);

        // Queue layout: root -> {A, default}.
        this.conf.set("yarn.scheduler.capacity.root.queues", "A, default");
        this.conf.set("yarn.scheduler.capacity.root.A.capacity", "50");
        this.conf.set("yarn.scheduler.capacity.root.default.capacity", "50");
        this.conf.set("yarn.scheduler.capacity.root.A.maximum-capacity", "100");
        this.conf.set("yarn.scheduler.capacity.root.default.maximum-capacity", "100");
        this.conf.set("yarn.scheduler.capacity.root.A.user-limit-factor", "10");
        this.conf.set("yarn.scheduler.capacity.root.default.user-limit-factor", "10");

        // Preemption monitor settings.
        this.conf.setLong("yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill", 10000L);
        this.conf.setLong("yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval", 1500L);
        this.conf.setFloat("yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round", 1.0f);
        this.conf.setFloat("yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor", 1.0f);
        this.conf.set("yarn.resourcemanager.scheduler.monitor.policies", ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
        this.conf.setBoolean("yarn.resourcemanager.scheduler.monitor.enable", true);

        this.conf.setLong("yarn.resourcemanager.nodemanagers.heartbeat-interval-ms", 60000L);
    }

    /**
     * Fills a three-node cluster with app-1 in queue "default", submits app-2 to
     * queue "A", waits for a node to show free memory again, hides that free
     * memory from the scheduler, and asserts that app-2 holds exactly one
     * reservation on a node other than the freed one. The hidden capacity is then
     * given back and app-2 must end with one live container and no reservation.
     *
     * @throws Exception if a wait times out, the app-2 AM launch fails, or any
     *         mock RM/NM/AM operation fails
     */
    @Test(timeout = 60000)
    public void testAllocateReservationFromOtherNode() throws Exception {
        final MockRM rm = new MockRM(this.conf);
        rm.start();
        try {
            final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 1024, 2);
            final MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2048, 2);
            final MockNM nm3 = rm.registerNode("127.0.0.3:1234", 3072, 2);
            final MockNM[] nms = {nm1, nm2, nm3};
            rm.getRMContext().getMultiNodeSortingManager().getMultiNodePolicy(POLICY_CLASS_NAME).reSortClusterNodes();

            // setUp() configures CapacityScheduler as the scheduler class, so this cast is safe.
            final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

            // Step 1: app-1 in "default" asks for 3072 + 2048 + 1024 MB, which equals the
            // total memory registered by the three nodes.
            RMApp app1 = MockRMAppSubmitter.submit(rm, MockRMAppSubmissionData.Builder.createWithMemory(3072L, rm)
                    .withAppName("app-1")
                    .withUser("user1")
                    .withAcls(null)
                    .withQueue("default")
                    .build());
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
            am1.allocateAndWaitForContainers("*", 1, 2048, nm2);
            am1.allocateAndWaitForContainers("*", 1, 1024, nm3);

            // Step 2: wait until the first two nodes report no available memory.
            GenericTestUtils.waitFor(
                () -> cs.getNodeReport(nms[0].getNodeId()).getAvailableResource().getMemorySize() == 0
                    && cs.getNodeReport(nms[1].getNodeId()).getAvailableResource().getMemorySize() == 0,
                10L, 10000L);

            // Step 3: submit app-2 to queue "A" and launch its AM on a background thread, so the
            // test thread can manipulate the scheduler nodes while the launch is in progress.
            final AtomicBoolean app2AmLaunched = new AtomicBoolean(false);
            final AtomicReference<Throwable> app2LaunchFailure = new AtomicReference<>();
            final RMApp app2 = MockRMAppSubmitter.submit(rm, MockRMAppSubmissionData.Builder.createWithMemory(TestQueueMetricsForCustomResources.GB, rm)
                    .withAppName("app-2")
                    .withUser("user2")
                    .withAcls(null)
                    .withQueue("A")
                    .build());
            Thread app2Launcher = new Thread(() -> {
                try {
                    MockRM.launchAM(app2, rm, nm1);
                    app2AmLaunched.set(true);
                } catch (Throwable t) {
                    // An assertion failure raised on this thread would never reach the JUnit
                    // runner, so record the cause and let the test thread report it.
                    LOG.error("Failed to launch app-2", t);
                    app2LaunchFailure.set(t);
                }
            });
            app2Launcher.start();

            // Step 4: wait for some node to report exactly GB of available memory and remember it.
            // NOTE(review): GB is presumably 1024 (MB); the constant is defined outside this file.
            final AtomicReference<MockNM> preemptedNode = new AtomicReference<>();
            GenericTestUtils.waitFor(() -> {
                for (MockNM nm : nms) {
                    if (cs.getNodeReport(nm.getNodeId()).getAvailableResource().getMemorySize() == TestQueueMetricsForCustomResources.GB) {
                        preemptedNode.set(nm);
                        return true;
                    }
                }
                return false;
            }, 10L, 30000L);
            final MockNM preemptedNm = preemptedNode.get();
            LOG.info("Preempted node is: " + preemptedNm.getNodeId());

            // Step 5: deduct all unallocated resource from the freed node so the scheduler sees
            // nothing available there, then re-register the node and send a node update.
            FiCaSchedulerNode drainedNode = cs.getNodeTracker().getNode(preemptedNm.getNodeId());
            drainedNode.deductUnallocatedResource(Resource.newInstance(drainedNode.getUnallocatedResource()));
            reAddNodeAndSendNodeUpdate(rm, cs, drainedNode, preemptedNm);

            // app-2 must now hold exactly one reservation, and not on the freed node.
            ApplicationAttemptId app2AttemptId = app2.getCurrentAppAttempt().getAppAttemptId();
            FiCaSchedulerApp app2Attempt = cs.getApplicationAttempt(app2AttemptId);
            Assert.assertEquals("App2 failed to get reserved container", 1L, app2Attempt.getReservedContainers().size());
            RMContainer reservedContainer = app2Attempt.getReservedContainers().get(0);
            LOG.info("Reserved node is: " + reservedContainer.getReservedNode());
            Assert.assertNotEquals("Failed to reserve as per the Multi Node Itearor", preemptedNm.getNodeId(), reservedContainer.getReservedNode());

            // Step 6: grow the freed node's total resource by its allocated amount so it has
            // schedulable capacity again, then re-register it and send another node update.
            FiCaSchedulerNode restoredNode = cs.getNodeTracker().getNode(preemptedNm.getNodeId());
            restoredNode.updateTotalResource(Resources.add(restoredNode.getTotalResource(), restoredNode.getAllocatedResource()));
            reAddNodeAndSendNodeUpdate(rm, cs, restoredNode, preemptedNm);

            // Wait for the background launch to finish either way; surface a failure with its cause.
            GenericTestUtils.waitFor(
                () -> app2AmLaunched.get() || app2LaunchFailure.get() != null, 10L, 20000L);
            Throwable launchFailure = app2LaunchFailure.get();
            if (launchFailure != null) {
                throw new AssertionError("Failed to launch app-2", launchFailure);
            }

            FiCaSchedulerApp app2AttemptAfterLaunch = cs.getApplicationAttempt(app2AttemptId);
            Assert.assertEquals("App2 failed to get Allocated", 1L, app2AttemptAfterLaunch.getLiveContainers().size());
            Assert.assertEquals("App2 failed to Unreserve", 0L, app2AttemptAfterLaunch.getReservedContainers().size());
        } finally {
            // Always shut the RM down, even when an assertion or wait above fails.
            rm.stop();
        }
    }

    /**
     * Removes and re-adds {@code node} in the scheduler's node tracker, then
     * delivers a node-update event for {@code nm} to the scheduler.
     */
    private static void reAddNodeAndSendNodeUpdate(MockRM rm, CapacityScheduler cs, FiCaSchedulerNode node, MockNM nm) {
        cs.getNodeTracker().removeNode(nm.getNodeId());
        cs.getNodeTracker().addNode(node);
        RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId());
        cs.handle(new NodeUpdateSchedulerEvent(rmNode));
    }
}
