package com.datatorrent.stram;

import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.util.SecureExecutor;
import com.datatorrent.stram.webapp.OperatorInfo;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/StreamingContainerParent.class */
public class StreamingContainerParent extends CompositeService implements StreamingContainerUmbilicalProtocol {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerParent.class);
    private Server server;
    private SecretManager<? extends TokenIdentifier> tokenSecretManager;
    private InetSocketAddress address;
    private final StreamingContainerManager dagManager;
    private final int listenerThreadCount;

    public StreamingContainerParent(String str, StreamingContainerManager streamingContainerManager, SecretManager<? extends TokenIdentifier> secretManager, int i) {
        super(str);
        this.tokenSecretManager = null;
        this.dagManager = streamingContainerManager;
        this.tokenSecretManager = secretManager;
        this.listenerThreadCount = i;
    }

    public void init(Configuration configuration) {
        super.init(configuration);
    }

    public void start() {
        startRpcServer();
        super.start();
    }

    public void stop() {
        stopRpcServer();
        super.stop();
    }

    protected void startRpcServer() {
        Configuration config = getConfig();
        LOG.info("Config: " + config);
        LOG.info("Listener thread count " + this.listenerThreadCount);
        try {
            this.server = new RPC.Builder(config).setProtocol(StreamingContainerUmbilicalProtocol.class).setInstance(this).setBindAddress("0.0.0.0").setPort(0).setNumHandlers(this.listenerThreadCount).setSecretManager(this.tokenSecretManager).setVerbose(false).build();
            if (config.getBoolean("hadoop.security.authorization", false)) {
                this.server.refreshServiceAcl(config, new PolicyProvider() { // from class: com.datatorrent.stram.StreamingContainerParent.1
                    public Service[] getServices() {
                        return new Service[]{new Service(StreamingContainerUmbilicalProtocol.class.getName(), StreamingContainerUmbilicalProtocol.class)};
                    }
                });
            }
            this.server.start();
            this.address = NetUtils.getConnectAddress(this.server);
            LOG.info("Container callback server listening at " + this.address);
        } catch (IOException e) {
            throw new YarnRuntimeException(e);
        }
    }

    protected void stopRpcServer() {
        this.server.stop();
    }

    public InetSocketAddress getAddress() {
        return this.address;
    }

    public int getListenerThreadCount() {
        return this.listenerThreadCount;
    }

    void refreshServiceAcls(Configuration configuration, PolicyProvider policyProvider) {
        this.server.refreshServiceAcl(configuration, policyProvider);
    }

    public ProtocolSignature getProtocolSignature(String str, long j, int i) throws IOException {
        return ProtocolSignature.getProtocolSignature(this, str, j, i);
    }

    public long getProtocolVersion(String str, long j) throws IOException {
        return StreamingContainerUmbilicalProtocol.versionID;
    }

    @Override // com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol
    public void log(String str, String str2) throws IOException {
        LOG.info("child msg: {} context: {}", str2, this.dagManager.getContainerAgent(str).container);
    }

    @Override // com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol
    public void reportError(String str, int[] iArr, String str2) {
        if (iArr == null || iArr.length == 0) {
            this.dagManager.recordEventAsync(new StramEvent.ContainerErrorEvent(str, str2));
        } else {
            for (int i : iArr) {
                OperatorInfo operatorInfo = this.dagManager.getOperatorInfo(i);
                if (operatorInfo != null) {
                    this.dagManager.recordEventAsync(new StramEvent.OperatorErrorEvent(operatorInfo.name, i, str, str2));
                }
            }
        }
        try {
            log(str, str2);
        } catch (IOException e) {
        }
    }

    @Override // com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol
    public StreamingContainerUmbilicalProtocol.StreamingContainerContext getInitContext(String str) throws IOException {
        return this.dagManager.getContainerAgent(str).getInitContext();
    }

    @Override // com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol
    public StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse processHeartbeat(final StreamingContainerUmbilicalProtocol.ContainerHeartbeat containerHeartbeat) {
        long currentTimeMillis = System.currentTimeMillis();
        if (containerHeartbeat.sentTms - currentTimeMillis > 50) {
            LOG.warn("Child container heartbeat sent time for {} ({}) is greater than the receive timestamp in AM ({}). Please make sure the clocks are in sync", new Object[]{containerHeartbeat.getContainerId(), Long.valueOf(containerHeartbeat.sentTms), Long.valueOf(currentTimeMillis)});
        }
        this.dagManager.updateRPCLatency(containerHeartbeat.getContainerId(), currentTimeMillis - containerHeartbeat.sentTms);
        try {
            return (StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse) SecureExecutor.execute(new SecureExecutor.WorkLoad<StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse>() { // from class: com.datatorrent.stram.StreamingContainerParent.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // com.datatorrent.stram.util.SecureExecutor.WorkLoad
                public StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse run() {
                    return StreamingContainerParent.this.dagManager.processHeartbeat(containerHeartbeat);
                }
            });
        } catch (IOException e) {
            LOG.error("Error processing heartbeat", e);
            return null;
        }
    }
}
