/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.FSRecoveryHandler;
import com.datatorrent.stram.LaunchContainerRunnable;
import com.datatorrent.stram.ResourceRequestHandler;
import com.datatorrent.stram.StramAppContext;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.StreamingContainerParent;
import com.datatorrent.stram.StringCodecs;
import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.BaseContext;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.appdata.AppDataPushAgent;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.OperatorStatus;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.security.StramDelegationTokenIdentifier;
import com.datatorrent.stram.security.StramDelegationTokenManager;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.security.StramWSFilterInitializer;
import com.datatorrent.stram.webapp.AppInfo;
import com.datatorrent.stram.webapp.StramWebApp;
import com.google.common.collect.Maps;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.bind.annotation.XmlElement;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingAppMasterService
extends CompositeService {
    /** Logger used by every method of this service. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamingAppMasterService.class);
    // 86400000 ms = 24 hours. Presumably the key-update interval handed to StramDelegationTokenManager
    // in serviceInit (same value, same position) - confirm against that constructor's signature.
    private static final long DELEGATION_KEY_UPDATE_INTERVAL = 86400000L;
    // 0x3FFFFFFFFFFFFFFF == Long.MAX_VALUE / 2, i.e. effectively "never" while leaving headroom
    // for arithmetic on the value.
    private static final long DELEGATION_TOKEN_MAX_LIFETIME = 0x3FFFFFFFFFFFFFFFL;
    // Same effectively-unbounded value as the max lifetime.
    private static final long DELEGATION_TOKEN_RENEW_INTERVAL = 0x3FFFFFFFFFFFFFFFL;
    // 86400000 ms = 24 hours.
    private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 86400000L;
    // Equals the number of execute() loop iterations a container request may remain unanswered
    // before execute() withdraws and re-issues it.
    private static final int NUMBER_MISSED_HEARTBEATS = 30;
    // Equals the default execute() uses for the "MAX_CONSECUTIVE_CONTAINER_FAILURES" config key.
    private static final int MAX_CONTAINER_FAILURES_PER_NODE = 3;
    // 3600000 ms = 1 hour. Equals the default execute() uses for the "BLACKLIST_REMOVAL_TIME" config key.
    private static final long BLACKLIST_REMOVAL_TIME = 3600000L;
    // Resource manager client; created and registered as a child service in serviceInit.
    private AMRMClient<AMRMClient.ContainerRequest> amRmClient;
    // Asynchronous node manager client; created and registered as a child service in serviceInit.
    private NMClientAsync nmClient;
    // Logical plan: first read from ./dt-conf.ser, then replaced by the plan held by dnmgr (see serviceInit).
    private LogicalPlan dag;
    // Application attempt this master was started for; supplied to the constructor.
    private final ApplicationAttemptId appAttemptID;
    // Always the empty string; final and never reassigned.
    private final String appMasterHostname = "";
    // "host:port" of the web service once serviceStart has started it; empty until then
    // (and stays empty if the web app fails to start).
    private String appMasterTrackingUrl = "";
    // Termination flag of the execute() loop.
    private boolean appDone = false;
    // Count of containers that completed with exit status 0.
    private final AtomicInteger numCompletedContainers = new AtomicInteger();
    // Containers currently held, keyed by the container id's string form.
    private final ConcurrentMap<String, AllocatedContainer> allocatedContainers = Maps.newConcurrentMap();
    // Failure counter per node host name; reset to 0 when a container on that host completes successfully.
    private final ConcurrentMap<String, AtomicInteger> failedContainersMap = Maps.newConcurrentMap();
    // Pairs of (time in ms at which the hosts were blacklisted, the blacklisted host names).
    private final Queue<Pair<Long, List<String>>> blacklistedNodesQueueWithTimeStamp = new ConcurrentLinkedQueue<Pair<Long, List<String>>>();
    // Count of known containers that completed with a non-zero exit status.
    private final AtomicInteger numFailedContainers = new AtomicInteger();
    // Tasks drained and run on the execute() thread at the start of every loop iteration.
    private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue();
    // RPC endpoint for container heartbeats; created in serviceInit.
    private StreamingContainerParent heartbeatListener;
    // Streaming container manager obtained in serviceInit; torn down in serviceStop.
    private StreamingContainerManager dnmgr;
    // Application context handed to the web app and to the app-data push agent.
    private StramAppContext appContext;
    private final Clock clock = new SystemClock();
    // Clock reading taken when this instance was constructed.
    private final long startTime = this.clock.getTime();
    // ClusterAppStats is declared outside the visible part of this file.
    private final ClusterAppStats stats = new ClusterAppStats();
    // Only created when Hadoop security is enabled (see serviceInit); null otherwise.
    private StramDelegationTokenManager delegationTokenManager = null;
    // Only created when LogicalPlan.METRICS_TRANSPORT is set (see serviceInit); null otherwise.
    private AppDataPushAgent appDataPushAgent;

    /**
     * Creates the application master service for one application attempt.
     * The service name passed to the superclass is this class's fully qualified name.
     *
     * @param appAttemptID the application attempt this master runs as; stored as-is
     */
    public StreamingAppMasterService(ApplicationAttemptId appAttemptID) {
        super(StreamingAppMasterService.class.getName());
        this.appAttemptID = appAttemptID;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Logs diagnostic information about the running process: every environment
     * variable, the listing of the current working directory ({@code ls -al}),
     * the Java class path and the service configuration. The configuration is
     * additionally dumped to {@code System.out}.
     *
     * <p>Failures are logged and never propagated. If the thread is interrupted
     * while waiting for the listing, the interrupt flag is restored so callers
     * can still observe it.
     */
    public void dumpOutDebugInfo() {
        LOG.info("Dump debug output");
        LOG.info("\nDumping System Env: begin");
        for (Map.Entry<String, String> env : System.getenv().entrySet()) {
            LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
        }
        LOG.info("Dumping System Env: end");
        try {
            // stderr is merged into stdout so that a single reader drains everything the child writes.
            Process pr = new ProcessBuilder("ls", "-al").redirectErrorStream(true).start();
            LOG.info("\nDumping files in local dir: begin");
            // Drain the output BEFORE waiting for exit: a child that fills the pipe buffer
            // blocks until somebody reads, so waiting first can hang on a large directory.
            try (BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()))) {
                String line;
                while ((line = buf.readLine()) != null) {
                    LOG.info("System CWD content: " + line);
                }
            }
            pr.waitFor();
            LOG.info("Dumping files in local dir: end");
        }
        catch (IOException e) {
            LOG.debug("Exception", e);
        }
        catch (InterruptedException e) {
            // Restore the flag; this method only reports the interruption, it does not handle it.
            Thread.currentThread().interrupt();
            LOG.info("Interrupted", e);
        }
        LOG.info("Classpath: {}", System.getProperty("java.class.path"));
        LOG.info("Config resources: {}", this.getConfig().toString());
        try {
            // The writer is flushed but deliberately not closed: closing it would close System.out.
            PrintWriter out = new PrintWriter(System.out);
            Configuration.dumpConfiguration(this.getConfig(), out);
            out.flush();
        }
        catch (Exception e) {
            LOG.error("Error dumping configuration.", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Initializes the application master (overrides the {@link CompositeService} hook).
     *
     * <p>Steps, in order: load the serialized logical plan from {@code ./dt-conf.ser};
     * optionally dump debug information; record the attempt id on the plan; obtain the
     * {@link StreamingContainerManager} and adopt its logical plan; load the string
     * codecs; create the delegation token manager when security is enabled; create and
     * register the NM client, the RM client, the heartbeat listener and, if a metrics
     * transport is configured, the app-data push agent; finally delegate to the superclass.
     *
     * @param conf the service configuration
     * @throws Exception if the plan cannot be read or any collaborator fails to initialize
     */
    protected void serviceInit(Configuration conf) throws Exception {
        ApplicationId applicationId = this.appAttemptID.getApplicationId();
        LOG.info("Application master, appId=" + applicationId.getId() + ", clustertimestamp=" + applicationId.getClusterTimestamp() + ", attemptId=" + this.appAttemptID.getAttemptId());

        try (FileInputStream planStream = new FileInputStream("./dt-conf.ser")) {
            this.dag = LogicalPlan.read(planStream);
        }
        if (this.dag.isDebug()) {
            this.dumpOutDebugInfo();
        }
        this.dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, this.appAttemptID.getAttemptId());

        // The container manager returns its own logical plan; from here on that plan is the one in use.
        FSRecoveryHandler recovery = new FSRecoveryHandler(this.dag.assertAppPath(), conf);
        this.dnmgr = StreamingContainerManager.getInstance(recovery, this.dag, true);
        this.dag = this.dnmgr.getLogicalPlan();
        this.appContext = new ClusterAppContextImpl(this.dag.getAttributes());

        Map codecs = (Map)this.dag.getAttributes().get(DAG.STRING_CODECS);
        StringCodecs.loadConverters(codecs);

        LOG.info("Starting application with {} operators in {} containers", this.dnmgr.getPhysicalPlan().getAllOperators().size(), this.dnmgr.getPhysicalPlan().getContainers().size());

        if (UserGroupInformation.isSecurityEnabled()) {
            this.delegationTokenManager = new StramDelegationTokenManager(DELEGATION_KEY_UPDATE_INTERVAL, DELEGATION_TOKEN_MAX_LIFETIME, DELEGATION_TOKEN_RENEW_INTERVAL, DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL);
        }

        this.nmClient = new NMClientAsyncImpl((NMClientAsync.CallbackHandler)new NMCallbackHandler());
        this.addService(this.nmClient);
        this.amRmClient = AMRMClient.createAMRMClient();
        this.addService(this.amRmClient);

        // RPC endpoint the streaming containers send their heartbeats to.
        int listenerThreads = (Integer)this.dag.getValue(Context.DAGContext.HEARTBEAT_LISTENER_THREAD_COUNT);
        this.heartbeatListener = new StreamingContainerParent(this.getClass().getName(), this.dnmgr, (SecretManager<? extends TokenIdentifier>)this.delegationTokenManager, listenerThreads);
        this.addService(this.heartbeatListener);

        AutoMetric.Transport metricsTransport = (AutoMetric.Transport)this.dag.getValue(LogicalPlan.METRICS_TRANSPORT);
        if (metricsTransport != null) {
            this.appDataPushAgent = new AppDataPushAgent(this.dnmgr, this.appContext);
            this.addService(this.appDataPushAgent);
        }

        super.serviceInit(conf);
    }

    /**
     * Starts the child services, then publishes how this master can be reached
     * (overrides the {@link CompositeService} hook).
     *
     * <p>After the superclass has started the registered services this method starts the
     * delegation token threads (secure clusters only), writes the {@code stram://host:port}
     * URI of the heartbeat listener through the recovery handler, and starts the web
     * service. A web service that fails to start is logged and otherwise ignored; in that
     * case the tracking URL keeps its previous value.
     *
     * @throws Exception if a child service fails to start or the connect URI cannot be written
     */
    protected void serviceStart() throws Exception {
        super.serviceStart();
        if (UserGroupInformation.isSecurityEnabled()) {
            this.delegationTokenManager.startThreads();
        }

        InetSocketAddress rpcAddress = NetUtils.getConnectAddress(this.heartbeatListener.getAddress());
        URI stramUri = new URI("stram", null, rpcAddress.getHostName(), rpcAddress.getPort(), null, null, null);
        new FSRecoveryHandler(this.dag.assertAppPath(), this.getConfig()).writeConnectUri(stramUri.toString());

        try {
            Log.setLog(null);
        }
        catch (Throwable ignored) {
            // Best effort only: whatever goes wrong while resetting the logger is deliberately ignored.
        }

        try {
            Configuration webConf = this.getConfig();
            if (UserGroupInformation.isSecurityEnabled()) {
                // Work on a copy so the filter setting does not leak into the service configuration.
                webConf = new Configuration(webConf);
                webConf.set("hadoop.http.filter.initializers", StramWSFilterInitializer.class.getCanonicalName());
            }
            WebApp webApp = WebApps.$for("stram", StramAppContext.class, this.appContext, "ws").with(webConf).start(new StramWebApp(this.dnmgr));
            LOG.info("Started web service at port: " + webApp.port());
            String webHost = NetUtils.getConnectAddress(webApp.getListenerAddress()).getHostName();
            this.appMasterTrackingUrl = webHost + ":" + webApp.port();
            LOG.info("Setting tracking URL to: " + this.appMasterTrackingUrl);
        }
        catch (Exception e) {
            LOG.error("Webapps failed to start. Ignoring for now:", e);
        }
    }

    /**
     * Stops the child services and releases everything this service created
     * (overrides the {@link CompositeService} hook).
     *
     * <p>Every collaborator is null-checked because this method can run when
     * initialization never completed: {@code serviceInit} may throw (for example while
     * reading the serialized plan) before any of them has been assigned.
     *
     * @throws Exception if the superclass or one of the collaborators fails to stop
     */
    protected void serviceStop() throws Exception {
        super.serviceStop();
        // The manager only exists on secure clusters, and only once serviceInit got far enough
        // to create it; without the null check a failed init would end in a NullPointerException
        // here and skip the remaining cleanup.
        if (UserGroupInformation.isSecurityEnabled() && this.delegationTokenManager != null) {
            this.delegationTokenManager.stopThreads();
        }
        if (this.nmClient != null) {
            this.nmClient.stop();
        }
        if (this.amRmClient != null) {
            this.amRmClient.stop();
        }
        if (this.dnmgr != null) {
            this.dnmgr.teardown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the application master to completion: starts the shared event loop,
     * executes the main allocation loop and stops the event loop again, whether
     * or not the main loop ended normally.
     *
     * @return always {@code true} when the main loop returns normally
     * @throws Exception whatever the event loop or the main loop throws
     */
    public boolean run() throws Exception {
        try {
            StreamingContainer.eventloop.start();
            this.execute();
            return true;
        }
        finally {
            StreamingContainer.eventloop.stop();
        }
    }

    /**
     * Main loop of the application master.
     *
     * <p>Registers with the resource manager, refuses to run if an application with the
     * same name is already started by the same user, re-attaches to containers known
     * from a previous attempt, and then loops (sleeping one second per iteration)
     * until the application is done. Each iteration:
     * <ol>
     *   <li>refreshes security tokens when they are about to expire,</li>
     *   <li>runs the queued {@code pendingTasks},</li>
     *   <li>turns pending container start requests into RM container requests, clamped to
     *       the cluster's min/max memory and vcores,</li>
     *   <li>re-issues requests that stayed unanswered for more than
     *       {@code NUMBER_MISSED_HEARTBEATS} iterations,</li>
     *   <li>removes nodes from the blacklist once their blacklist time has elapsed,</li>
     *   <li>exchanges asks/releases with the RM, launches newly allocated containers and
     *       processes completed ones (including blacklisting nodes with repeated failures),</li>
     *   <li>decides whether the application is finished.</li>
     * </ol>
     * Finally the master unregisters from the RM with the resulting status.
     *
     * @throws YarnException if an RM interaction fails
     * @throws IOException if an RM interaction or the token refresh fails
     * @throws YarnRuntimeException if the RM sends any AM command
     * @throws RuntimeException if the duplicate-application check or the node report retrieval fails
     */
    private void execute() throws YarnException, IOException {
        LOG.info("Starting ApplicationMaster");
        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        LOG.info("number of tokens: {}", credentials.getAllTokens().size());
        for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
            LOG.debug("token: {}", token);
        }
        Configuration conf = this.getConfig();
        // Refresh tokens after a configurable fraction of the shorter of the two token life times.
        long tokenLifeTime = (long)(this.dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * (double)Math.min(this.dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), this.dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
        long expiryTime = System.currentTimeMillis() + tokenLifeTime;
        LOG.debug(" expiry token time {}", tokenLifeTime);
        String hdfsKeyTabFile = this.dag.getValue(LogicalPlan.KEY_TAB_FILE);

        RegisterApplicationMasterResponse response = this.amRmClient.registerApplicationMaster("", 0, this.appMasterTrackingUrl);
        int maxMem = response.getMaximumResourceCapability().getMemory();
        int maxVcores = response.getMaximumResourceCapability().getVirtualCores();
        int minMem = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
        int minVcores = conf.getInt("yarn.scheduler.minimum-allocation-vcores", 0);
        LOG.info("Max mem {}m, Min mem {}m, Max vcores {} and Min vcores {} capabililty of resources in this cluster ", new Object[]{maxMem, minMem, maxVcores, minVcores});
        int maxConsecutiveContainerFailures = conf.getInt("MAX_CONSECUTIVE_CONTAINER_FAILURES", MAX_CONTAINER_FAILURES_PER_NODE);
        long blacklistRemovalTime = conf.getLong("BLACKLIST_REMOVAL_TIME", BLACKLIST_REMOVAL_TIME);

        // Outstanding requests: start request -> (loop iteration in which it was last issued, the RM request).
        Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, AMRMClient.ContainerRequest>> requestedResources = Maps.newHashMap();
        int loopCounter = -1;
        List<ContainerId> releasedContainers = new ArrayList<ContainerId>();
        int numTotalContainers = 0;
        int numRequestedContainers = 0;
        int numReleasedContainers = 0;
        int nextRequestPriority = 0;
        ResourceRequestHandler resourceRequestor = new ResourceRequestHandler();

        YarnClient clientRMService = YarnClient.createYarnClient();
        try {
            clientRMService.init(conf);
            clientRMService.start();
            ApplicationReport ar = StramClientUtils.getStartedAppInstanceByName(clientRMService, (String)this.dag.getAttributes().get(DAG.APPLICATION_NAME), UserGroupInformation.getLoginUser().getUserName(), (String)this.dag.getAttributes().get(DAG.APPLICATION_ID));
            if (ar != null) {
                // Another started application uses the same name for the same user: give up immediately.
                this.appDone = true;
                this.dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", ar.getApplicationId().toString(), ar.getName(), ar.getUser());
                LOG.info("Forced shutdown due to {}", this.dnmgr.shutdownDiagnosticsMessage);
                this.finishApplication(FinalApplicationStatus.FAILED, numTotalContainers);
                return;
            }
            resourceRequestor.updateNodeReports(clientRMService.getNodeReports(new NodeState[0]));
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to retrieve cluster nodes report.", e);
        }
        finally {
            clientRMService.stop();
        }

        // Re-register containers the container manager already knows about.
        this.checkContainerStatus();
        FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
        InetSocketAddress rmAddress = conf.getSocketAddr("yarn.resourcemanager.address", "0.0.0.0:8032", 8032);

        while (!this.appDone) {
            ++loopCounter;

            if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) {
                String applicationId = this.appAttemptID.getApplicationId().toString();
                expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true);
            }

            Runnable r;
            while ((r = this.pendingTasks.poll()) != null) {
                r.run();
            }

            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // The flag is intentionally not restored: with it set, every following sleep
                // would return immediately and the loop would spin.
                LOG.info("Sleep interrupted " + e.getMessage());
            }

            List<AMRMClient.ContainerRequest> containerRequests = new ArrayList<AMRMClient.ContainerRequest>();
            List<AMRMClient.ContainerRequest> removedContainerRequests = new ArrayList<AMRMClient.ContainerRequest>();

            // New start requests: clamp to the cluster limits, assign a unique priority, ask the RM.
            StreamingContainerAgent.ContainerStartRequest startRequest;
            while ((startRequest = this.dnmgr.containerStartRequests.poll()) != null) {
                if (startRequest.container.getRequiredMemoryMB() > maxMem) {
                    LOG.warn("Container memory {}m above max threshold of cluster. Using max value {}m.", startRequest.container.getRequiredMemoryMB(), maxMem);
                    startRequest.container.setRequiredMemoryMB(maxMem);
                }
                if (startRequest.container.getRequiredMemoryMB() < minMem) {
                    startRequest.container.setRequiredMemoryMB(minMem);
                }
                if (startRequest.container.getRequiredVCores() > maxVcores) {
                    LOG.warn("Container vcores {} above max threshold of cluster. Using max value {}.", startRequest.container.getRequiredVCores(), maxVcores);
                    startRequest.container.setRequiredVCores(maxVcores);
                }
                if (startRequest.container.getRequiredVCores() < minVcores) {
                    startRequest.container.setRequiredVCores(minVcores);
                }
                // The priority doubles as the key that matches an allocation back to its request.
                startRequest.container.setResourceRequestPriority(nextRequestPriority++);
                AMRMClient.ContainerRequest cr = resourceRequestor.createContainerRequest(startRequest, true);
                requestedResources.put(startRequest, new MutablePair<Integer, AMRMClient.ContainerRequest>(loopCounter, cr));
                containerRequests.add(cr);
            }

            // Requests outstanding for too long are withdrawn and issued again (second argument false this time).
            for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, AMRMClient.ContainerRequest>> pendingEntry : requestedResources.entrySet()) {
                MutablePair<Integer, AMRMClient.ContainerRequest> pending = pendingEntry.getValue();
                if (loopCounter - pending.getLeft() <= NUMBER_MISSED_HEARTBEATS) {
                    continue;
                }
                removedContainerRequests.add(pending.getRight());
                AMRMClient.ContainerRequest cr = resourceRequestor.createContainerRequest(pendingEntry.getKey(), false);
                pending.setLeft(loopCounter);
                pending.setRight(cr);
                containerRequests.add(cr);
            }

            // Un-blacklist nodes whose time is up. The queue is in insertion order, so the scan
            // stops at the first entry that is still too young.
            long currentTime = System.currentTimeMillis();
            List<String> blacklistRemovals = new ArrayList<String>();
            Iterator<Pair<Long, List<String>>> it = this.blacklistedNodesQueueWithTimeStamp.iterator();
            while (it.hasNext()) {
                Pair<Long, List<String>> blacklisted = it.next();
                if (currentTime - blacklisted.getFirst() <= blacklistRemovalTime) {
                    break;
                }
                blacklistRemovals.addAll(blacklisted.getSecond());
                it.remove();
            }
            if (!blacklistRemovals.isEmpty()) {
                this.amRmClient.updateBlacklist(null, blacklistRemovals);
            }

            numTotalContainers += containerRequests.size();
            numRequestedContainers += containerRequests.size();
            AllocateResponse amResp = this.sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
            if (amResp.getAMCommand() != null) {
                // Every AM command (resync, shutdown, or anything else) aborts the master the same way.
                LOG.info(" statement executed:{}", amResp.getAMCommand());
                throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
            }
            releasedContainers.clear();

            // ---- newly allocated containers ----
            List<Container> newAllocatedContainers = amResp.getAllocatedContainers();
            numRequestedContainers -= newAllocatedContainers.size();
            long timestamp = System.currentTimeMillis();
            for (Container allocatedContainer : newAllocatedContainers) {
                LOG.info("Got new container., containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory() + ", priority" + allocatedContainer.getPriority());

                // Find the outstanding request this allocation answers (matched by priority).
                StreamingContainerAgent.ContainerStartRequest csr = null;
                for (StreamingContainerAgent.ContainerStartRequest candidate : requestedResources.keySet()) {
                    if (candidate.container.getResourceRequestPriority() == allocatedContainer.getPriority().getPriority()) {
                        csr = candidate;
                        break;
                    }
                }
                if (csr == null) {
                    // No outstanding request with that priority: give the surplus container back.
                    LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
                    releasedContainers.add(allocatedContainer.getId());
                    ++numReleasedContainers;
                    ++numRequestedContainers;
                    continue;
                }
                requestedResources.remove(csr);

                StreamingContainerManager.ContainerResource resource = new StreamingContainerManager.ContainerResource(allocatedContainer.getPriority().getPriority(), allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString(), allocatedContainer.getResource().getMemory(), allocatedContainer.getResource().getVirtualCores(), allocatedContainer.getNodeHttpAddress());
                StreamingContainerAgent sca = this.dnmgr.assignContainer(resource, null);
                if (sca == null) {
                    LOG.warn("Container {} allocated but nothing to deploy, going to release this container.", allocatedContainer.getId());
                    releasedContainers.add(allocatedContainer.getId());
                    continue;
                }

                AllocatedContainer allocatedContainerHolder = new AllocatedContainer(allocatedContainer);
                this.allocatedContainers.put(allocatedContainer.getId().toString(), allocatedContainerHolder);
                ByteBuffer tokens = null;
                if (UserGroupInformation.isSecurityEnabled()) {
                    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
                    Token<StramDelegationTokenIdentifier> delegationToken = this.allocateDelegationToken(ugi.getUserName(), this.heartbeatListener.getAddress());
                    allocatedContainerHolder.delegationToken = delegationToken;
                    tokens = LaunchContainerRunnable.getTokens(ugi, delegationToken);
                }
                // Launched synchronously on this thread (run(), not a new thread).
                LaunchContainerRunnable launchContainer = new LaunchContainerRunnable(allocatedContainer, this.nmClient, sca, tokens);
                launchContainer.run();
                StramEvent.StartContainerEvent startEvent = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString());
                startEvent.setTimestamp(timestamp);
                this.dnmgr.recordEventAsync(startEvent);
            }

            resourceRequestor.updateNodeReports(amResp.getUpdatedNodes());

            // ---- completed containers ----
            List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
            List<String> blacklistAdditions = new ArrayList<String>();
            for (ContainerStatus containerStatus : completedContainers) {
                LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
                assert (containerStatus.getState() == ContainerState.COMPLETE);

                // May be null, e.g. for a container that was released without ever being tracked.
                AllocatedContainer allocatedContainer = this.allocatedContainers.remove(containerStatus.getContainerId().toString());
                if (allocatedContainer != null && allocatedContainer.delegationToken != null) {
                    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
                    this.delegationTokenManager.cancelToken(allocatedContainer.delegationToken, ugi.getUserName());
                }

                int exitStatus = containerStatus.getExitStatus();
                if (0 != exitStatus) {
                    if (allocatedContainer != null) {
                        this.numFailedContainers.incrementAndGet();
                        // Exit status 1 does not count towards blacklisting the node.
                        if (exitStatus != 1) {
                            String hostname = allocatedContainer.container.getNodeId().getHost();
                            int failedTimes = 1;
                            AtomicInteger failed = this.failedContainersMap.putIfAbsent(hostname, new AtomicInteger(1));
                            if (failed != null) {
                                failedTimes = failed.incrementAndGet();
                            }
                            if (failedTimes >= maxConsecutiveContainerFailures) {
                                LOG.info("Node {} failed {} times consecutively, marking the node blacklisted", hostname, failedTimes);
                                blacklistAdditions.add(hostname);
                            }
                        }
                    }
                    LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
                    this.dnmgr.scheduleContainerRestart(containerStatus.getContainerId().toString());
                } else {
                    this.numCompletedContainers.incrementAndGet();
                    LOG.info("Container completed successfully., containerId=" + containerStatus.getContainerId());
                    // A success resets the node's failure counter. Guarded like the failure branch:
                    // an untracked container has no holder to read the host name from.
                    if (allocatedContainer != null) {
                        String hostname = allocatedContainer.container.getNodeId().getHost();
                        AtomicInteger failedTimes = this.failedContainersMap.get(hostname);
                        if (failedTimes != null) {
                            failedTimes.set(0);
                        }
                    }
                }

                String containerIdStr = containerStatus.getContainerId().toString();
                this.dnmgr.removeContainerAgent(containerIdStr);
                StramEvent.StopContainerEvent stopEvent = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus());
                stopEvent.setReason(containerStatus.getDiagnostics());
                this.dnmgr.recordEventAsync(stopEvent);
            }

            if (!blacklistAdditions.isEmpty()) {
                this.amRmClient.updateBlacklist(blacklistAdditions, null);
                this.blacklistedNodesQueueWithTimeStamp.add(new Pair<Long, List<String>>(System.currentTimeMillis(), blacklistAdditions));
            }

            // ---- termination check ----
            if (this.dnmgr.forcedShutdown) {
                LOG.info("Forced shutdown due to {}", this.dnmgr.shutdownDiagnosticsMessage);
                finalStatus = FinalApplicationStatus.FAILED;
                this.appDone = true;
            } else if (this.allocatedContainers.isEmpty() && numRequestedContainers == 0 && this.dnmgr.containerStartRequests.isEmpty()) {
                LOG.debug("Exiting as no more containers are allocated or requested");
                finalStatus = FinalApplicationStatus.SUCCEEDED;
                this.appDone = true;
            }
            LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + this.appDone + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" + this.numCompletedContainers + ", failed=" + this.numFailedContainers + ", currentAllocated=" + this.allocatedContainers.size());
            this.dnmgr.monitorHeartbeat();
        }
        this.finishApplication(finalStatus, numTotalContainers);
    }

    /**
     * Unregisters this application master from the resource manager.
     *
     * <p>For any status other than {@code SUCCEEDED} a diagnostics text with the container
     * counters is attached, followed on a new line by the container manager's shutdown
     * diagnostics message when that message is not empty.
     *
     * @param finalStatus the status to report
     * @param numTotalContainers number of containers requested so far; only used in the diagnostics text
     * @throws YarnException if unregistering fails
     * @throws IOException if unregistering fails
     */
    private void finishApplication(FinalApplicationStatus finalStatus, int numTotalContainers) throws YarnException, IOException {
        LOG.info("Application completed. Signalling finish to RM");
        FinishApplicationMasterRequest request = Records.newRecord(FinishApplicationMasterRequest.class);
        request.setFinalApplicationStatus(finalStatus);

        if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
            StringBuilder text = new StringBuilder("Diagnostics., total=").append(numTotalContainers);
            text.append(", completed=").append(this.numCompletedContainers.get());
            text.append(", allocated=").append(this.allocatedContainers.size());
            text.append(", failed=").append(this.numFailedContainers.get());
            if (!StringUtils.isEmpty((String)this.dnmgr.shutdownDiagnosticsMessage)) {
                text.append("\n");
                text.append(this.dnmgr.shutdownDiagnosticsMessage);
            }
            request.setDiagnostics(text.toString());
        }

        LOG.info("diagnostics: " + request.getDiagnostics());
        this.amRmClient.unregisterApplicationMaster(request.getFinalApplicationStatus(), request.getDiagnostics(), null);
    }

    /**
     * Creates a delegation token for the given user, bound to the given address.
     *
     * @param username owner recorded in the token identifier; renewer and real user are left empty
     * @param address endpoint the token is for; its {@code ip:port} form becomes the token's service
     * @return the new token, created through this service's delegation token manager
     */
    private Token<StramDelegationTokenIdentifier> allocateDelegationToken(String username, InetSocketAddress address) {
        Text owner = new Text(username);
        Text empty1 = new Text("");
        Text empty2 = new Text("");
        StramDelegationTokenIdentifier tokenId = new StramDelegationTokenIdentifier(owner, empty1, empty2);
        // The service string is computed before the token is created, so a bad address fails first.
        Text serviceName = new Text(address.getAddress().getHostAddress() + ":" + address.getPort());
        Token<StramDelegationTokenIdentifier> token = new Token<StramDelegationTokenIdentifier>(tokenId, this.delegationTokenManager);
        token.setService(serviceName);
        return token;
    }

    /**
     * Re-registers every container known to the container manager and asks its node manager for status.
     *
     * <p>For each container agent a YARN {@code Container} record is rebuilt from the agent's container
     * details (id, host, HTTP address, allocated memory/vcores, priority) with a {@code null} container
     * token. The record is stored in {@code allocatedContainers} under the container id string, and an
     * asynchronous status query is issued through the node manager client.
     */
    private void checkContainerStatus() {
        for (StreamingContainerAgent agent : this.dnmgr.getContainerAgents()) {
            ContainerId id = ConverterUtils.toContainerId((String)agent.container.getExternalId());
            NodeId node = ConverterUtils.toNodeId((String)agent.container.host);
            // No container token is supplied for the rebuilt record.
            org.apache.hadoop.yarn.api.records.Token noToken = null;
            Resource capacity = Resource.newInstance((int)agent.container.getAllocatedMemoryMB(), (int)agent.container.getAllocatedVCores());
            Priority requestPriority = Priority.newInstance((int)agent.container.getResourceRequestPriority());
            Container rebuilt = Container.newInstance((ContainerId)id, (NodeId)node, (String)agent.container.nodeHttpAddress, (Resource)capacity, (Priority)requestPriority, noToken);
            this.allocatedContainers.put(id.toString(), new AllocatedContainer(rebuilt));
            this.nmClient.getContainerStatusAsync(id, node);
        }
    }

    /**
     * Pushes pending container changes to the resource manager client and performs one allocate call.
     *
     * <p>In order: withdraws the removed requests, adds the new requests, releases the given containers,
     * and then processes the container manager's stop requests. A stop request leads to an asynchronous
     * stop through the node manager client only if the container is tracked in {@code allocatedContainers}
     * and has not already been asked to stop. Every processed stop request is removed from the manager.
     *
     * @param containerRequests requests to add
     * @param removedContainerRequests previously added requests to withdraw
     * @param releasedContainers ids of containers to release
     * @return the response of the allocate call, made with progress {@code 0.0f}
     * @throws YarnException propagated from the resource manager client
     * @throws IOException propagated from the resource manager client
     */
    private AllocateResponse sendContainerAskToRM(List<AMRMClient.ContainerRequest> containerRequests, List<AMRMClient.ContainerRequest> removedContainerRequests, List<ContainerId> releasedContainers) throws YarnException, IOException {
        if (!removedContainerRequests.isEmpty()) {
            LOG.info(" Removing container request: " + removedContainerRequests);
            for (AMRMClient.ContainerRequest withdrawn : removedContainerRequests) {
                LOG.info("Removed container: {}", withdrawn.toString());
                this.amRmClient.removeContainerRequest(withdrawn);
            }
        }
        if (!containerRequests.isEmpty()) {
            LOG.info("Asking RM for containers: " + containerRequests);
            for (AMRMClient.ContainerRequest wanted : containerRequests) {
                LOG.info("Requested container: {}", wanted.toString());
                this.amRmClient.addContainerRequest(wanted);
            }
        }
        for (ContainerId released : releasedContainers) {
            LOG.info("Released container, id={}", released.getId());
            this.amRmClient.releaseAssignedContainer(released);
        }
        for (String stopId : this.dnmgr.containerStopRequests.values()) {
            AllocatedContainer tracked = (AllocatedContainer)this.allocatedContainers.get(stopId);
            boolean needsStop = tracked != null && !tracked.stopRequested;
            if (needsStop) {
                this.nmClient.stopContainerAsync(tracked.container.getId(), tracked.container.getNodeId());
                LOG.info("Requested stop container {}", stopId);
                tracked.stopRequested = true;
            }
            // The request is dropped whether or not a stop was issued; the value is used as the removal key.
            this.dnmgr.containerStopRequests.remove(stopId);
        }
        return this.amRmClient.allocate(0.0f);
    }

    /**
     * Per-container bookkeeping entry kept by the application master for a YARN container it tracks.
     */
    private class AllocatedContainer {
        /** The YARN container this entry describes. */
        private final Container container;
        /** Set once a stop has been requested for the container, so the request is not issued twice. */
        private boolean stopRequested;
        /** Delegation token associated with the container; not assigned by the constructor. */
        private Token<StramDelegationTokenIdentifier> delegationToken;

        /**
         * @param container the YARN container to track
         */
        private AllocatedContainer(Container container) {
            this.container = container;
        }
    }

    private class NMCallbackHandler
    implements NMClientAsync.CallbackHandler {
        NMCallbackHandler() {
        }

        public void onContainerStopped(ContainerId containerId) {
            LOG.debug("Succeeded to stop Container {}", (Object)containerId);
        }

        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
            LOG.debug("Container Status: id={}, status={}", (Object)containerId, (Object)containerStatus);
            if (containerStatus.getState() != ContainerState.RUNNING) {
                this.recoverContainer(containerId);
            }
        }

        public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
            LOG.debug("Succeeded to start Container {}", (Object)containerId);
        }

        public void onStartContainerError(ContainerId containerId, Throwable t) {
            LOG.error("Start container failed for: containerId={}", (Object)containerId, (Object)t);
        }

        public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
            LOG.error("Failed to query the status of {}", (Object)containerId, (Object)t);
            this.recoverContainer(containerId);
        }

        public void onStopContainerError(ContainerId containerId, Throwable t) {
            LOG.warn("Failed to stop container {}", (Object)containerId, (Object)t);
            this.recoverContainer(containerId);
        }

        private void recoverContainer(final ContainerId containerId) {
            StreamingAppMasterService.this.pendingTasks.add(new Runnable(){

                @Override
                public void run() {
                    StreamingAppMasterService.this.dnmgr.scheduleContainerRestart(containerId.toString());
                    StreamingAppMasterService.this.allocatedContainers.remove(containerId.toString());
                }
            });
        }
    }

    /**
     * Application context backed by the enclosing service.
     *
     * <p>Attribute-based getters read from this context's attributes. The remaining getters delegate to
     * fields of the enclosing service. Getters that need the container manager tolerate it being
     * {@code null} and then return {@code false} or {@code null}.
     */
    private class ClusterAppContextImpl
    extends BaseContext
    implements StramAppContext {
        private static final long serialVersionUID = 201309112304L;

        private ClusterAppContextImpl() {
            super(null, null);
        }

        ClusterAppContextImpl(Attribute.AttributeMap attributes) {
            super(attributes, null);
        }

        /** @return the application id taken from the enclosing service's application attempt id */
        @Override
        public ApplicationId getApplicationID() {
            ApplicationAttemptId attempt = StreamingAppMasterService.this.appAttemptID;
            return attempt.getApplicationId();
        }

        @Override
        public ApplicationAttemptId getApplicationAttemptId() {
            return StreamingAppMasterService.this.appAttemptID;
        }

        /** @return the value of the {@code APPLICATION_NAME} attribute */
        @Override
        public String getApplicationName() {
            String name = (String)this.getValue(LogicalPlan.APPLICATION_NAME);
            return name;
        }

        /** @return the value of the {@code APPLICATION_DOC_LINK} attribute */
        @Override
        public String getApplicationDocLink() {
            String docLink = (String)this.getValue(LogicalPlan.APPLICATION_DOC_LINK);
            return docLink;
        }

        @Override
        public long getStartTime() {
            return StreamingAppMasterService.this.startTime;
        }

        /** @return the value of the {@code APPLICATION_PATH} attribute */
        @Override
        public String getApplicationPath() {
            String path = (String)this.getValue(LogicalPlan.APPLICATION_PATH);
            return path;
        }

        /** @return the user name read from the process environment, or {@code null} if the variable is unset */
        @Override
        public CharSequence getUser() {
            String variable = ApplicationConstants.Environment.USER.toString();
            return System.getenv(variable);
        }

        @Override
        public Clock getClock() {
            return StreamingAppMasterService.this.clock;
        }

        @Override
        public String getAppMasterTrackingUrl() {
            return StreamingAppMasterService.this.appMasterTrackingUrl;
        }

        @Override
        public ClusterAppStats getStats() {
            return StreamingAppMasterService.this.stats;
        }

        /** @return the value of the {@code GATEWAY_CONNECT_ADDRESS} attribute */
        @Override
        public String getGatewayAddress() {
            String gateway = (String)this.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
            return gateway;
        }

        /** @return the container manager's gateway state, or {@code false} when there is no manager */
        @Override
        public boolean isGatewayConnected() {
            return StreamingAppMasterService.this.dnmgr != null
                && StreamingAppMasterService.this.dnmgr.isGatewayConnected();
        }

        /** @return the container manager's app data sources, or {@code null} when there is no manager */
        @Override
        public List<AppDataSource> getAppDataSources() {
            return StreamingAppMasterService.this.dnmgr != null
                ? StreamingAppMasterService.this.dnmgr.getAppDataSources()
                : null;
        }

        /** @return the container manager's latest logical metrics, or {@code null} when there is no manager */
        @Override
        public Map<String, Object> getMetrics() {
            return StreamingAppMasterService.this.dnmgr != null
                ? StreamingAppMasterService.this.dnmgr.getLatestLogicalMetrics()
                : null;
        }
    }

    protected class ClusterAppStats
    extends AppInfo.AppStats {
        protected ClusterAppStats() {
        }

        @Override
        @AutoMetric
        public int getAllocatedContainers() {
            return StreamingAppMasterService.this.allocatedContainers.size();
        }

        @Override
        @AutoMetric
        public int getPlannedContainers() {
            return StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers().size();
        }

        @Override
        @AutoMetric
        @XmlElement
        public int getFailedContainers() {
            return StreamingAppMasterService.this.numFailedContainers.get();
        }

        @Override
        @AutoMetric
        public int getNumOperators() {
            return StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().size();
        }

        @Override
        public long getCurrentWindowId() {
            long min = Long.MAX_VALUE;
            for (Map.Entry<Integer, PTOperator> entry : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet()) {
                long windowId = entry.getValue().stats.currentWindowId.get();
                if (min <= windowId) continue;
                min = windowId;
            }
            return StreamingContainerManager.toWsWindowId(min == Long.MAX_VALUE ? 0L : min);
        }

        @Override
        public long getRecoveryWindowId() {
            return StreamingContainerManager.toWsWindowId(StreamingAppMasterService.this.dnmgr.getCommittedWindowId());
        }

        @Override
        @AutoMetric
        public long getTuplesProcessedPSMA() {
            long result = 0L;
            for (Map.Entry<Integer, PTOperator> entry : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet()) {
                result += entry.getValue().stats.tuplesProcessedPSMA.get();
            }
            return result;
        }

        @Override
        @AutoMetric
        public long getTotalTuplesProcessed() {
            long result = 0L;
            for (Map.Entry<Integer, PTOperator> entry : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet()) {
                result += entry.getValue().stats.totalTuplesProcessed.get();
            }
            return result;
        }

        @Override
        @AutoMetric
        public long getTuplesEmittedPSMA() {
            long result = 0L;
            for (Map.Entry<Integer, PTOperator> entry : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet()) {
                result += entry.getValue().stats.tuplesEmittedPSMA.get();
            }
            return result;
        }

        @Override
        @AutoMetric
        public long getTotalTuplesEmitted() {
            long result = 0L;
            for (Map.Entry<Integer, PTOperator> entry : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet()) {
                result += entry.getValue().stats.totalTuplesEmitted.get();
            }
            return result;
        }

        @Override
        @AutoMetric
        public long getTotalMemoryAllocated() {
            long result = 0L;
            for (PTContainer c : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers()) {
                result += (long)c.getAllocatedMemoryMB();
            }
            return result;
        }

        @Override
        @AutoMetric
        public long getMemoryRequired() {
            long result = 0L;
            for (PTContainer c : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers()) {
                if (c.getExternalId() != null && c.getState() != PTContainer.State.KILLED) continue;
                result += (long)c.getRequiredMemoryMB();
            }
            return result;
        }

        @Override
        @AutoMetric
        public int getTotalVCoresAllocated() {
            int result = 0;
            for (PTContainer c : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers()) {
                result += c.getAllocatedVCores();
            }
            return result;
        }

        @Override
        @AutoMetric
        public int getVCoresRequired() {
            int result = 0;
            for (PTContainer c : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers()) {
                if (c.getExternalId() != null && c.getState() != PTContainer.State.KILLED) continue;
                if (c.getRequiredVCores() == 0) {
                    ++result;
                    continue;
                }
                result += c.getRequiredVCores();
            }
            return result;
        }

        @Override
        @AutoMetric
        public long getTotalBufferServerReadBytesPSMA() {
            long result = 0L;
            for (Map.Entry<Integer, PTOperator> entry : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet()) {
                for (Map.Entry<String, OperatorStatus.PortStatus> portEntry : entry.getValue().stats.inputPortStatusList.entrySet()) {
                    result = (long)((double)result + portEntry.getValue().bufferServerBytesPMSMA.getAvg() * 1000.0);
                }
            }
            return result;
        }

        @Override
        @AutoMetric
        public long getTotalBufferServerWriteBytesPSMA() {
            long result = 0L;
            for (Map.Entry<Integer, PTOperator> entry : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet()) {
                for (Map.Entry<String, OperatorStatus.PortStatus> portEntry : entry.getValue().stats.outputPortStatusList.entrySet()) {
                    result = (long)((double)result + portEntry.getValue().bufferServerBytesPMSMA.getAvg() * 1000.0);
                }
            }
            return result;
        }

        @Override
        public List<Integer> getCriticalPath() {
            StreamingContainerManager.CriticalPathInfo criticalPathInfo = StreamingAppMasterService.this.dnmgr.getCriticalPathInfo();
            return criticalPathInfo == null ? null : criticalPathInfo.path;
        }

        @Override
        @AutoMetric
        public long getLatency() {
            StreamingContainerManager.CriticalPathInfo criticalPathInfo = StreamingAppMasterService.this.dnmgr.getCriticalPathInfo();
            return criticalPathInfo == null ? 0L : criticalPathInfo.latency;
        }

        @Override
        public long getWindowStartMillis() {
            return StreamingAppMasterService.this.dnmgr.getWindowStartMillis();
        }
    }
}

