package com.datatorrent.stram;

import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.google.common.base.Throwables;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/RecoverableRpcProxy.class */
/**
 * Invocation handler that forwards calls to a {@link StreamingContainerUmbilicalProtocol} RPC proxy
 * and, when a call fails, drops the connection, re-reads the connect URI through
 * {@link FSRecoveryHandler} and retries until a retry window has elapsed.
 *
 * <p>The connect URI may carry the query parameters {@link #QP_rpcTimeout},
 * {@link #QP_retryTimeoutMillis} and {@link #QP_retryDelayMillis}; when present they override the
 * values taken from the system properties {@link #RPC_TIMEOUT}, {@link #RETRY_TIMEOUT} and
 * {@link #RETRY_DELAY}.
 *
 * <p>NOTE(review): the mutable fields are accessed without synchronization; concurrent use of one
 * instance from several threads looks unsupported - confirm against callers.
 */
public class RecoverableRpcProxy implements InvocationHandler, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(RecoverableRpcProxy.class);

    /** System property holding the RPC timeout handed to {@code RPC.getProxy}. */
    public static final String RPC_TIMEOUT = "com.datatorrent.stram.rpc.timeout";
    /** System property holding the length of the retry window, in milliseconds. */
    public static final String RETRY_TIMEOUT = "com.datatorrent.stram.rpc.retry.timeout";
    /** System property holding the pause between reconnect attempts, in milliseconds. */
    public static final String RETRY_DELAY = "com.datatorrent.stram.rpc.delay.timeout";

    /** Connect URI query parameter overriding the retry window. */
    public static final String QP_retryTimeoutMillis = "retryTimeoutMillis";
    /** Connect URI query parameter overriding the pause between reconnect attempts. */
    public static final String QP_retryDelayMillis = "retryDelayMillis";
    /** Connect URI query parameter overriding the RPC timeout. */
    public static final String QP_rpcTimeout = "rpcTimeout";

    private static final int RETRY_TIMEOUT_DEFAULT = 30000;
    private static final int RETRY_DELAY_DEFAULT = 10000;
    private static final int RPC_TIMEOUT_DEFAULT = 5000;

    /**
     * Charset used to decode the connect URI query. Fixed to UTF-8 instead of the platform default
     * so that decoding does not depend on the JVM's locale settings.
     */
    private static final Charset QUERY_CHARSET = Charset.forName("UTF-8");

    private final Configuration conf;
    private final String appPath;
    /** Current RPC delegate; {@code null} while disconnected (after {@link #close()}). */
    private StreamingContainerUmbilicalProtocol umbilical;
    /** Connect URI used for the most recent connection, as returned by the recovery handler. */
    private String lastConnectURI;
    /** Time of the last successful call, or of the last switch to a new connect URI. */
    private long lastCompletedCallTms;
    private long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT).longValue();
    private long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT).longValue();
    private int rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT).intValue();

    /**
     * Creates the handler and immediately establishes the first RPC proxy.
     *
     * @param str application path handed to {@link FSRecoveryHandler} to look up the connect URI
     * @param configuration Hadoop configuration used for the lookup and for the RPC proxy
     * @throws IOException if the initial connect fails
     */
    public RecoverableRpcProxy(String str, Configuration configuration) throws IOException {
        this.conf = configuration;
        this.appPath = str;
        connect();
    }

    /**
     * Reads the current connect URI, applies any timeout overrides found in its query string and
     * creates a fresh RPC proxy for the host and port it names.
     *
     * @throws IOException if reading the connect URI or creating the proxy fails
     */
    private void connect() throws IOException {
        String connectUri = new FSRecoveryHandler(this.appPath, this.conf).readConnectUri();
        if (!connectUri.equals(this.lastConnectURI)) {
            LOG.debug("Got new RPC connect address {}", connectUri);
            // A new address restarts the retry window, exactly as a completed call would.
            this.lastCompletedCallTms = System.currentTimeMillis();
            this.lastConnectURI = connectUri;
        }
        URI target = URI.create(connectUri);
        String query = target.getQuery();
        if (query != null) {
            List<NameValuePair> params = URLEncodedUtils.parse(query, QUERY_CHARSET);
            for (NameValuePair param : params) {
                String name = param.getName();
                String value = param.getValue();
                if (QP_rpcTimeout.equals(name)) {
                    this.rpcTimeout = Integer.parseInt(value);
                } else if (QP_retryTimeoutMillis.equals(name)) {
                    this.retryTimeoutMillis = Long.parseLong(value);
                } else if (QP_retryDelayMillis.equals(name)) {
                    this.retryDelayMillis = Long.parseLong(value);
                }
            }
        }
        InetSocketAddress address = NetUtils.createSocketAddrForHost(target.getHost(), target.getPort());
        this.umbilical = (StreamingContainerUmbilicalProtocol) RPC.getProxy(
                StreamingContainerUmbilicalProtocol.class,
                StreamingContainerUmbilicalProtocol.versionID,
                address,
                UserGroupInformation.getCurrentUser(),
                this.conf,
                NetUtils.getDefaultSocketFactory(this.conf),
                this.rpcTimeout);
    }

    /**
     * Returns a dynamic proxy that routes every call through {@link #invoke}.
     *
     * <p>While a delegate is connected, the proxy exposes the interfaces of the delegate's class.
     * When the handler is currently disconnected (for example after {@link #close()}), the proxy
     * exposes {@link StreamingContainerUmbilicalProtocol} only, instead of failing with a
     * {@link NullPointerException}; the connection is then re-established on the first call.
     *
     * @return recoverable proxy backed by this handler
     */
    public StreamingContainerUmbilicalProtocol getProxy() {
        StreamingContainerUmbilicalProtocol delegate = this.umbilical;
        ClassLoader loader;
        Class<?>[] interfaces;
        if (delegate != null) {
            loader = delegate.getClass().getClassLoader();
            interfaces = delegate.getClass().getInterfaces();
        } else {
            loader = StreamingContainerUmbilicalProtocol.class.getClassLoader();
            interfaces = new Class<?>[]{StreamingContainerUmbilicalProtocol.class};
        }
        return (StreamingContainerUmbilicalProtocol) Proxy.newProxyInstance(loader, interfaces, this);
    }

    /**
     * Invokes {@code method} on the current RPC delegate, reconnecting and retrying when the
     * delegate throws.
     *
     * <p>The retry window of {@code retryTimeoutMillis} starts at the latest of: the beginning of
     * this invocation, the last successful call, and the last switch to a new connect URI. Starting
     * no earlier than this invocation guarantees that a failure after a long idle period is still
     * retried rather than abandoned on the first error.
     *
     * @param obj the proxy instance the method was invoked on (unused)
     * @param method the interface method to forward
     * @param objArr the call arguments
     * @return the value returned by the delegate
     * @throws ConnectException if the window elapsed and the last failure was a connect failure
     * @throws SocketTimeoutException if the window elapsed and the last failure was a socket timeout
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     * @throws IllegalAccessException if the reflective call is not permitted
     * @throws RuntimeException if reconnecting fails with an {@link IOException}, or if the window
     *         elapsed with any other failure (propagated via {@link Throwables#propagate})
     */
    @Override // java.lang.reflect.InvocationHandler
    public Object invoke(Object obj, Method method, Object[] objArr) throws ConnectException, SocketTimeoutException, InterruptedException, IllegalAccessException {
        final long invocationStartTms = System.currentTimeMillis();
        while (true) {
            try {
                if (this.umbilical == null) {
                    connect();
                }
                Object result = method.invoke(this.umbilical, objArr);
                this.lastCompletedCallTms = System.currentTimeMillis();
                return result;
            } catch (IOException e) {
                // Only connect() can raise this here; a failed reconnect is not retried.
                close();
                throw new RuntimeException(e);
            } catch (InvocationTargetException e2) {
                Throwable targetException = e2.getTargetException();
                // connect() may have advanced lastCompletedCallTms (new connect URI), which
                // restarts the window; otherwise the window starts with this invocation at the
                // earliest.
                long windowStartTms = Math.max(invocationStartTms, this.lastCompletedCallTms);
                long elapsedMillis = System.currentTimeMillis() - windowStartTms;
                if (elapsedMillis >= this.retryTimeoutMillis) {
                    LOG.error("Giving up RPC connection recovery after {} ms", Long.valueOf(elapsedMillis), targetException);
                    if (targetException instanceof ConnectException) {
                        throw ((ConnectException) targetException);
                    }
                    if (targetException instanceof SocketTimeoutException) {
                        throw ((SocketTimeoutException) targetException);
                    }
                    throw Throwables.propagate(targetException);
                }
                LOG.warn("RPC failure, attempting reconnect after {} ms (remaining {} ms)", new Object[]{Long.valueOf(this.retryDelayMillis), Long.valueOf(this.retryTimeoutMillis - elapsedMillis), targetException});
                // Drop the delegate so the next iteration re-reads the connect URI.
                close();
                Thread.sleep(this.retryDelayMillis);
            }
        }
    }

    /**
     * Stops the current RPC delegate, if any, and marks the handler as disconnected. Calling this
     * on an already disconnected handler only logs.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        LOG.debug("Closing RPC connection {}", this.lastConnectURI);
        if (this.umbilical != null) {
            RPC.stopProxy(this.umbilical);
            this.umbilical = null;
        }
    }

    /**
     * Builds a connect URI for {@code inetSocketAddress} using the timeouts configured through the
     * system properties {@link #RPC_TIMEOUT}, {@link #RETRY_DELAY} and {@link #RETRY_TIMEOUT}, or
     * their defaults when unset.
     *
     * @param inetSocketAddress address the RPC server listens on
     * @return the connect URI
     * @throws Exception if the URI cannot be built
     */
    public static URI toConnectURI(InetSocketAddress inetSocketAddress) throws Exception {
        int rpcTimeoutMillis = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT).intValue();
        long delayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT).longValue();
        long timeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT).longValue();
        return toConnectURI(inetSocketAddress, rpcTimeoutMillis, delayMillis, timeoutMillis);
    }

    /**
     * Builds a {@code stram://host:port} connect URI carrying the given timeouts as query
     * parameters.
     *
     * @param inetSocketAddress address the RPC server listens on
     * @param i RPC timeout, stored under {@link #QP_rpcTimeout}
     * @param j pause between reconnect attempts, stored under {@link #QP_retryDelayMillis}
     * @param j2 retry window, stored under {@link #QP_retryTimeoutMillis}
     * @return the connect URI
     * @throws Exception if the URI cannot be built
     */
    public static URI toConnectURI(InetSocketAddress inetSocketAddress, int i, long j, long j2) throws Exception {
        return new URIBuilder()
                .setScheme("stram")
                .setHost(inetSocketAddress.getHostName())
                .setPort(inetSocketAddress.getPort())
                .setParameter(QP_rpcTimeout, Integer.toString(i))
                .setParameter(QP_retryDelayMillis, Long.toString(j))
                .setParameter(QP_retryTimeoutMillis, Long.toString(j2))
                .build();
    }
}
