/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.ha;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.fs.s3hadoop.shaded.com.google.common.base.Preconditions;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.commons.logging.Log;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.commons.logging.LogFactory;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.classification.InterfaceAudience;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.ha.HAServiceStatus;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.ha.HAServiceTarget;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.ipc.RPC;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.ipc.RemoteException;
import org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.util.Daemon;

/**
 * Background monitor that repeatedly polls a single {@link HAServiceTarget}
 * over RPC and tracks the result as a {@link State}.
 *
 * <p>A daemon thread first connects to the target (retrying until it
 * succeeds), then loops calling {@code getServiceStatus()} and
 * {@code monitorHealth()} on the proxy. State transitions are reported to
 * registered {@link Callback}s and every successfully fetched service status
 * is reported to registered {@link ServiceStateCallback}s.
 */
@InterfaceAudience.Private
public class HealthMonitor {
    private static final Log LOG = LogFactory.getLog(HealthMonitor.class);

    /** Thread that runs the connect / health-check loop. */
    private Daemon daemon;

    /** Sleep between failed connection attempts, in milliseconds. */
    private long connectRetryInterval;
    /** Sleep between successive health checks, in milliseconds. */
    private long checkIntervalMillis;
    /** Sleep after a transport-level failure before reconnecting, in milliseconds. */
    private long sleepAfterDisconnectMillis;
    /** Timeout passed to the target when creating the health-monitor proxy. */
    private int rpcTimeout;

    /** Cleared by {@link #shutdown()} to make the daemon loops exit. */
    private volatile boolean shouldRun = true;

    /** Current RPC proxy to the monitored service, or {@code null} while disconnected. */
    private HAServiceProtocol proxy;
    private final HAServiceTarget targetToMonitor;
    private final Configuration conf;

    private State state = State.INITIALIZING;

    /** Listeners notified from {@link #enterState(State)}. */
    private List<Callback> callbacks = Collections.synchronizedList(new LinkedList<>());
    /** Listeners notified from {@link #setLastServiceStatus(HAServiceStatus)}. */
    private List<ServiceStateCallback> serviceStateCallbacks = Collections.synchronizedList(new LinkedList<>());

    private HAServiceStatus lastServiceState = new HAServiceStatus(HAServiceProtocol.HAServiceState.INITIALIZING);

    /**
     * Creates a monitor for {@code target}. The monitoring thread is created
     * here but not started; call {@link #start()} to begin monitoring.
     *
     * @param conf   configuration supplying the {@code ha.health-monitor.*}
     *               intervals and RPC timeout (defaults: 1000 ms intervals,
     *               45000 ms RPC timeout)
     * @param target the service to monitor
     */
    HealthMonitor(Configuration conf, HAServiceTarget target) {
        this.targetToMonitor = target;
        this.conf = conf;
        this.sleepAfterDisconnectMillis = conf.getLong("ha.health-monitor.sleep-after-disconnect.ms", 1000L);
        this.checkIntervalMillis = conf.getLong("ha.health-monitor.check-interval.ms", 1000L);
        this.connectRetryInterval = conf.getLong("ha.health-monitor.connect-retry-interval.ms", 1000L);
        this.rpcTimeout = conf.getInt("ha.health-monitor.rpc-timeout.ms", 45000);
        this.daemon = new MonitorDaemon();
    }

    /**
     * Registers a listener for health-state transitions.
     *
     * @param cb the callback to add
     */
    public void addCallback(Callback cb) {
        this.callbacks.add(cb);
    }

    /**
     * Unregisters a listener previously added with {@link #addCallback(Callback)}.
     *
     * @param cb the callback to remove
     */
    public void removeCallback(Callback cb) {
        this.callbacks.remove(cb);
    }

    /**
     * Registers a listener for service-status reports.
     *
     * @param cb the callback to add
     */
    public synchronized void addServiceStateCallback(ServiceStateCallback cb) {
        this.serviceStateCallbacks.add(cb);
    }

    /**
     * Unregisters a listener previously added with
     * {@link #addServiceStateCallback(ServiceStateCallback)}.
     *
     * @param cb the callback to remove
     */
    public synchronized void removeServiceStateCallback(ServiceStateCallback cb) {
        this.serviceStateCallbacks.remove(cb);
    }

