/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.uam;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Unstable
public class UnmanagedApplicationManager {
    /** Logger used by this class and its inner classes. */
    private static final Logger LOG = LoggerFactory.getLogger(UnmanagedApplicationManager.class);
    /** Ten seconds in milliseconds; matches the upper bound monitorCurrentAppAttempt polls for. */
    private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000L;
    /** Base application name; submitted applications are named with this text followed by "-" and the suffix. */
    public static final String APP_NAME = "UnmanagedAM";
    /** Configuration key read for the queue name when no queue was passed to the constructor. */
    private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
    /** Allocate requests queued by allocateAsync and drained by the handler thread. */
    private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
    /** Heartbeat thread; created in the constructor, started once registration succeeds. */
    private AMRequestHandlerThread handlerThread;
    /** Proxy used for register/allocate/finish calls; null until createAndRegisterApplicationMaster creates it. */
    private ApplicationMasterProtocol rmProxy;
    /** Application id supplied to the constructor (never null). */
    private ApplicationId applicationId;
    /** Attempt id taken from the attempt report in initializeUnmanagedAM; null before that. */
    private ApplicationAttemptId attemptId;
    /** User name used to build the remote-user UGI for client-protocol calls. */
    private String submitter;
    /** Text appended to the application name on submission. */
    private String appNameSuffix;
    /** Configuration supplied to the constructor. */
    private Configuration conf;
    /** Target queue; when blank, the queue is looked up in the configuration instead. */
    private String queueName;
    /** Proxy user created during registration; also passed along when a new AMRM token is received. */
    private UserGroupInformation userUgi;
    /** Register request saved by createAndRegisterApplicationMaster and handed to the re-register helpers. */
    private RegisterApplicationMasterRequest registerRequest;
    /** Response id of the latest allocate response; set on the next outgoing allocate request. */
    private int lastResponseId;
    /** Client-protocol proxy; set while initializeUnmanagedAM runs (cleared afterwards) and created on demand by forceKillApplication. */
    private ApplicationClientProtocol rmClient;
    /** Sleep between polls in monitorCurrentAppAttempt, in milliseconds (read from configuration, default 200). */
    private long asyncApiPollIntervalMillis;
    /** Factory used to create protocol record instances. */
    private RecordFactory recordFactory;

    /**
     * Builds a manager for one unmanaged application. Nothing is submitted or
     * registered here; the heartbeat thread is created but not started.
     *
     * @param conf configuration, must not be null
     * @param appId id of the application to submit, must not be null
     * @param queueName queue to submit to; may be blank
     * @param submitter name of the submitting user, must not be null
     * @param appNameSuffix text appended to the application name
     * @throws NullPointerException if conf, appId or submitter is null
     */
    public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix) {
        Preconditions.checkNotNull(conf, "Configuration cannot be null");
        Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
        Preconditions.checkNotNull(submitter, "App submitter cannot be null");

        // Caller-supplied identity and settings.
        this.conf = conf;
        this.applicationId = appId;
        this.queueName = queueName;
        this.submitter = submitter;
        this.appNameSuffix = appNameSuffix;

        // Nothing is registered yet.
        this.rmProxy = null;
        this.registerRequest = null;

        // Asynchronous allocate machinery.
        this.requestQueue = new LinkedBlockingQueue<AsyncAllocateRequestInfo>();
        this.handlerThread = new AMRequestHandlerThread();

        this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
        this.asyncApiPollIntervalMillis = conf.getLong("yarn.client.application-client-protocol.poll-interval-ms", 200L);
    }

    /**
     * Submits the unmanaged application, waits for its attempt, registers the
     * application master and then starts the heartbeat handler thread.
     *
     * @param request the register request; it is saved so it can be passed to
     *     the re-register helpers later
     * @return the response of the register call
     * @throws YarnException if submission, monitoring or registration fails
     * @throws IOException if an RPC fails
     * @throws YarnRuntimeException if the current user cannot be determined
     */
    public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(RegisterApplicationMasterRequest request) throws YarnException, IOException {
        // Saved before anything else: allocateAsync uses a non-null register
        // request to tell "registration in progress" from "never started".
        this.registerRequest = request;
        UnmanagedAMIdentifier identifier = this.initializeUnmanagedAM(this.applicationId);
        try {
            this.userUgi = UserGroupInformation.createProxyUser(identifier.getAttemptId().toString(), UserGroupInformation.getCurrentUser());
        }
        catch (IOException e) {
            LOG.error("Exception while trying to get current user", (Throwable)e);
            throw new YarnRuntimeException((Throwable)e);
        }
        this.rmProxy = this.createRMProxy(ApplicationMasterProtocol.class, this.conf, this.userUgi, identifier.getToken());
        LOG.info("Registering the Unmanaged application master {}", (Object)this.attemptId);
        RegisterApplicationMasterResponse response = this.rmProxy.registerApplicationMaster(this.registerRequest);

        // Reset the response id BEFORE the heartbeat thread starts. The thread
        // updates lastResponseId after every allocate call; resetting it after
        // start() could overwrite an id the thread has already recorded.
        this.lastResponseId = 0;

        // The heartbeat thread is only started once registration has succeeded.
        this.handlerThread.setUncaughtExceptionHandler(new HeartBeatThreadUncaughtExceptionHandler());
        this.handlerThread.setDaemon(true);
        this.handlerThread.start();
        return response;
    }

    /**
     * Stops the heartbeat thread and, if the application master is registered,
     * forwards the finish request.
     *
     * @param request the finish request to forward
     * @return the finish response; an instance created with {@code false} when
     *     registration was started but has not completed
     * @throws YarnException if called before createAndRegisterApplicationMaster,
     *     or if the finish call fails
     * @throws IOException if the finish call fails
     */
    public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnException, IOException {
        // The handler thread is stopped regardless of the registration state.
        this.handlerThread.shutdown();

        if (this.rmProxy != null) {
            return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, this.registerRequest, this.attemptId);
        }

        // No proxy yet: registration either never started or is still pending.
        if (this.registerRequest == null) {
            throw new YarnException("finishApplicationMaster should not be called before createAndRegister");
        }
        LOG.warn("Unmanaged AM still not successfully launched/registered yet. Stopping the UAM client thread anyways.");
        return FinishApplicationMasterResponse.newInstance(false);
    }

    /**
     * Stops the heartbeat thread and asks the resource manager to kill the
     * application.
     *
     * <p>The kill request is built from the application id given to the
     * constructor rather than from the attempt id, because the attempt id is
     * only assigned once the attempt has been found during registration; using
     * it here would fail with a NullPointerException when the kill is issued
     * before (or instead of) a successful registration.
     *
     * @return the response of the kill call
     * @throws IOException if the client proxy cannot be created or the call fails
     * @throws YarnException if the kill call fails
     */
    public KillApplicationResponse forceKillApplication() throws IOException, YarnException {
        KillApplicationRequest request = KillApplicationRequest.newInstance(this.applicationId);
        this.handlerThread.shutdown();
        // Create a client proxy on demand if none is currently held.
        if (this.rmClient == null) {
            this.rmClient = this.createRMProxy(ApplicationClientProtocol.class, this.conf, UserGroupInformation.createRemoteUser(this.submitter), null);
        }
        return this.rmClient.forceKillApplication(request);
    }

    /**
     * Queues an allocate request for the heartbeat thread; the callback is
     * invoked by that thread with the allocate response.
     *
     * <p>The request is queued before the registration state is checked, so a
     * request submitted while registration is still in progress is kept and
     * sent once the heartbeat thread runs. Note that the request is also
     * already queued when the "not yet started" exception below is thrown.
     *
     * @param request the allocate request, must not be null
     * @param callback receives the response, must not be null
     * @throws YarnException if createAndRegisterApplicationMaster has not been
     *     called yet
     */
    public void allocateAsync(AllocateRequest request, AsyncCallback<AllocateResponse> callback) throws YarnException {
        try {
            this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
        }
        catch (InterruptedException ex) {
            LOG.debug("Interrupted while waiting to put on response queue", (Throwable)ex);
            // Keep the interrupt visible to the caller instead of silently
            // consuming it.
            Thread.currentThread().interrupt();
        }
        if (this.rmProxy == null) {
            if (this.registerRequest != null) {
                LOG.info("Unmanaged AM still not successfully launched/registered yet. Saving the allocate request and send later.");
            } else {
                throw new YarnException("AllocateAsync should not be called before createAndRegister");
            }
        }
    }

    /**
     * @return the application attempt id, or null if it has not been assigned yet
     */
    public ApplicationAttemptId getAttemptId() {
        final ApplicationAttemptId current = attemptId;
        return current;
    }

    /**
     * Creates a resource manager proxy by delegating to AMRMClientUtils. Being
     * protected, it can be overridden by subclasses.
     *
     * @param protocol the protocol interface to create a proxy for
     * @param config the configuration to use
     * @param user the user the proxy acts as
     * @param token the AMRM token passed to the delegate; callers in this class
     *     pass null for client-protocol proxies
     * @param <T> the protocol type
     * @return the proxy
     * @throws IOException if the delegate fails to create the proxy
     */
    protected <T> T createRMProxy(Class<T> protocol, Configuration config, UserGroupInformation user, Token<AMRMTokenIdentifier> token) throws IOException {
        final T proxy = AMRMClientUtils.createRMProxy(config, protocol, user, token);
        return proxy;
    }

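    /*
     * WARNING (emitted by the CFR decompiler): a try block that caught itself
     * was removed while decompiling the method below, so its behaviour may
     * differ from the original code.
     */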
    /**
     * Submits the unmanaged application, waits for its current attempt to reach
     * the LAUNCHED state, records the attempt id and returns the attempt's
     * identifier. The client proxy created here is dropped again on every exit
     * path.
     *
     * @param appId the application to submit
     * @return the attempt id together with its AMRM token (the token may be null)
     * @throws IOException if an RPC fails
     * @throws YarnException if submission or monitoring fails
     */
    protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId) throws IOException, YarnException {
        try {
            this.rmClient = this.createRMProxy(ApplicationClientProtocol.class, this.conf, UserGroupInformation.createRemoteUser(this.submitter), null);
            this.submitUnmanagedApp(appId);

            // Application states that end the wait for an attempt id.
            Set<YarnApplicationState> expectedAppStates = EnumSet.of(YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING, YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED);
            ApplicationAttemptReport launched = this.monitorCurrentAppAttempt(appId, expectedAppStates, YarnApplicationAttemptState.LAUNCHED);

            this.attemptId = launched.getApplicationAttemptId();
            return this.getUAMIdentifier();
        }
        finally {
            this.rmClient = null;
        }
    }

    /**
     * Builds a submission context flagged as an unmanaged AM and submits it
     * through the current client proxy.
     *
     * @param appId the id to submit the application under
     * @throws YarnException if the submit call fails
     * @throws IOException if the submit call fails
     */
    private void submitUnmanagedApp(ApplicationId appId) throws YarnException, IOException {
        SubmitApplicationRequest submitRequest = this.recordFactory.newRecordInstance(SubmitApplicationRequest.class);
        ApplicationSubmissionContext context = this.recordFactory.newRecordInstance(ApplicationSubmissionContext.class);

        context.setApplicationId(appId);
        context.setApplicationName("UnmanagedAM-" + this.appNameSuffix);

        // Use the configured default queue only when no queue name was given.
        String targetQueue = StringUtils.isBlank(this.queueName)
            ? this.conf.get(DEFAULT_QUEUE_CONFIG, "default")
            : this.queueName;
        context.setQueue(targetQueue);

        // An empty launch context and a fixed resource (1024, 1) are submitted.
        ContainerLaunchContext emptyLaunchContext = this.recordFactory.newRecordInstance(ContainerLaunchContext.class);
        context.setResource(BuilderUtils.newResource(1024L, 1));
        context.setAMContainerSpec(emptyLaunchContext);

        submitRequest.setApplicationSubmissionContext(context);
        context.setUnmanagedAM(true);

        LOG.info("Submitting unmanaged application {}", appId);
        this.rmClient.submitApplication(submitRequest);
    }

    /**
     * Polls until the application's current attempt reaches the given attempt
     * state, sleeping for the configured poll interval between rounds and
     * giving up after {@link #AM_STATE_WAIT_TIMEOUT_MS} milliseconds.
     *
     * <p>Each round first resolves the attempt id (once the application is in
     * one of {@code appStates}) and then checks the attempt's state.
     *
     * <p>An interrupt received while sleeping does not end the wait, but the
     * thread's interrupt status is restored before this method returns or
     * throws, so callers can still observe it.
     *
     * @param appId the application to monitor
     * @param appStates application states that end the wait for an attempt id
     * @param attemptState the attempt state to wait for
     * @return the attempt report once the attempt is in {@code attemptState}
     * @throws YarnException if a report call fails
     * @throws IOException if a report call fails
     * @throws YarnRuntimeException if the application is in one of
     *     {@code appStates} other than ACCEPTED
     * @throws RuntimeException if the attempt does not reach the state in time
     */
    private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId, Set<YarnApplicationState> appStates, YarnApplicationAttemptState attemptState) throws YarnException, IOException {
        long startTime = System.currentTimeMillis();
        ApplicationAttemptId appAttemptId = null;
        boolean interrupted = false;
        try {
            do {
                if (appAttemptId == null) {
                    ApplicationReport report = this.getApplicationReport(appId);
                    YarnApplicationState state = report.getYarnApplicationState();
                    if (appStates.contains(state)) {
                        if (state != YarnApplicationState.ACCEPTED) {
                            throw new YarnRuntimeException("Received non-accepted application state: " + state + ". Application " + appId + " not the first attempt?");
                        }
                        appAttemptId = this.getApplicationReport(appId).getCurrentApplicationAttemptId();
                    } else {
                        LOG.info("Current application state of {} is {}, will retry later.", (Object)appId, (Object)state);
                    }
                }
                if (appAttemptId != null) {
                    GetApplicationAttemptReportRequest req = (GetApplicationAttemptReportRequest)this.recordFactory.newRecordInstance(GetApplicationAttemptReportRequest.class);
                    req.setApplicationAttemptId(appAttemptId);
                    ApplicationAttemptReport attemptReport = this.rmClient.getApplicationAttemptReport(req).getApplicationAttemptReport();
                    if (attemptState.equals((Object)attemptReport.getYarnApplicationAttemptState())) {
                        return attemptReport;
                    }
                    LOG.info("Current attempt state of " + appAttemptId + " is " + attemptReport.getYarnApplicationAttemptState() + ", waiting for current attempt to reach " + attemptState);
                }
                try {
                    Thread.sleep(this.asyncApiPollIntervalMillis);
                }
                catch (InterruptedException e) {
                    // Remember the interrupt; it is re-asserted in the finally
                    // block so it is not lost to the caller.
                    interrupted = true;
                    LOG.warn("Interrupted while waiting for current attempt of " + appId + " to reach " + attemptState);
                }
            } while (System.currentTimeMillis() - startTime <= AM_STATE_WAIT_TIMEOUT_MS);
            throw new RuntimeException("Timeout for waiting current attempt of " + appId + " to reach " + attemptState);
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Reads the AMRM token from the application report of the current attempt's
     * application and pairs it with the attempt id.
     *
     * @return the attempt id together with the converted token; the token is
     *     null (and a warning is logged) when the report carries none
     * @throws IOException if the report call fails
     * @throws YarnException if the report call fails
     */
    protected UnmanagedAMIdentifier getUAMIdentifier() throws IOException, YarnException {
        ApplicationId appId = this.attemptId.getApplicationId();
        org.apache.hadoop.yarn.api.records.Token reportedToken = this.getApplicationReport(appId).getAMRMToken();

        if (reportedToken == null) {
            LOG.warn("AMRMToken not found in the application report for application: {}", appId);
            return new UnmanagedAMIdentifier(this.attemptId, null);
        }

        // The (Text) cast picks the convertFromYarn overload the original call used.
        Token<AMRMTokenIdentifier> converted = ConverterUtils.convertFromYarn(reportedToken, (Text)null);
        return new UnmanagedAMIdentifier(this.attemptId, converted);
    }

    /**
     * Fetches the application report for the given id through the current
     * client proxy.
     *
     * @param appId the application to look up
     * @return the application report
     * @throws YarnException if the call fails
     * @throws IOException if the call fails
     */
    private ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
        GetApplicationReportRequest reportRequest = this.recordFactory.newRecordInstance(GetApplicationReportRequest.class);
        reportRequest.setApplicationId(appId);

        ApplicationReport report = this.rmClient.getApplicationReport(reportRequest).getApplicationReport();
        return report;
    }

    /**
     * @return the number of allocate requests currently waiting in the queue
     */
    @VisibleForTesting
    public int getRequestQueueSize() {
        final int pending = requestQueue.size();
        return pending;
    }

    protected class HeartBeatThreadUncaughtExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        protected HeartBeatThreadUncaughtExceptionHandler() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            LOG.error("Heartbeat thread {} for application attempt {} crashed!", new Object[]{t.getName(), UnmanagedApplicationManager.this.attemptId, e});
        }
    }

    public class AMRequestHandlerThread
    extends Thread {
        private volatile boolean keepRunning;

        public AMRequestHandlerThread() {
            super("UnmanagedApplicationManager Heartbeat Handler Thread");
            this.keepRunning = true;
        }

        public void shutdown() {
            this.keepRunning = false;
            this.interrupt();
        }

        @Override
        public void run() {
            while (this.keepRunning) {
                try {
                    AsyncAllocateRequestInfo requestInfo = (AsyncAllocateRequestInfo)UnmanagedApplicationManager.this.requestQueue.take();
                    if (requestInfo == null) {
                        throw new YarnException("Null requestInfo taken from request queue");
                    }
                    if (!this.keepRunning) break;
                    AllocateRequest request = requestInfo.getRequest();
                    if (request == null) {
                        throw new YarnException("Null allocateRequest from requestInfo");
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:" + (request.getAskList() == null ? " empty" : Integer.valueOf(request.getAskList().size())));
                    }
                    request.setResponseId(UnmanagedApplicationManager.this.lastResponseId);
                    AllocateResponse response = AMRMClientUtils.allocateWithReRegister(request, UnmanagedApplicationManager.this.rmProxy, UnmanagedApplicationManager.this.registerRequest, UnmanagedApplicationManager.this.attemptId);
                    if (response == null) {
                        throw new YarnException("Null allocateResponse from allocate");
                    }
                    UnmanagedApplicationManager.this.lastResponseId = response.getResponseId();
                    if (response.getAMRMToken() != null) {
                        LOG.debug("Received new AMRMToken");
                        YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(), UnmanagedApplicationManager.this.userUgi, UnmanagedApplicationManager.this.conf);
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received Heartbeat reply from RM. Allocated Containers:" + (response.getAllocatedContainers() == null ? " empty" : Integer.valueOf(response.getAllocatedContainers().size())));
                    }
                    if (requestInfo.getCallback() == null) {
                        throw new YarnException("Null callback from requestInfo");
                    }
                    requestInfo.getCallback().callback((Object)response);
                }
                catch (InterruptedException ex) {
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug("Interrupted while waiting for queue", (Throwable)ex);
                }
                catch (IOException ex) {
                    LOG.warn("IO Error occurred while processing heart beat for " + UnmanagedApplicationManager.this.attemptId, (Throwable)ex);
                }
                catch (Throwable ex) {
                    LOG.warn("Error occurred while processing heart beat for " + UnmanagedApplicationManager.this.attemptId, ex);
                }
            }
            LOG.info("UnmanagedApplicationManager has been stopped for {}. AMRequestHandlerThread thread is exiting", (Object)UnmanagedApplicationManager.this.attemptId);
        }
    }

    /**
     * Pairs an allocate request with the callback that receives its response.
     * Both parts are required.
     */
    public static class AsyncAllocateRequestInfo {
        private final AllocateRequest request;
        private final AsyncCallback<AllocateResponse> callback;

        /**
         * @param request the allocate request, must not be null
         * @param callback the callback for the response, must not be null
         * @throws IllegalArgumentException if either argument is null
         */
        public AsyncAllocateRequestInfo(AllocateRequest request, AsyncCallback<AllocateResponse> callback) {
            Preconditions.checkArgument(request != null, "AllocateRequest cannot be null");
            Preconditions.checkArgument(callback != null, "Callback cannot be null");
            this.request = request;
            this.callback = callback;
        }

        /** @return the callback supplied at construction */
        public AsyncCallback<AllocateResponse> getCallback() {
            return callback;
        }

        /** @return the allocate request supplied at construction */
        public AllocateRequest getRequest() {
            return request;
        }
    }

    /**
     * Value holder for an application attempt id and the AMRM token that goes
     * with it. Neither value is validated; the token may be null.
     */
    public static class UnmanagedAMIdentifier {
        private final ApplicationAttemptId attemptId;
        private final Token<AMRMTokenIdentifier> token;

        /**
         * @param attemptId the application attempt id
         * @param token the AMRM token for the attempt, possibly null
         */
        public UnmanagedAMIdentifier(ApplicationAttemptId attemptId, Token<AMRMTokenIdentifier> token) {
            this.token = token;
            this.attemptId = attemptId;
        }

        /** @return the attempt id supplied at construction */
        public ApplicationAttemptId getAttemptId() {
            return attemptId;
        }

        /** @return the token supplied at construction, possibly null */
        public Token<AMRMTokenIdentifier> getToken() {
            return token;
        }
    }
}

