/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMHATestBase;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.UTCClock;
import org.junit.Assert;
import org.junit.Test;

public class TestReservationSystemWithRMHA
extends RMHATestBase {
    @Override
    public void setup() throws Exception {
        CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
        ReservationSystemTestUtil.setupQueueConfiguration(conf);
        conf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        conf.setBoolean("yarn.resourcemanager.reservation-system.enable", true);
        this.configuration = conf;
        super.setup();
    }

    @Test
    public void testSubmitReservationAndCheckAfterFailover() throws Exception {
        this.startRMs();
        this.addNodeCapacityToPlan(rm1, 102400, 100);
        ClientRMService clientService = rm1.getClientRMService();
        ReservationSubmissionRequest request = this.createReservationSubmissionRequest();
        ReservationSubmissionResponse response = null;
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId reservationID = response.getReservationId();
        Assert.assertNotNull((Object)reservationID);
        LOG.info("Submit reservation response: " + reservationID);
        this.explicitFailover();
        rm2.registerNode("127.0.0.1:1", 102400, 100);
        RMStateStore.RMState state = rm2.getRMContext().getStateStore().loadState();
        Map reservationStateMap = (Map)state.getReservationState().get("dedicated");
        Assert.assertNotNull((Object)reservationStateMap);
        Assert.assertNotNull(reservationStateMap.get(reservationID));
    }

    @Test
    public void testUpdateReservationAndCheckAfterFailover() throws Exception {
        this.startRMs();
        this.addNodeCapacityToPlan(rm1, 102400, 100);
        ClientRMService clientService = rm1.getClientRMService();
        ReservationSubmissionRequest request = this.createReservationSubmissionRequest();
        ReservationSubmissionResponse response = null;
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId reservationID = response.getReservationId();
        Assert.assertNotNull((Object)reservationID);
        LOG.info("Submit reservation response: " + reservationID);
        ReservationDefinition reservationDefinition = request.getReservationDefinition();
        long newDeadline = reservationDefinition.getDeadline() + 100L;
        reservationDefinition.setDeadline(newDeadline);
        ReservationUpdateRequest updateRequest = ReservationUpdateRequest.newInstance((ReservationDefinition)reservationDefinition, (ReservationId)reservationID);
        rm1.updateReservationState(updateRequest);
        this.explicitFailover();
        rm2.registerNode("127.0.0.1:1", 102400, 100);
        RMStateStore.RMState state = rm2.getRMContext().getStateStore().loadState();
        Map reservationStateMap = (Map)state.getReservationState().get("dedicated");
        Assert.assertNotNull((Object)reservationStateMap);
        YarnProtos.ReservationAllocationStateProto reservationState = (YarnProtos.ReservationAllocationStateProto)reservationStateMap.get(reservationID);
        Assert.assertEquals((long)newDeadline, (long)reservationState.getReservationDefinition().getDeadline());
    }

    @Test
    public void testDeleteReservationAndCheckAfterFailover() throws Exception {
        this.startRMs();
        this.addNodeCapacityToPlan(rm1, 102400, 100);
        ClientRMService clientService = rm1.getClientRMService();
        ReservationSubmissionRequest request = this.createReservationSubmissionRequest();
        ReservationSubmissionResponse response = null;
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId reservationID = response.getReservationId();
        Assert.assertNotNull((Object)reservationID);
        ReservationDeleteRequest deleteRequest = ReservationDeleteRequest.newInstance((ReservationId)reservationID);
        clientService.deleteReservation(deleteRequest);
        this.explicitFailover();
        rm2.registerNode("127.0.0.1:1", 102400, 100);
        RMStateStore.RMState state = rm2.getRMContext().getStateStore().loadState();
        Assert.assertNull(state.getReservationState().get("dedicated"));
    }

    private void addNodeCapacityToPlan(MockRM rm, int memory, int vCores) {
        try {
            rm.registerNode("127.0.0.1:1", memory, vCores);
            int attempts = 10;
            do {
                DrainDispatcher dispatcher = (DrainDispatcher)rm1.getRMContext().getDispatcher();
                dispatcher.await();
                rm.getRMContext().getReservationSystem().synchronizePlan("dedicated", false);
                if (rm.getRMContext().getReservationSystem().getPlan("dedicated").getTotalCapacity().getMemory() > 0) break;
                LOG.info("Waiting for node capacity to be added to plan");
                Thread.sleep(100L);
            } while (attempts-- > 0);
            if (attempts <= 0) {
                Assert.fail((String)"Exhausted attempts in checking if node capacity was added to the plan");
            }
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
    }

    private ReservationSubmissionRequest createReservationSubmissionRequest() {
        UTCClock clock = new UTCClock();
        long arrival = clock.getTime();
        long duration = 60000L;
        long deadline = arrival + duration + 1500L;
        return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival, deadline, duration);
    }

    private void validateReservation(Plan plan, ReservationId resId, ReservationDefinition rDef) {
        ReservationAllocation reservation = plan.getReservationById(resId);
        Assert.assertNotNull((Object)reservation);
        Assert.assertEquals((long)rDef.getDeadline(), (long)reservation.getReservationDefinition().getDeadline());
    }

    @Test
    public void testSubmitReservationFailoverAndDelete() throws Exception {
        this.startRMs();
        this.addNodeCapacityToPlan(rm1, 102400, 100);
        ClientRMService clientService = rm1.getClientRMService();
        ReservationSubmissionRequest request = this.createReservationSubmissionRequest();
        ReservationSubmissionResponse response = null;
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId reservationID = response.getReservationId();
        Assert.assertNotNull((Object)reservationID);
        LOG.info("Submit reservation response: " + reservationID);
        ReservationDefinition reservationDefinition = request.getReservationDefinition();
        this.explicitFailover();
        this.addNodeCapacityToPlan(rm2, 102400, 100);
        Plan plan = rm2.getRMContext().getReservationSystem().getPlan("dedicated");
        this.validateReservation(plan, reservationID, reservationDefinition);
        ReservationDeleteRequest deleteRequest = ReservationDeleteRequest.newInstance((ReservationId)reservationID);
        ReservationDeleteResponse deleteResponse = null;
        clientService = rm2.getClientRMService();
        try {
            deleteResponse = clientService.deleteReservation(deleteRequest);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)deleteResponse);
        Assert.assertNull((Object)plan.getReservationById(reservationID));
    }

    @Test
    public void testFailoverAndSubmitReservation() throws Exception {
        this.startRMs();
        this.addNodeCapacityToPlan(rm1, 102400, 100);
        this.explicitFailover();
        this.addNodeCapacityToPlan(rm2, 102400, 100);
        ClientRMService clientService = rm2.getClientRMService();
        ReservationSubmissionRequest request = this.createReservationSubmissionRequest();
        ReservationSubmissionResponse response = null;
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId reservationID = response.getReservationId();
        Assert.assertNotNull((Object)reservationID);
        LOG.info("Submit reservation response: " + reservationID);
        ReservationDefinition reservationDefinition = request.getReservationDefinition();
        Plan plan = rm2.getRMContext().getReservationSystem().getPlan("dedicated");
        this.validateReservation(plan, reservationID, reservationDefinition);
    }

    @Test
    public void testSubmitReservationFailoverAndUpdate() throws Exception {
        this.startRMs();
        this.addNodeCapacityToPlan(rm1, 102400, 100);
        ClientRMService clientService = rm1.getClientRMService();
        ReservationSubmissionRequest request = this.createReservationSubmissionRequest();
        ReservationSubmissionResponse response = null;
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId reservationID = response.getReservationId();
        Assert.assertNotNull((Object)reservationID);
        LOG.info("Submit reservation response: " + reservationID);
        ReservationDefinition reservationDefinition = request.getReservationDefinition();
        this.explicitFailover();
        this.addNodeCapacityToPlan(rm2, 102400, 100);
        Plan plan = rm2.getRMContext().getReservationSystem().getPlan("dedicated");
        this.validateReservation(plan, reservationID, reservationDefinition);
        long newDeadline = reservationDefinition.getDeadline() + 100L;
        reservationDefinition.setDeadline(newDeadline);
        ReservationUpdateRequest updateRequest = ReservationUpdateRequest.newInstance((ReservationDefinition)reservationDefinition, (ReservationId)reservationID);
        ReservationUpdateResponse updateResponse = null;
        clientService = rm2.getClientRMService();
        try {
            updateResponse = clientService.updateReservation(updateRequest);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)updateResponse);
        this.validateReservation(plan, reservationID, reservationDefinition);
    }

    @Test
    public void testSubmitUpdateReservationFailoverAndDelete() throws Exception {
        this.startRMs();
        this.addNodeCapacityToPlan(rm1, 102400, 100);
        ClientRMService clientService = rm1.getClientRMService();
        ReservationSubmissionRequest request = this.createReservationSubmissionRequest();
        ReservationSubmissionResponse response = null;
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId reservationID = response.getReservationId();
        Assert.assertNotNull((Object)reservationID);
        LOG.info("Submit reservation response: " + reservationID);
        ReservationDefinition reservationDefinition = request.getReservationDefinition();
        Plan plan = rm1.getRMContext().getReservationSystem().getPlan("dedicated");
        this.validateReservation(plan, reservationID, reservationDefinition);
        long newDeadline = reservationDefinition.getDeadline() + 100L;
        reservationDefinition.setDeadline(newDeadline);
        ReservationUpdateRequest updateRequest = ReservationUpdateRequest.newInstance((ReservationDefinition)reservationDefinition, (ReservationId)reservationID);
        ReservationUpdateResponse updateResponse = null;
        try {
            updateResponse = clientService.updateReservation(updateRequest);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)updateResponse);
        this.validateReservation(plan, reservationID, reservationDefinition);
        this.explicitFailover();
        this.addNodeCapacityToPlan(rm2, 102400, 100);
        plan = rm2.getRMContext().getReservationSystem().getPlan("dedicated");
        this.validateReservation(plan, reservationID, reservationDefinition);
        ReservationDeleteRequest deleteRequest = ReservationDeleteRequest.newInstance((ReservationId)reservationID);
        ReservationDeleteResponse deleteResponse = null;
        clientService = rm2.getClientRMService();
        try {
            deleteResponse = clientService.deleteReservation(deleteRequest);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)deleteResponse);
        Assert.assertNull((Object)plan.getReservationById(reservationID));
    }

    @Test
    public void testReservationResizeAfterFailover() throws Exception {
        this.startRMs();
        this.addNodeCapacityToPlan(rm1, 102400, 100);
        ClientRMService clientService = rm1.getClientRMService();
        ReservationSubmissionRequest request = this.createReservationSubmissionRequest();
        ReservationDefinition reservationDefinition = request.getReservationDefinition();
        ReservationSubmissionResponse response = null;
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId resID1 = response.getReservationId();
        Assert.assertNotNull((Object)resID1);
        LOG.info("Submit reservation response: " + resID1);
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId resID2 = response.getReservationId();
        Assert.assertNotNull((Object)resID2);
        LOG.info("Submit reservation response: " + resID2);
        try {
            response = clientService.submitReservation(request);
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertNotNull((Object)response);
        ReservationId resID3 = response.getReservationId();
        Assert.assertNotNull((Object)resID3);
        LOG.info("Submit reservation response: " + resID3);
        this.waitForReservationActivation(rm1, resID1, "dedicated");
        Plan plan = rm1.getRMContext().getReservationSystem().getPlan("dedicated");
        this.validateReservation(plan, resID1, reservationDefinition);
        this.validateReservation(plan, resID2, reservationDefinition);
        this.validateReservation(plan, resID3, reservationDefinition);
        ResourceScheduler scheduler = rm1.getResourceScheduler();
        QueueInfo resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
        Assert.assertEquals((double)0.05, (double)resQ1.getCapacity(), (double)0.001f);
        QueueInfo resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
        Assert.assertEquals((double)0.05, (double)resQ2.getCapacity(), (double)0.001f);
        QueueInfo resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
        Assert.assertEquals((double)0.05, (double)resQ3.getCapacity(), (double)0.001f);
        this.explicitFailover();
        this.addNodeCapacityToPlan(rm2, 5120, 5);
        plan = rm2.getRMContext().getReservationSystem().getPlan("dedicated");
        this.validateReservation(plan, resID1, reservationDefinition);
        this.validateReservation(plan, resID3, reservationDefinition);
        scheduler = rm2.getResourceScheduler();
        resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false);
        Assert.assertEquals((float)0.33333334f, (float)resQ1.getCapacity(), (float)0.001f);
        resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false);
        Assert.assertEquals((float)0.33333334f, (float)resQ2.getCapacity(), (float)0.001f);
        resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false);
        Assert.assertEquals((float)0.33333334f, (float)resQ3.getCapacity(), (float)0.001f);
    }

    private void waitForReservationActivation(MockRM rm, ReservationId reservationId, String planName) {
        try {
            int attempts = 20;
            do {
                rm.getRMContext().getReservationSystem().synchronizePlan(planName, false);
                if (rm.getResourceScheduler().getQueueInfo(reservationId.toString(), false, false).getCapacity() > 0.0f) break;
                LOG.info("Waiting for reservation to be active");
                Thread.sleep(100L);
            } while (attempts-- > 0);
            if (attempts <= 0) {
                Assert.fail((String)"Exceeded attempts in waiting for reservation to be active");
            }
        }
        catch (Exception e) {
            Assert.fail((String)e.getMessage());
        }
    }
}