    /**
     * Asks the monitoring thread to stop: clears the run flag and interrupts
     * the daemon so that any in-progress sleep is cut short.
     */
    public void shutdown() {
        LOG.info("Stopping HealthMonitor thread");
        this.shouldRun = false;
        this.daemon.interrupt();
    }

    /**
     * @return the current proxy to the monitored service, or {@code null} if
     *         the monitor is not currently connected
     */
    public synchronized HAServiceProtocol getProxy() {
        return this.proxy;
    }

    /**
     * Blocks until a proxy has been created, sleeping
     * {@code connectRetryInterval} milliseconds between attempts.
     *
     * @throws InterruptedException if interrupted while sleeping between attempts
     */
    private void loopUntilConnected() throws InterruptedException {
        this.tryConnect();
        while (this.proxy == null) {
            Thread.sleep(this.connectRetryInterval);
            this.tryConnect();
        }
        assert (this.proxy != null);
    }

    /**
     * Makes a single attempt to create the proxy. On {@link IOException} the
     * failure is logged, the proxy is left {@code null} and the monitor enters
     * {@link State#SERVICE_NOT_RESPONDING}.
     */
    private void tryConnect() {
        Preconditions.checkState(this.proxy == null);
        try {
            synchronized (this) {
                this.proxy = this.createProxy();
            }
        }
        catch (IOException e) {
            LOG.warn("Could not connect to local service at " + this.targetToMonitor + ": " + e.getMessage());
            this.proxy = null;
            this.enterState(State.SERVICE_NOT_RESPONDING);
        }
    }

    /**
     * Creates the RPC proxy used for health checks. Protected so that
     * subclasses can supply a different proxy.
     *
     * @return a proxy obtained from the monitored target
     * @throws IOException if the proxy cannot be created
     */
    protected HAServiceProtocol createProxy() throws IOException {
        return this.targetToMonitor.getHealthMonitorProxy(this.conf, this.rpcTimeout);
    }

