package com.datatorrent.stram;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.BaseContext;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.appdata.AppDataPushAgent;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.OperatorStatus;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.security.StramDelegationTokenIdentifier;
import com.datatorrent.stram.security.StramDelegationTokenManager;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.security.StramWSFilterInitializer;
import com.datatorrent.stram.webapp.AppInfo;
import com.datatorrent.stram.webapp.StramWebApp;
import com.datatorrent.stram.webapp.asm.MethodSignatureVisitor;
import com.google.common.collect.Maps;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.bind.annotation.XmlElement;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/StreamingAppMasterService.class */
public class StreamingAppMasterService extends CompositeService {
    private static final Logger LOG;
    private static final long DELEGATION_KEY_UPDATE_INTERVAL = 86400000;
    private static final long DELEGATION_TOKEN_MAX_LIFETIME = 4611686018427387903L;
    private static final long DELEGATION_TOKEN_RENEW_INTERVAL = 4611686018427387903L;
    private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 86400000;
    private static final int NUMBER_MISSED_HEARTBEATS = 30;
    private static final int MAX_CONTAINER_FAILURES_PER_NODE = 3;
    private static final long BLACKLIST_REMOVAL_TIME = 3600000;
    private AMRMClient<AMRMClient.ContainerRequest> amRmClient;
    private NMClientAsync nmClient;
    private LogicalPlan dag;
    private final ApplicationAttemptId appAttemptID;
    private final String appMasterHostname = "";
    private String appMasterTrackingUrl;
    private boolean appDone;
    private final AtomicInteger numCompletedContainers;
    private final ConcurrentMap<String, AllocatedContainer> allocatedContainers;
    private final ConcurrentMap<String, AtomicInteger> failedContainersMap;
    private final Queue<Pair<Long, List<String>>> blacklistedNodesQueueWithTimeStamp;
    private final AtomicInteger numFailedContainers;
    private final ConcurrentLinkedQueue<Runnable> pendingTasks;
    private StreamingContainerParent heartbeatListener;
    private StreamingContainerManager dnmgr;
    private StramAppContext appContext;
    private final Clock clock;
    private final long startTime;
    private final ClusterAppStats stats;
    private StramDelegationTokenManager delegationTokenManager;
    private AppDataPushAgent appDataPushAgent;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.datatorrent.stram.StreamingAppMasterService$1, reason: invalid class name */
    /* loaded from: input_file:com/datatorrent/stram/StreamingAppMasterService$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hadoop$yarn$api$records$AMCommand = new int[AMCommand.values().length];

        static {
            try {
                $SwitchMap$org$apache$hadoop$yarn$api$records$AMCommand[AMCommand.AM_RESYNC.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hadoop$yarn$api$records$AMCommand[AMCommand.AM_SHUTDOWN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/stram/StreamingAppMasterService$AllocatedContainer.class */
    public class AllocatedContainer {
        private final Container container;
        private boolean stopRequested;
        private Token<StramDelegationTokenIdentifier> delegationToken;

        private AllocatedContainer(Container container) {
            this.container = container;
        }

