/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceRequestHandler.class);
    private final Map<String, NodeReport> nodeReportMap = Maps.newHashMap();
    private final Map<Set<PTOperator>, String> nodeLocalMapping = Maps.newHashMap();
    private final Map<String, String> nodeToRack = Maps.newHashMap();

    public AMRMClient.ContainerRequest createContainerRequest(StreamingContainerAgent.ContainerStartRequest csr, boolean first) {
        int priority = csr.container.getResourceRequestPriority();
        String[] nodes = null;
        String[] racks = null;
        String host = this.getHost(csr, first);
        Resource capability = (Resource)Records.newRecord(Resource.class);
        capability.setMemory(csr.container.getRequiredMemoryMB());
        capability.setVirtualCores(csr.container.getRequiredVCores());
        if (host != null) {
            nodes = new String[]{host};
            return new AMRMClient.ContainerRequest(capability, nodes, racks, Priority.newInstance((int)priority), false);
        }
        return new AMRMClient.ContainerRequest(capability, nodes, racks, Priority.newInstance((int)priority));
    }

    public void clearNodeMapping() {
        this.nodeLocalMapping.clear();
    }

    public void updateNodeReports(List<NodeReport> nodeReports) {
        for (NodeReport nr : nodeReports) {
            StringBuilder sb = new StringBuilder();
            sb.append("rackName=").append(nr.getRackName()).append(",nodeid=").append(nr.getNodeId()).append(",numContainers=").append(nr.getNumContainers()).append(",capability=").append(nr.getCapability()).append("used=").append(nr.getUsed()).append("state=").append(nr.getNodeState());
            LOG.info("Node report: " + sb);
            this.nodeReportMap.put(nr.getNodeId().getHost(), nr);
            this.nodeToRack.put(nr.getNodeId().getHost(), nr.getRackName());
        }
    }

    public String getHost(StreamingContainerAgent.ContainerStartRequest csr, boolean first) {
        Set<PTOperator> nodeLocalSet;
        PTOperator.HostOperatorSet grpObj;
        String host = null;
        PTContainer c = csr.container;
        if (first) {
            for (PTOperator oper : c.getOperators()) {
                grpObj = oper.getNodeLocalOperators();
                host = this.nodeLocalMapping.get(grpObj.getOperatorSet());
                if (host != null) {
                    return host;
                }
                if (grpObj.getHost() == null) continue;
                host = grpObj.getHost();
                break;
            }
            if (host != null && this.nodeReportMap.get(host) != null) {
                for (PTOperator oper : c.getOperators()) {
                    grpObj = oper.getNodeLocalOperators();
                    nodeLocalSet = grpObj.getOperatorSet();
                    NodeReport report = this.nodeReportMap.get(host);
                    int aggrMemory = c.getRequiredMemoryMB();
                    int vCores = c.getRequiredVCores();
                    HashSet containers = Sets.newHashSet();
                    containers.add(c);
                    for (PTOperator nodeLocalOper : nodeLocalSet) {
                        if (containers.contains(nodeLocalOper.getContainer())) continue;
                        aggrMemory += nodeLocalOper.getContainer().getRequiredMemoryMB();
                        vCores += nodeLocalOper.getContainer().getRequiredVCores();
                        containers.add(nodeLocalOper.getContainer());
                    }
                    int n = report.getCapability().getMemory() - report.getUsed().getMemory();
                    int vCoresAvailable = report.getCapability().getVirtualCores() - report.getUsed().getVirtualCores();
                    if (n < aggrMemory || vCoresAvailable < vCores) continue;
                    this.nodeLocalMapping.put(nodeLocalSet, host);
                    return host;
                }
            }
        }
        host = null;
        for (PTOperator oper : c.getOperators()) {
            grpObj = oper.getNodeLocalOperators();
            nodeLocalSet = grpObj.getOperatorSet();
            if (nodeLocalSet.size() <= 1) continue;
            LOG.debug("Finding new host for {}", nodeLocalSet);
            int aggrMemory = c.getRequiredMemoryMB();
            int vCores = c.getRequiredVCores();
            HashSet containers = Sets.newHashSet();
            containers.add(c);
            for (PTOperator pTOperator : nodeLocalSet) {
                if (containers.contains(pTOperator.getContainer())) continue;
                aggrMemory += pTOperator.getContainer().getRequiredMemoryMB();
                vCores += pTOperator.getContainer().getRequiredVCores();
                containers.add(pTOperator.getContainer());
            }
            for (Map.Entry entry : this.nodeReportMap.entrySet()) {
                int memAvailable = ((NodeReport)entry.getValue()).getCapability().getMemory() - ((NodeReport)entry.getValue()).getUsed().getMemory();
                int vCoresAvailable = ((NodeReport)entry.getValue()).getCapability().getVirtualCores() - ((NodeReport)entry.getValue()).getUsed().getVirtualCores();
                if (memAvailable < aggrMemory || vCoresAvailable < vCores) continue;
                host = (String)entry.getKey();
                grpObj.setHost(host);
                this.nodeLocalMapping.put(nodeLocalSet, host);
                return host;
            }
        }
        return host;
    }
}