    /**
     * Polls the connected service until shutdown is requested or a
     * transport-level failure occurs.
     *
     * <p>Each iteration fetches the service status and then calls
     * {@code monitorHealth()}:
     * <ul>
     *   <li>both succeed: the status is published and the monitor enters
     *       {@link State#SERVICE_HEALTHY};</li>
     *   <li>a health-check failure (see
     *       {@link #isHealthCheckFailedException(Throwable)}): the monitor
     *       enters {@link State#SERVICE_UNHEALTHY}, keeps the proxy, publishes
     *       the status if one was obtained, and keeps polling;</li>
     *   <li>any other throwable: the proxy is stopped and cleared, the monitor
     *       enters {@link State#SERVICE_NOT_RESPONDING}, sleeps
     *       {@code sleepAfterDisconnectMillis} and returns so the caller can
     *       reconnect.</li>
     * </ul>
     *
     * @throws InterruptedException if interrupted while sleeping
     */
    private void doHealthChecks() throws InterruptedException {
        while (this.shouldRun) {
            HAServiceStatus status = null;
            boolean healthy = false;
            try {
                status = this.proxy.getServiceStatus();
                this.proxy.monitorHealth();
                healthy = true;
            }
            catch (Throwable t) {
                if (this.isHealthCheckFailedException(t)) {
                    // The service answered and reported itself unhealthy: the
                    // connection is fine, so keep the proxy and keep polling.
                    LOG.warn("Service health check failed for " + this.targetToMonitor + ": " + t.getMessage());
                    this.enterState(State.SERVICE_UNHEALTHY);
                } else {
                    // Anything else is treated as a broken connection: drop the
                    // proxy and return so the caller reconnects.
                    LOG.warn("Transport-level exception trying to monitor health of " + this.targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
                    RPC.stopProxy(this.proxy);
                    this.proxy = null;
                    this.enterState(State.SERVICE_NOT_RESPONDING);
                    Thread.sleep(this.sleepAfterDisconnectMillis);
                    return;
                }
            }
            if (status != null) {
                this.setLastServiceStatus(status);
            }
            if (healthy) {
                this.enterState(State.SERVICE_HEALTHY);
            }
            Thread.sleep(this.checkIntervalMillis);
        }
    }

    /**
     * @param t the throwable raised by a health-check RPC
     * @return {@code true} if {@code t} is a {@link HealthCheckFailedException},
     *         either directly or wrapped in a {@link RemoteException}
     */
    private boolean isHealthCheckFailedException(Throwable t) {
        if (t instanceof HealthCheckFailedException) {
            return true;
        }
        return t instanceof RemoteException
                && ((RemoteException)t).unwrapRemoteException(HealthCheckFailedException.class) instanceof HealthCheckFailedException;
    }

    /**
     * Records {@code status} as the latest service status and reports it to
     * every registered {@link ServiceStateCallback}. Runs under this object's
     * monitor, as do the methods that add and remove those callbacks.
     *
     * @param status the most recently fetched service status
     */
    private synchronized void setLastServiceStatus(HAServiceStatus status) {
        this.lastServiceState = status;
        for (ServiceStateCallback cb : this.serviceStateCallbacks) {
            cb.reportServiceStatus(this.lastServiceState);
        }
    }

    /**
     * Moves the monitor to {@code newState} and notifies every registered
     * {@link Callback}. Does nothing if the monitor is already in that state.
     *
     * @param newState the state to enter
     */
    private synchronized void enterState(State newState) {
        if (newState != this.state) {
            LOG.info("Entering state " + newState);
            this.state = newState;
            // Iterating a synchronizedList requires holding the list's own lock;
            // addCallback/removeCallback are not synchronized on this object.
            synchronized (this.callbacks) {
                for (Callback cb : this.callbacks) {
                    cb.enteredState(newState);
                }
            }
        }
    }

    /** @return the current health state */
    synchronized State getHealthState() {
        return this.state;
    }

    /** @return the most recently recorded service status */
    synchronized HAServiceStatus getLastServiceStatus() {
        return this.lastServiceState;
    }

    /** @return whether the monitoring thread is alive */
    boolean isAlive() {
        return this.daemon.isAlive();
    }

    /**
     * Waits for the monitoring thread to terminate.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    void join() throws InterruptedException {
        this.daemon.join();
    }

    /** Starts the monitoring thread. */
    void start() {
        this.daemon.start();
    }

    /** Listener for service-status reports. */
    static interface ServiceStateCallback {
        /**
         * Called each time a service status has been fetched and recorded.
         *
         * @param var1 the newly recorded status
         */
        public void reportServiceStatus(HAServiceStatus var1);
    }

    /** Listener for health-state transitions. */
    static interface Callback {
        /**
         * Called when the monitor changes to a different state.
         *
         * @param var1 the state just entered
         */
        public void enteredState(State var1);
    }

    /**
     * Thread that alternates between connecting and health checking until
     * {@link HealthMonitor#shutdown()} is called. Any uncaught exception moves
     * the monitor to {@link State#HEALTH_MONITOR_FAILED}.
     */
    private class MonitorDaemon
    extends Daemon {
        private MonitorDaemon() {
            this.setName("Health Monitor for " + HealthMonitor.this.targetToMonitor);
            this.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    LOG.fatal("Health monitor failed", e);
                    HealthMonitor.this.enterState(State.HEALTH_MONITOR_FAILED);
                }
            });
        }

        @Override
        public void run() {
            while (HealthMonitor.this.shouldRun) {
                try {
                    HealthMonitor.this.loopUntilConnected();
                    HealthMonitor.this.doHealthChecks();
                }
                catch (InterruptedException ie) {
                    // An interrupt is only expected as part of shutdown().
                    Preconditions.checkState(!HealthMonitor.this.shouldRun, "Interrupted but still supposed to run");
                }
            }
        }
    }

    /** Health states the monitor can be in. */
    @InterfaceAudience.Private
    public static enum State {
        /** Initial state, before any connection attempt or health check has changed it. */
        INITIALIZING,
        /** Connecting failed, or a health-check RPC failed with a non-health-check error. */
        SERVICE_NOT_RESPONDING,
        /** The last status fetch and health check both succeeded. */
        SERVICE_HEALTHY,
        /** The last health check raised a {@link HealthCheckFailedException}. */
        SERVICE_UNHEALTHY,
        /** The monitoring thread died from an uncaught exception. */
        HEALTH_MONITOR_FAILED;

    }
}