        /* synthetic */ AllocatedContainer(StreamingAppMasterService streamingAppMasterService, Container container, AnonymousClass1 anonymousClass1) {
            this(container);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingAppMasterService$ClusterAppContextImpl.class */
    private class ClusterAppContextImpl extends BaseContext implements StramAppContext {
        private static final long serialVersionUID = 201309112304L;

        private ClusterAppContextImpl() {
            super(null, null);
        }

        ClusterAppContextImpl(Attribute.AttributeMap attributeMap) {
            super(attributeMap, null);
        }

        @Override // com.datatorrent.stram.StramAppContext
        public ApplicationId getApplicationID() {
            return StreamingAppMasterService.this.appAttemptID.getApplicationId();
        }

        @Override // com.datatorrent.stram.StramAppContext
        public ApplicationAttemptId getApplicationAttemptId() {
            return StreamingAppMasterService.this.appAttemptID;
        }

        @Override // com.datatorrent.stram.StramAppContext
        public String getApplicationName() {
            return (String) getValue(LogicalPlan.APPLICATION_NAME);
        }

        @Override // com.datatorrent.stram.StramAppContext
        public String getApplicationDocLink() {
            return (String) getValue(LogicalPlan.APPLICATION_DOC_LINK);
        }

        @Override // com.datatorrent.stram.StramAppContext
        public long getStartTime() {
            return StreamingAppMasterService.this.startTime;
        }

        @Override // com.datatorrent.stram.StramAppContext
        public String getApplicationPath() {
            return (String) getValue(LogicalPlan.APPLICATION_PATH);
        }

        @Override // com.datatorrent.stram.StramAppContext
        public CharSequence getUser() {
            return System.getenv(ApplicationConstants.Environment.USER.toString());
        }

        @Override // com.datatorrent.stram.StramAppContext
        public Clock getClock() {
            return StreamingAppMasterService.this.clock;
        }

        @Override // com.datatorrent.stram.StramAppContext
        public String getAppMasterTrackingUrl() {
            return StreamingAppMasterService.this.appMasterTrackingUrl;
        }

        @Override // com.datatorrent.stram.StramAppContext
        public ClusterAppStats getStats() {
            return StreamingAppMasterService.this.stats;
        }

        @Override // com.datatorrent.stram.StramAppContext
        public String getGatewayAddress() {
            return (String) getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
        }

        @Override // com.datatorrent.stram.StramAppContext
        public boolean isGatewayConnected() {
            if (StreamingAppMasterService.this.dnmgr != null) {
                return StreamingAppMasterService.this.dnmgr.isGatewayConnected();
            }
            return false;
        }

        @Override // com.datatorrent.stram.StramAppContext
        public List<AppDataSource> getAppDataSources() {
            if (StreamingAppMasterService.this.dnmgr != null) {
                return StreamingAppMasterService.this.dnmgr.getAppDataSources();
            }
            return null;
        }

        @Override // com.datatorrent.stram.StramAppContext
        public Map<String, Object> getMetrics() {
            if (StreamingAppMasterService.this.dnmgr != null) {
                return StreamingAppMasterService.this.dnmgr.getLatestLogicalMetrics();
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/datatorrent/stram/StreamingAppMasterService$ClusterAppStats.class */
    public class ClusterAppStats extends AppInfo.AppStats {
        protected ClusterAppStats() {
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public int getAllocatedContainers() {
            return StreamingAppMasterService.this.allocatedContainers.size();
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public int getPlannedContainers() {
            return StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers().size();
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        @XmlElement
        public int getFailedContainers() {
            return StreamingAppMasterService.this.numFailedContainers.get();
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public int getNumOperators() {
            return StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().size();
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        public long getCurrentWindowId() {
            long j = Long.MAX_VALUE;
            Iterator<Map.Entry<Integer, PTOperator>> it = StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet().iterator();
            while (it.hasNext()) {
                long j2 = it.next().getValue().stats.currentWindowId.get();
                if (j > j2) {
                    j = j2;
                }
            }
            return StreamingContainerManager.toWsWindowId(j == Long.MAX_VALUE ? 0L : j);
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        public long getRecoveryWindowId() {
            return StreamingContainerManager.toWsWindowId(StreamingAppMasterService.this.dnmgr.getCommittedWindowId());
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public long getTuplesProcessedPSMA() {
            long j = 0;
            Iterator<Map.Entry<Integer, PTOperator>> it = StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet().iterator();
            while (it.hasNext()) {
                j += it.next().getValue().stats.tuplesProcessedPSMA.get();
            }
            return j;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public long getTotalTuplesProcessed() {
            long j = 0;
            Iterator<Map.Entry<Integer, PTOperator>> it = StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet().iterator();
            while (it.hasNext()) {
                j += it.next().getValue().stats.totalTuplesProcessed.get();
            }
            return j;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public long getTuplesEmittedPSMA() {
            long j = 0;
            Iterator<Map.Entry<Integer, PTOperator>> it = StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet().iterator();
            while (it.hasNext()) {
                j += it.next().getValue().stats.tuplesEmittedPSMA.get();
            }
            return j;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public long getTotalTuplesEmitted() {
            long j = 0;
            Iterator<Map.Entry<Integer, PTOperator>> it = StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet().iterator();
            while (it.hasNext()) {
                j += it.next().getValue().stats.totalTuplesEmitted.get();
            }
            return j;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public long getTotalMemoryAllocated() {
            long j = 0;
            while (StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers().iterator().hasNext()) {
                j += r0.next().getAllocatedMemoryMB();
            }
            return j;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public long getMemoryRequired() {
            long j = 0;
            for (PTContainer pTContainer : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers()) {
                if (pTContainer.getExternalId() == null || pTContainer.getState() == PTContainer.State.KILLED) {
                    j += pTContainer.getRequiredMemoryMB();
                }
            }
            return j;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public int getTotalVCoresAllocated() {
            int i = 0;
            Iterator<PTContainer> it = StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers().iterator();
            while (it.hasNext()) {
                i += it.next().getAllocatedVCores();
            }
            return i;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public int getVCoresRequired() {
            int i = 0;
            for (PTContainer pTContainer : StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getContainers()) {
                if (pTContainer.getExternalId() == null || pTContainer.getState() == PTContainer.State.KILLED) {
                    i = pTContainer.getRequiredVCores() == 0 ? i + 1 : i + pTContainer.getRequiredVCores();
                }
            }
            return i;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public long getTotalBufferServerReadBytesPSMA() {
            long j = 0;
            Iterator<Map.Entry<Integer, PTOperator>> it = StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet().iterator();
            while (it.hasNext()) {
                Iterator<Map.Entry<String, OperatorStatus.PortStatus>> it2 = it.next().getValue().stats.inputPortStatusList.entrySet().iterator();
                while (it2.hasNext()) {
                    j = (long) (j + (it2.next().getValue().bufferServerBytesPMSMA.getAvg() * 1000.0d));
                }
            }
            return j;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public long getTotalBufferServerWriteBytesPSMA() {
            long j = 0;
            Iterator<Map.Entry<Integer, PTOperator>> it = StreamingAppMasterService.this.dnmgr.getPhysicalPlan().getAllOperators().entrySet().iterator();
            while (it.hasNext()) {
                Iterator<Map.Entry<String, OperatorStatus.PortStatus>> it2 = it.next().getValue().stats.outputPortStatusList.entrySet().iterator();
                while (it2.hasNext()) {
                    j = (long) (j + (it2.next().getValue().bufferServerBytesPMSMA.getAvg() * 1000.0d));
                }
            }
            return j;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        public List<Integer> getCriticalPath() {
            StreamingContainerManager.CriticalPathInfo criticalPathInfo = StreamingAppMasterService.this.dnmgr.getCriticalPathInfo();
            if (criticalPathInfo == null) {
                return null;
            }
            return criticalPathInfo.path;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        @AutoMetric
        public long getLatency() {
            StreamingContainerManager.CriticalPathInfo criticalPathInfo = StreamingAppMasterService.this.dnmgr.getCriticalPathInfo();
            if (criticalPathInfo == null) {
                return 0L;
            }
            return criticalPathInfo.latency;
        }

        @Override // com.datatorrent.stram.webapp.AppInfo.AppStats
        public long getWindowStartMillis() {
            return StreamingAppMasterService.this.dnmgr.getWindowStartMillis();
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingAppMasterService$NMCallbackHandler.class */
    private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
        NMCallbackHandler() {
        }

        public void onContainerStopped(ContainerId containerId) {
            StreamingAppMasterService.LOG.debug("Succeeded to stop Container {}", containerId);
        }

        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
            StreamingAppMasterService.LOG.debug("Container Status: id={}, status={}", containerId, containerStatus);
            if (containerStatus.getState() != ContainerState.RUNNING) {
                recoverContainer(containerId);
            }
        }

        public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
            StreamingAppMasterService.LOG.debug("Succeeded to start Container {}", containerId);
        }

        public void onStartContainerError(ContainerId containerId, Throwable th) {
            StreamingAppMasterService.LOG.error("Start container failed for: containerId={}", containerId, th);
        }

        public void onGetContainerStatusError(ContainerId containerId, Throwable th) {
            StreamingAppMasterService.LOG.error("Failed to query the status of {}", containerId, th);
            recoverContainer(containerId);
        }

        public void onStopContainerError(ContainerId containerId, Throwable th) {
            StreamingAppMasterService.LOG.warn("Failed to stop container {}", containerId, th);
            recoverContainer(containerId);
        }

        private void recoverContainer(final ContainerId containerId) {
            StreamingAppMasterService.this.pendingTasks.add(new Runnable() { // from class: com.datatorrent.stram.StreamingAppMasterService.NMCallbackHandler.1
                @Override // java.lang.Runnable
                public void run() {
                    StreamingAppMasterService.this.dnmgr.scheduleContainerRestart(containerId.toString());
                    StreamingAppMasterService.this.allocatedContainers.remove(containerId.toString());
                }
            });
        }
    }

    public StreamingAppMasterService(ApplicationAttemptId applicationAttemptId) {
        super(StreamingAppMasterService.class.getName());
        this.appMasterHostname = "";
        this.appMasterTrackingUrl = "";
        this.appDone = false;
        this.numCompletedContainers = new AtomicInteger();
        this.allocatedContainers = Maps.newConcurrentMap();
        this.failedContainersMap = Maps.newConcurrentMap();
        this.blacklistedNodesQueueWithTimeStamp = new ConcurrentLinkedQueue();
        this.numFailedContainers = new AtomicInteger();
        this.pendingTasks = new ConcurrentLinkedQueue<>();
        this.clock = new SystemClock();
        this.startTime = this.clock.getTime();
        this.stats = new ClusterAppStats();
        this.delegationTokenManager = null;
        this.appAttemptID = applicationAttemptId;
    }

    public void dumpOutDebugInfo() {
        LOG.info("Dump debug output");
        Map<String, String> map = System.getenv();
        LOG.info("\nDumping System Env: begin");
        for (Map.Entry<String, String> entry : map.entrySet()) {
            LOG.info("System env: key=" + entry.getKey() + ", val=" + entry.getValue());
        }
        LOG.info("Dumping System Env: end");
        try {
            Process exec = Runtime.getRuntime().exec("ls -al");
            exec.waitFor();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(exec.getInputStream()));
            LOG.info("\nDumping files in local dir: begin");
            while (true) {
                try {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    } else {
                        LOG.info("System CWD content: " + readLine);
                    }
                } catch (Throwable th) {
                    bufferedReader.close();
                    throw th;
                }
            }
            LOG.info("Dumping files in local dir: end");
            bufferedReader.close();
        } catch (IOException e) {
            LOG.debug("Exception", e);
        } catch (InterruptedException e2) {
            LOG.info("Interrupted", e2);
        }
        LOG.info("Classpath: {}", System.getProperty("java.class.path"));
        LOG.info("Config resources: {}", getConfig().toString());
        try {
            Configuration.dumpConfiguration(getConfig(), new PrintWriter(System.out));
        } catch (Exception e3) {
            LOG.error("Error dumping configuration.", e3);
        }
    }

    protected void serviceInit(Configuration configuration) throws Exception {
        LOG.info("Application master, appId=" + this.appAttemptID.getApplicationId().getId() + ", clustertimestamp=" + this.appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId=" + this.appAttemptID.getAttemptId());
        FileInputStream fileInputStream = new FileInputStream("./dt-conf.ser");
        try {
            this.dag = LogicalPlan.read(fileInputStream);
            fileInputStream.close();
            if (this.dag.isDebug()) {
                dumpOutDebugInfo();
            }
            this.dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, Integer.valueOf(this.appAttemptID.getAttemptId()));
            this.dnmgr = StreamingContainerManager.getInstance(new FSRecoveryHandler(this.dag.assertAppPath(), configuration), this.dag, true);
            this.dag = this.dnmgr.getLogicalPlan();
            this.appContext = new ClusterAppContextImpl(this.dag.getAttributes());
            StringCodecs.loadConverters((Map) this.dag.getAttributes().get(DAG.STRING_CODECS));
            LOG.info("Starting application with {} operators in {} containers", Integer.valueOf(this.dnmgr.getPhysicalPlan().getAllOperators().size()), Integer.valueOf(this.dnmgr.getPhysicalPlan().getContainers().size()));
            if (UserGroupInformation.isSecurityEnabled()) {
                this.delegationTokenManager = new StramDelegationTokenManager(86400000L, 4611686018427387903L, 4611686018427387903L, 86400000L);
            }
            this.nmClient = new NMClientAsyncImpl(new NMCallbackHandler());
            addService(this.nmClient);
            this.amRmClient = AMRMClient.createAMRMClient();
            addService(this.amRmClient);
            this.heartbeatListener = new StreamingContainerParent(getClass().getName(), this.dnmgr, this.delegationTokenManager, ((Integer) this.dag.getValue(Context.DAGContext.HEARTBEAT_LISTENER_THREAD_COUNT)).intValue());
            addService(this.heartbeatListener);
            if (((AutoMetric.Transport) this.dag.getValue(LogicalPlan.METRICS_TRANSPORT)) != null) {
                this.appDataPushAgent = new AppDataPushAgent(this.dnmgr, this.appContext);
                addService(this.appDataPushAgent);
            }
            super.serviceInit(configuration);
        } catch (Throwable th) {
            fileInputStream.close();
            throw th;
        }
    }

    protected void serviceStart() throws Exception {
        super.serviceStart();
        if (UserGroupInformation.isSecurityEnabled()) {
            this.delegationTokenManager.startThreads();
        }
        InetSocketAddress connectAddress = NetUtils.getConnectAddress(this.heartbeatListener.getAddress());
        new FSRecoveryHandler(this.dag.assertAppPath(), getConfig()).writeConnectUri(new URI("stram", null, connectAddress.getHostName(), connectAddress.getPort(), null, null, null).toString());
        try {
            Log.setLog((org.mortbay.log.Logger) null);
        } catch (Throwable th) {
        }
        try {
            Configuration config = getConfig();
            if (UserGroupInformation.isSecurityEnabled()) {
                config = new Configuration(config);
                config.set("hadoop.http.filter.initializers", StramWSFilterInitializer.class.getCanonicalName());
            }
            WebApp start = WebApps.$for("stram", StramAppContext.class, this.appContext, "ws").with(config).start(new StramWebApp(this.dnmgr));
            LOG.info("Started web service at port: " + start.port());
            this.appMasterTrackingUrl = NetUtils.getConnectAddress(start.getListenerAddress()).getHostName() + ":" + start.port();
            LOG.info("Setting tracking URL to: " + this.appMasterTrackingUrl);
        } catch (Exception e) {
            LOG.error("Webapps failed to start. Ignoring for now:", e);
        }
    }

    protected void serviceStop() throws Exception {
        super.serviceStop();
        if (UserGroupInformation.isSecurityEnabled()) {
            this.delegationTokenManager.stopThreads();
        }
        if (this.nmClient != null) {
            this.nmClient.stop();
        }
        if (this.amRmClient != null) {
            this.amRmClient.stop();
        }
        if (this.dnmgr != null) {
            this.dnmgr.teardown();
        }
    }

    public boolean run() throws Exception {
        try {
            StreamingContainer.eventloop.start();
            execute();
            StreamingContainer.eventloop.stop();
            return true;
        } catch (Throwable th) {
            StreamingContainer.eventloop.stop();
            throw th;
        }
    }

    private void execute() throws YarnException, IOException {
        LOG.info("Starting ApplicationMaster");
        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        LOG.info("number of tokens: {}", Integer.valueOf(credentials.getAllTokens().size()));
        Iterator it = credentials.getAllTokens().iterator();
        while (it.hasNext()) {
            LOG.debug("token: {}", (Token) it.next());
        }
        Configuration config = getConfig();
        long doubleValue = (long) (((Double) this.dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR)).doubleValue() * Math.min(((Long) this.dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME)).longValue(), ((Long) this.dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)).longValue()));
        long currentTimeMillis = System.currentTimeMillis() + doubleValue;
        LOG.debug(" expiry token time {}", Long.valueOf(doubleValue));
        String str = (String) this.dag.getValue(LogicalPlan.KEY_TAB_FILE);
        RegisterApplicationMasterResponse registerApplicationMaster = this.amRmClient.registerApplicationMaster("", 0, this.appMasterTrackingUrl);
        int memory = registerApplicationMaster.getMaximumResourceCapability().getMemory();
        int virtualCores = registerApplicationMaster.getMaximumResourceCapability().getVirtualCores();
        int i = config.getInt("yarn.scheduler.minimum-allocation-mb", 0);
        int i2 = config.getInt("yarn.scheduler.minimum-allocation-vcores", 0);
        LOG.info("Max mem {}m, Min mem {}m, Max vcores {} and Min vcores {} capabililty of resources in this cluster ", new Object[]{Integer.valueOf(memory), Integer.valueOf(i), Integer.valueOf(virtualCores), Integer.valueOf(i2)});
        int i3 = config.getInt("MAX_CONSECUTIVE_CONTAINER_FAILURES", 3);
        long j = config.getLong("BLACKLIST_REMOVAL_TIME", 3600000L);
        HashMap newHashMap = Maps.newHashMap();
        int i4 = -1;
        List<ContainerId> arrayList = new ArrayList<>();
        int i5 = 0;
        int i6 = 0;
        int i7 = 0;
        int i8 = 0;
        ResourceRequestHandler resourceRequestHandler = new ResourceRequestHandler();
        YarnClient createYarnClient = YarnClient.createYarnClient();
        try {
            try {
                createYarnClient.init(config);
                createYarnClient.start();
                ApplicationReport startedAppInstanceByName = StramClientUtils.getStartedAppInstanceByName(createYarnClient, (String) this.dag.getAttributes().get(DAG.APPLICATION_NAME), UserGroupInformation.getLoginUser().getUserName(), (String) this.dag.getAttributes().get(DAG.APPLICATION_ID));
                if (startedAppInstanceByName != null) {
                    this.appDone = true;
                    this.dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", startedAppInstanceByName.getApplicationId().toString(), startedAppInstanceByName.getName(), startedAppInstanceByName.getUser());
                    LOG.info("Forced shutdown due to {}", this.dnmgr.shutdownDiagnosticsMessage);
                    finishApplication(FinalApplicationStatus.FAILED, 0);
                    createYarnClient.stop();
                    return;
                }
                resourceRequestHandler.updateNodeReports(createYarnClient.getNodeReports(new NodeState[0]));
                createYarnClient.stop();
                checkContainerStatus();
                FinalApplicationStatus finalApplicationStatus = FinalApplicationStatus.SUCCEEDED;
                InetSocketAddress socketAddr = config.getSocketAddr("yarn.resourcemanager.address", "0.0.0.0:8032", 8032);
                while (!this.appDone) {
                    i4++;
                    if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= currentTimeMillis && str != null) {
                        currentTimeMillis = StramUserLogin.refreshTokens(doubleValue, FileUtils.getTempDirectoryPath(), this.appAttemptID.getApplicationId().toString(), config, str, credentials, socketAddr, true);
                    }
                    while (true) {
                        Runnable poll = this.pendingTasks.poll();
                        if (poll != null) {
                            poll.run();
                        } else {
                            try {
                                break;
                            } catch (InterruptedException e) {
                                LOG.info("Sleep interrupted " + e.getMessage());
                            }
                        }
                    }
                    Thread.sleep(1000L);
                    List<AMRMClient.ContainerRequest> arrayList2 = new ArrayList<>();
                    ArrayList arrayList3 = new ArrayList();
                    if (!this.dnmgr.containerStartRequests.isEmpty()) {
                        while (true) {
                            StreamingContainerAgent.ContainerStartRequest poll2 = this.dnmgr.containerStartRequests.poll();
                            if (poll2 == null) {
                                break;
                            }
                            if (poll2.container.getRequiredMemoryMB() > memory) {
                                LOG.warn("Container memory {}m above max threshold of cluster. Using max value {}m.", Integer.valueOf(poll2.container.getRequiredMemoryMB()), Integer.valueOf(memory));
                                poll2.container.setRequiredMemoryMB(memory);
                            }
                            if (poll2.container.getRequiredMemoryMB() < i) {
                                poll2.container.setRequiredMemoryMB(i);
                            }
                            if (poll2.container.getRequiredVCores() > virtualCores) {
                                LOG.warn("Container vcores {} above max threshold of cluster. Using max value {}.", Integer.valueOf(poll2.container.getRequiredVCores()), Integer.valueOf(virtualCores));
                                poll2.container.setRequiredVCores(virtualCores);
                            }
                            if (poll2.container.getRequiredVCores() < i2) {
                                poll2.container.setRequiredVCores(i2);
                            }
                            int i9 = i8;
                            i8++;
                            poll2.container.setResourceRequestPriority(i9);
                            AMRMClient.ContainerRequest createContainerRequest = resourceRequestHandler.createContainerRequest(poll2, true);
                            newHashMap.put(poll2, new MutablePair(Integer.valueOf(i4), createContainerRequest));
                            arrayList2.add(createContainerRequest);
                        }
                    }
                    if (!newHashMap.isEmpty()) {
                        for (Map.Entry entry : newHashMap.entrySet()) {
                            if (i4 - ((Integer) ((MutablePair) entry.getValue()).getKey()).intValue() > NUMBER_MISSED_HEARTBEATS) {
                                StreamingContainerAgent.ContainerStartRequest containerStartRequest = (StreamingContainerAgent.ContainerStartRequest) entry.getKey();
                                arrayList3.add(((MutablePair) entry.getValue()).getRight());
                                AMRMClient.ContainerRequest createContainerRequest2 = resourceRequestHandler.createContainerRequest(containerStartRequest, false);
                                ((MutablePair) entry.getValue()).setLeft(Integer.valueOf(i4));
                                ((MutablePair) entry.getValue()).setRight(createContainerRequest2);
                                arrayList2.add(createContainerRequest2);
                            }
                        }
                    }
                    long currentTimeMillis2 = System.currentTimeMillis();
                    ArrayList arrayList4 = new ArrayList();
                    Iterator<Pair<Long, List<String>>> it2 = this.blacklistedNodesQueueWithTimeStamp.iterator();
                    while (it2.hasNext()) {
                        Pair<Long, List<String>> next = it2.next();
                        if (Long.valueOf(currentTimeMillis2 - ((Long) next.getFirst()).longValue()).longValue() <= j) {
                            break;
                        }
                        arrayList4.addAll((Collection) next.getSecond());
                        it2.remove();
                    }
                    if (!arrayList4.isEmpty()) {
                        this.amRmClient.updateBlacklist((List) null, arrayList4);
                    }
                    i5 += arrayList2.size();
                    int size = i6 + arrayList2.size();
                    AllocateResponse sendContainerAskToRM = sendContainerAskToRM(arrayList2, arrayList3, arrayList);
                    if (sendContainerAskToRM.getAMCommand() != null) {
                        LOG.info(" statement executed:{}", sendContainerAskToRM.getAMCommand());
                        switch (AnonymousClass1.$SwitchMap$org$apache$hadoop$yarn$api$records$AMCommand[sendContainerAskToRM.getAMCommand().ordinal()]) {
                            case MethodSignatureVisitor.VISIT_PARAM /* 1 */:
                            case MethodSignatureVisitor.VISIT_RETURN /* 2 */:
                                throw new YarnRuntimeException("Received the " + sendContainerAskToRM.getAMCommand() + " command from RM");
                            default:
                                throw new YarnRuntimeException("Received the " + sendContainerAskToRM.getAMCommand() + " command from RM");
                        }
                    }
                    arrayList.clear();
                    List<Container> allocatedContainers = sendContainerAskToRM.getAllocatedContainers();
                    i6 = size - allocatedContainers.size();
                    long currentTimeMillis3 = System.currentTimeMillis();
                    for (Container container : allocatedContainers) {
                        LOG.info("Got new container., containerId=" + container.getId() + ", containerNode=" + container.getNodeId() + ", containerNodeURI=" + container.getNodeHttpAddress() + ", containerResourceMemory" + container.getResource().getMemory() + ", priority" + container.getPriority());
                        boolean z = true;
                        StreamingContainerAgent.ContainerStartRequest containerStartRequest2 = null;
                        Iterator it3 = newHashMap.entrySet().iterator();
                        while (true) {
                            if (!it3.hasNext()) {
                                break;
                            }
                            Map.Entry entry2 = (Map.Entry) it3.next();
                            if (((StreamingContainerAgent.ContainerStartRequest) entry2.getKey()).container.getResourceRequestPriority() == container.getPriority().getPriority()) {
                                z = false;
                                containerStartRequest2 = (StreamingContainerAgent.ContainerStartRequest) entry2.getKey();
                                break;
                            }
                        }
                        if (z) {
                            LOG.info("Releasing {} as resource with priority {} was already assigned", container.getId(), container.getPriority());
                            arrayList.add(container.getId());
                            i7++;
                            i6++;
                        } else {
                            if (containerStartRequest2 != null) {
                                newHashMap.remove(containerStartRequest2);
                            }
                            StreamingContainerAgent assignContainer = this.dnmgr.assignContainer(new StreamingContainerManager.ContainerResource(container.getPriority().getPriority(), container.getId().toString(), container.getNodeId().toString(), container.getResource().getMemory(), container.getResource().getVirtualCores(), container.getNodeHttpAddress()), null);
                            if (assignContainer == null) {
                                LOG.warn("Container {} allocated but nothing to deploy, going to release this container.", container.getId());
                                arrayList.add(container.getId());
                            } else {
                                AllocatedContainer allocatedContainer = new AllocatedContainer(this, container, null);
                                this.allocatedContainers.put(container.getId().toString(), allocatedContainer);
                                ByteBuffer byteBuffer = null;
                                if (UserGroupInformation.isSecurityEnabled()) {
                                    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
                                    Token<StramDelegationTokenIdentifier> allocateDelegationToken = allocateDelegationToken(loginUser.getUserName(), this.heartbeatListener.getAddress());
                                    allocatedContainer.delegationToken = allocateDelegationToken;
                                    byteBuffer = LaunchContainerRunnable.getTokens(loginUser, allocateDelegationToken);
                                }
                                new LaunchContainerRunnable(container, this.nmClient, assignContainer, byteBuffer).run();
                                StramEvent.StartContainerEvent startContainerEvent = new StramEvent.StartContainerEvent(container.getId().toString(), container.getNodeId().toString());
                                startContainerEvent.setTimestamp(currentTimeMillis3);
                                this.dnmgr.recordEventAsync(startContainerEvent);
                            }
                        }
                    }
                    resourceRequestHandler.updateNodeReports(sendContainerAskToRM.getUpdatedNodes());
                    List<ContainerStatus> completedContainersStatuses = sendContainerAskToRM.getCompletedContainersStatuses();
                    ArrayList arrayList5 = new ArrayList();
                    for (ContainerStatus containerStatus : completedContainersStatuses) {
                        LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
                        if (!$assertionsDisabled && containerStatus.getState() != ContainerState.COMPLETE) {
                            throw new AssertionError();
                        }
                        AllocatedContainer remove = this.allocatedContainers.remove(containerStatus.getContainerId().toString());
                        if (remove != null && remove.delegationToken != null) {
                            this.delegationTokenManager.cancelToken(remove.delegationToken, UserGroupInformation.getLoginUser().getUserName());
                        }
                        int exitStatus = containerStatus.getExitStatus();
                        if (0 != exitStatus) {
                            if (remove != null) {
                                this.numFailedContainers.incrementAndGet();
                                if (exitStatus != 1) {
                                    String host = remove.container.getNodeId().getHost();
                                    AtomicInteger putIfAbsent = this.failedContainersMap.putIfAbsent(host, new AtomicInteger(1));
                                    int incrementAndGet = putIfAbsent != null ? putIfAbsent.incrementAndGet() : 1;
                                    if (incrementAndGet >= i3) {
                                        LOG.info("Node {} failed {} times consecutively, marking the node blacklisted", host, Integer.valueOf(incrementAndGet));
                                        arrayList5.add(host);
                                    }
                                }
                            }
                            LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
                            this.dnmgr.scheduleContainerRestart(containerStatus.getContainerId().toString());
                        } else {
                            this.numCompletedContainers.incrementAndGet();
                            LOG.info("Container completed successfully., containerId=" + containerStatus.getContainerId());
                            AtomicInteger atomicInteger = this.failedContainersMap.get(remove.container.getNodeId().getHost());
                            if (atomicInteger != null) {
                                atomicInteger.set(0);
                            }
                        }
                        String containerId = containerStatus.getContainerId().toString();
                        this.dnmgr.removeContainerAgent(containerId);
                        StramEvent.StopContainerEvent stopContainerEvent = new StramEvent.StopContainerEvent(containerId, containerStatus.getExitStatus());
                        stopContainerEvent.setReason(containerStatus.getDiagnostics());
                        this.dnmgr.recordEventAsync(stopContainerEvent);
                    }
                    if (!arrayList5.isEmpty()) {
                        this.amRmClient.updateBlacklist(arrayList5, (List) null);
                        this.blacklistedNodesQueueWithTimeStamp.add(new Pair<>(Long.valueOf(System.currentTimeMillis()), arrayList5));
                    }
                    if (this.dnmgr.forcedShutdown) {
                        LOG.info("Forced shutdown due to {}", this.dnmgr.shutdownDiagnosticsMessage);
                        finalApplicationStatus = FinalApplicationStatus.FAILED;
                        this.appDone = true;
                    } else if (this.allocatedContainers.isEmpty() && i6 == 0 && this.dnmgr.containerStartRequests.isEmpty()) {
                        LOG.debug("Exiting as no more containers are allocated or requested");
                        finalApplicationStatus = FinalApplicationStatus.SUCCEEDED;
                        this.appDone = true;
                    }
                    LOG.debug("Current application state: loop=" + i4 + ", appDone=" + this.appDone + ", total=" + i5 + ", requested=" + i6 + ", released=" + i7 + ", completed=" + this.numCompletedContainers + ", failed=" + this.numFailedContainers + ", currentAllocated=" + this.allocatedContainers.size());
                    this.dnmgr.monitorHeartbeat();
                }
                finishApplication(finalApplicationStatus, i5);
            } catch (Exception e2) {
                throw new RuntimeException("Failed to retrieve cluster nodes report.", e2);
            }
        } catch (Throwable th) {
            createYarnClient.stop();
            throw th;
        }
    }

    private void finishApplication(FinalApplicationStatus finalApplicationStatus, int i) throws YarnException, IOException {
        LOG.info("Application completed. Signalling finish to RM");
        FinishApplicationMasterRequest finishApplicationMasterRequest = (FinishApplicationMasterRequest) Records.newRecord(FinishApplicationMasterRequest.class);
        finishApplicationMasterRequest.setFinalApplicationStatus(finalApplicationStatus);
        if (finalApplicationStatus != FinalApplicationStatus.SUCCEEDED) {
            String str = "Diagnostics., total=" + i + ", completed=" + this.numCompletedContainers.get() + ", allocated=" + this.allocatedContainers.size() + ", failed=" + this.numFailedContainers.get();
            if (!StringUtils.isEmpty(this.dnmgr.shutdownDiagnosticsMessage)) {
                str = (str + "\n") + this.dnmgr.shutdownDiagnosticsMessage;
            }
            finishApplicationMasterRequest.setDiagnostics(str);
        }
        LOG.info("diagnostics: " + finishApplicationMasterRequest.getDiagnostics());
        this.amRmClient.unregisterApplicationMaster(finishApplicationMasterRequest.getFinalApplicationStatus(), finishApplicationMasterRequest.getDiagnostics(), (String) null);
    }

    private Token<StramDelegationTokenIdentifier> allocateDelegationToken(String str, InetSocketAddress inetSocketAddress) {
        StramDelegationTokenIdentifier stramDelegationTokenIdentifier = new StramDelegationTokenIdentifier(new Text(str), new Text(""), new Text(""));
        String str2 = inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort();
        Token<StramDelegationTokenIdentifier> token = new Token<>(stramDelegationTokenIdentifier, this.delegationTokenManager);
        token.setService(new Text(str2));
        return token;
    }

    private void checkContainerStatus() {
        for (StreamingContainerAgent streamingContainerAgent : this.dnmgr.getContainerAgents()) {
            ContainerId containerId = ConverterUtils.toContainerId(streamingContainerAgent.container.getExternalId());
            NodeId nodeId = ConverterUtils.toNodeId(streamingContainerAgent.container.host);
            this.allocatedContainers.put(containerId.toString(), new AllocatedContainer(this, Container.newInstance(containerId, nodeId, streamingContainerAgent.container.nodeHttpAddress, Resource.newInstance(streamingContainerAgent.container.getAllocatedMemoryMB(), streamingContainerAgent.container.getAllocatedVCores()), Priority.newInstance(streamingContainerAgent.container.getResourceRequestPriority()), (org.apache.hadoop.yarn.api.records.Token) null), null));
            this.nmClient.getContainerStatusAsync(containerId, nodeId);
        }
    }

    private AllocateResponse sendContainerAskToRM(List<AMRMClient.ContainerRequest> list, List<AMRMClient.ContainerRequest> list2, List<ContainerId> list3) throws YarnException, IOException {
        if (list2.size() > 0) {
            LOG.info(" Removing container request: " + list2);
            for (AMRMClient.ContainerRequest containerRequest : list2) {
                LOG.info("Removed container: {}", containerRequest.toString());
                this.amRmClient.removeContainerRequest(containerRequest);
            }
        }
        if (list.size() > 0) {
            LOG.info("Asking RM for containers: " + list);
            for (AMRMClient.ContainerRequest containerRequest2 : list) {
                LOG.info("Requested container: {}", containerRequest2.toString());
                this.amRmClient.addContainerRequest(containerRequest2);
            }
        }
        for (ContainerId containerId : list3) {
            LOG.info("Released container, id={}", Integer.valueOf(containerId.getId()));
            this.amRmClient.releaseAssignedContainer(containerId);
        }
        for (String str : this.dnmgr.containerStopRequests.values()) {
            AllocatedContainer allocatedContainer = this.allocatedContainers.get(str);
            if (allocatedContainer != null && !allocatedContainer.stopRequested) {
                this.nmClient.stopContainerAsync(allocatedContainer.container.getId(), allocatedContainer.container.getNodeId());
                LOG.info("Requested stop container {}", str);
                allocatedContainer.stopRequested = true;
            }
            this.dnmgr.containerStopRequests.remove(str);
        }
        return this.amRmClient.allocate(0.0f);
    }

    static {
        $assertionsDisabled = !StreamingAppMasterService.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(StreamingAppMasterService.class);
    }
}
