/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class TestCapacitySchedulerPreemption {
    private static final Log LOG = LogFactory.getLog(TestCapacitySchedulerPreemption.class);
    private final int GB = 1024;
    private Configuration conf;
    RMNodeLabelsManager mgr;
    Clock clock;

    @Before
    public void setUp() throws Exception {
        this.conf = new YarnConfiguration();
        this.conf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        this.conf.setBoolean("yarn.resourcemanager.scheduler.monitor.enable", true);
        this.conf.setClass("yarn.resourcemanager.scheduler.monitor.policies", ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
        this.conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
        this.conf.setInt("yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill", 0);
        this.conf.setBoolean("yarn.scheduler.capacity.lazy-preemption-enabled", true);
        this.conf.setFloat("yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round", 1.0f);
        this.conf.setFloat("yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor", 1.0f);
        this.mgr = new NullRMNodeLabelsManager();
        this.mgr.init(this.conf);
        this.clock = (Clock)Mockito.mock(Clock.class);
        Mockito.when((Object)this.clock.getTime()).thenReturn((Object)0L);
    }

    private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
        ResourceManager.RMActiveServices activeServices = rm.getRMActiveService();
        SchedulingMonitor mon = null;
        for (Service service : activeServices.getServices()) {
            if (!(service instanceof SchedulingMonitor)) continue;
            mon = (SchedulingMonitor)service;
            break;
        }
        if (mon != null) {
            return mon.getSchedulingEditPolicy();
        }
        return null;
    }

    @Test(timeout=60000L)
    public void testSimplePreemption() throws Exception {
        MockRM rm1 = new MockRM(this.conf);
        rm1.getRMContext().setNodeLabelManager(this.mgr);
        rm1.start();
        MockNM nm1 = rm1.registerNode("h1:1234", 4096);
        MockNM nm2 = rm1.registerNode("h2:1234", 4096);
        CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
        RMNode rmNode1 = (RMNode)rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
        RMNode rmNode2 = (RMNode)rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        am1.allocate("*", 1024, 7, new ArrayList<ContainerId>());
        for (int i = 0; i < 3; ++i) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        }
        FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        RMApp app2 = rm1.submitApp(1024, "app", "user", null, "c");
        MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
        Assert.assertEquals((long)0L, (long)cs.getNode(nm1.getNodeId()).getUnallocatedResource().getMemory());
        Assert.assertEquals((long)0L, (long)cs.getNode(nm2.getNodeId()).getUnallocatedResource().getMemory());
        am2.allocate(Arrays.asList(ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"*", (Resource)Resources.createResource((int)1024), (int)1)), null);
        SchedulingEditPolicy editPolicy = this.getSchedulingEditPolicy(rm1);
        editPolicy.editSchedule();
        editPolicy.editSchedule();
        PreemptionManager pm = cs.getPreemptionManager();
        Map<ContainerId, RMContainer> killableContainers = this.waitKillableContainersSize(pm, "a", "", 1);
        Assert.assertEquals((long)1L, (long)killableContainers.size());
        Assert.assertEquals((Object)killableContainers.entrySet().iterator().next().getKey().getApplicationAttemptId(), (Object)am1.getApplicationAttemptId());
        cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
        Assert.assertEquals((long)6L, (long)schedulerApp1.getLiveContainers().size());
        Assert.assertEquals((long)2L, (long)schedulerApp2.getLiveContainers().size());
        rm1.close();
    }

    @Test(timeout=60000L)
    public void testPreemptionConsidersNodeLocalityDelay() throws Exception {
        MockRM rm1 = new MockRM(this.conf);
        rm1.getRMContext().setNodeLabelManager(this.mgr);
        rm1.start();
        MockNM nm1 = rm1.registerNode("h1:1234", 4096);
        MockNM nm2 = rm1.registerNode("h2:1234", 4096);
        CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
        RMNode rmNode1 = (RMNode)rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
        RMNode rmNode2 = (RMNode)rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        am1.allocate("*", 1024, 6, new ArrayList<ContainerId>());
        for (int i = 0; i < 3; ++i) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        }
        FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        RMApp app2 = rm1.submitApp(1024, "app", "user", null, "c");
        MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
        Assert.assertEquals((long)0L, (long)cs.getNode(nm1.getNodeId()).getUnallocatedResource().getMemory());
        Assert.assertEquals((long)0L, (long)cs.getNode(nm2.getNodeId()).getUnallocatedResource().getMemory());
        am2.allocate(Arrays.asList(ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"*", (Resource)Resources.createResource((int)1024), (int)1), ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"unknownhost", (Resource)Resources.createResource((int)1024), (int)1), ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"/default-rack", (Resource)Resources.createResource((int)1024), (int)1)), null);
        SchedulingEditPolicy editPolicy = this.getSchedulingEditPolicy(rm1);
        editPolicy.editSchedule();
        editPolicy.editSchedule();
        PreemptionManager pm = cs.getPreemptionManager();
        Map<ContainerId, RMContainer> killableContainers = this.waitKillableContainersSize(pm, "a", "", 1);
        Assert.assertEquals((Object)killableContainers.entrySet().iterator().next().getKey().getApplicationAttemptId(), (Object)am1.getApplicationAttemptId());
        cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        Assert.assertEquals((long)1L, (long)schedulerApp2.getLiveContainers().size());
        cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        Assert.assertEquals((long)6L, (long)schedulerApp1.getLiveContainers().size());
        Assert.assertEquals((long)2L, (long)schedulerApp2.getLiveContainers().size());
        rm1.close();
    }

    @Test(timeout=60000L)
    public void testPreemptionConsidersHardNodeLocality() throws Exception {
        int i;
        MockRM rm1 = new MockRM(this.conf);
        rm1.getRMContext().setNodeLabelManager(this.mgr);
        rm1.start();
        MockNM nm1 = rm1.registerNode("h1:1234", 4096);
        MockNM nm2 = rm1.registerNode("h2:1234", 4096);
        CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
        RMNode rmNode1 = (RMNode)rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
        RMNode rmNode2 = (RMNode)rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        am1.allocate("*", 1024, 6, new ArrayList<ContainerId>());
        for (i = 0; i < 3; ++i) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
        }
        for (i = 0; i < 3; ++i) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        }
        FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        RMApp app2 = rm1.submitApp(1024, "app", "user", null, "c");
        MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
        Assert.assertEquals((long)0L, (long)cs.getNode(nm1.getNodeId()).getUnallocatedResource().getMemory());
        Assert.assertEquals((long)0L, (long)cs.getNode(nm2.getNodeId()).getUnallocatedResource().getMemory());
        am2.allocate(Arrays.asList(ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"*", (Resource)Resources.createResource((int)1024), (int)1, (boolean)true), ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"h3", (Resource)Resources.createResource((int)1024), (int)1, (boolean)false), ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"/default-rack", (Resource)Resources.createResource((int)1024), (int)1, (boolean)false)), null);
        SchedulingEditPolicy editPolicy = this.getSchedulingEditPolicy(rm1);
        editPolicy.editSchedule();
        editPolicy.editSchedule();
        PreemptionManager pm = cs.getPreemptionManager();
        Map<ContainerId, RMContainer> killableContainers = this.waitKillableContainersSize(pm, "a", "", 1);
        Assert.assertEquals((Object)killableContainers.entrySet().iterator().next().getKey().getApplicationAttemptId(), (Object)am1.getApplicationAttemptId());
        cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        Assert.assertEquals((long)1L, (long)schedulerApp2.getLiveContainers().size());
        cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        Assert.assertEquals((long)1L, (long)schedulerApp2.getLiveContainers().size());
        rm1.close();
    }

    @Test(timeout=60000L)
    public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers() throws Exception {
        MockRM rm1 = new MockRM(this.conf);
        rm1.getRMContext().setNodeLabelManager(this.mgr);
        rm1.start();
        MockNM nm1 = rm1.registerNode("h1:1234", 8192);
        CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
        RMNode rmNode1 = (RMNode)rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        am1.allocate("*", 1024, 6, new ArrayList<ContainerId>());
        for (int i = 0; i < 6; ++i) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
        }
        FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        RMApp app2 = rm1.submitApp(1024, "app", "user", null, "c");
        MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
        Assert.assertEquals((long)0L, (long)cs.getNode(nm1.getNodeId()).getUnallocatedResource().getMemory());
        am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
        ProportionalCapacityPreemptionPolicy editPolicy = (ProportionalCapacityPreemptionPolicy)this.getSchedulingEditPolicy(rm1);
        editPolicy.editSchedule();
        editPolicy.editSchedule();
        PreemptionManager pm = cs.getPreemptionManager();
        this.waitKillableContainersSize(pm, "a", "", 1);
        Assert.assertEquals((long)0L, (long)editPolicy.getToPreemptContainers().size());
        editPolicy.editSchedule();
        Assert.assertEquals((long)0L, (long)editPolicy.getToPreemptContainers().size());
        HashSet previousKillableContainers = new HashSet(pm.getKillableContainersMap("a", "").keySet());
        am2.allocate("*", 1024, 2, new ArrayList<ContainerId>());
        editPolicy.editSchedule();
        Assert.assertEquals((long)1L, (long)editPolicy.getToPreemptContainers().size());
        editPolicy.editSchedule();
        Assert.assertEquals((long)0L, (long)editPolicy.getToPreemptContainers().size());
        Map<ContainerId, RMContainer> killableContainers = this.waitKillableContainersSize(pm, "a", "", 2);
        Assert.assertTrue((boolean)Sets.difference(previousKillableContainers, killableContainers.keySet()).isEmpty());
    }

    @Test(timeout=60000L)
    public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded() throws Exception {
        MockRM rm1 = new MockRM(this.conf);
        rm1.getRMContext().setNodeLabelManager(this.mgr);
        rm1.start();
        MockNM nm1 = rm1.registerNode("h1:1234", 8192);
        CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
        RMNode rmNode1 = (RMNode)rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        am1.allocate("*", 1024, 6, new ArrayList<ContainerId>());
        for (int i = 0; i < 6; ++i) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
        }
        FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        RMApp app2 = rm1.submitApp(1024, "app", "user", null, "c");
        MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
        Assert.assertEquals((long)0L, (long)cs.getNode(nm1.getNodeId()).getUnallocatedResource().getMemory());
        am2.allocate("*", 3072, 1, new ArrayList<ContainerId>());
        ProportionalCapacityPreemptionPolicy editPolicy = (ProportionalCapacityPreemptionPolicy)this.getSchedulingEditPolicy(rm1);
        editPolicy.editSchedule();
        editPolicy.editSchedule();
        PreemptionManager pm = cs.getPreemptionManager();
        this.waitKillableContainersSize(pm, "a", "", 3);
        am2.allocate("*", 2048, 1, new ArrayList<ContainerId>());
        editPolicy.editSchedule();
        Assert.assertEquals((long)0L, (long)editPolicy.getToPreemptContainers().size());
        this.waitKillableContainersSize(pm, "a", "", 2);
        editPolicy.editSchedule();
        Assert.assertEquals((long)0L, (long)editPolicy.getToPreemptContainers().size());
        this.waitKillableContainersSize(pm, "a", "", 2);
    }

    @Test(timeout=60000L)
    public void testPreemptionConsidersUserLimit() throws Exception {
        CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(this.conf);
        csConf.setUserLimitFactor("root.c", 0.1f);
        MockRM rm1 = new MockRM((Configuration)csConf);
        rm1.getRMContext().setNodeLabelManager(this.mgr);
        rm1.start();
        MockNM nm1 = rm1.registerNode("h1:1234", 4096);
        MockNM nm2 = rm1.registerNode("h2:1234", 4096);
        CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
        RMNode rmNode1 = (RMNode)rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
        RMNode rmNode2 = (RMNode)rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
        RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a");
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        am1.allocate("*", 1024, 6, new ArrayList<ContainerId>());
        for (int i = 0; i < 3; ++i) {
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode1));
            cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        }
        FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        RMApp app2 = rm1.submitApp(1024, "app", "user", null, "c");
        MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
        Assert.assertEquals((long)0L, (long)cs.getNode(nm1.getNodeId()).getUnallocatedResource().getMemory());
        Assert.assertEquals((long)0L, (long)cs.getNode(nm2.getNodeId()).getUnallocatedResource().getMemory());
        am2.allocate(Arrays.asList(ResourceRequest.newInstance((Priority)Priority.newInstance((int)1), (String)"*", (Resource)Resources.createResource((int)1024), (int)1)), null);
        SchedulingEditPolicy editPolicy = this.getSchedulingEditPolicy(rm1);
        editPolicy.editSchedule();
        editPolicy.editSchedule();
        PreemptionManager pm = cs.getPreemptionManager();
        Map<ContainerId, RMContainer> killableContainers = this.waitKillableContainersSize(pm, "a", "", 0);
        Assert.assertEquals((long)0L, (long)killableContainers.size());
        cs.handle((SchedulerEvent)new NodeUpdateSchedulerEvent(rmNode2));
        FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
        Assert.assertEquals((long)7L, (long)schedulerApp1.getLiveContainers().size());
        Assert.assertEquals((long)1L, (long)schedulerApp2.getLiveContainers().size());
        rm1.close();
    }

    private Map<ContainerId, RMContainer> waitKillableContainersSize(PreemptionManager pm, String queueName, String partition, int expectedSize) throws InterruptedException {
        Map killableContainers = pm.getKillableContainersMap(queueName, partition);
        for (int wait = 0; expectedSize != killableContainers.size() && wait < 500; ++wait) {
            killableContainers = pm.getKillableContainersMap(queueName, partition);
            Thread.sleep(10L);
        }
        Assert.assertEquals((long)expectedSize, (long)killableContainers.size());
        return killableContainers;
    }
}

