/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.util;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import java.net.InetAddress;
import java.util.UUID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.StandaloneUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

public class LeaderRetrievalUtils {
    private static final Logger LOG = LoggerFactory.getLogger(LeaderRetrievalUtils.class);

    /**
     * Creates the {@link LeaderRetrievalService} that corresponds to the recovery mode
     * resolved from the given configuration via {@link #getRecoveryMode(Configuration)}.
     *
     * @param configuration configuration the recovery mode is read from; it is also handed
     *                      to the factory that builds the service
     * @return the service created by {@link StandaloneUtils} (standalone mode) or
     *         {@link ZooKeeperUtils} (ZooKeeper mode)
     * @throws Exception if the resolved recovery mode is not handled here; exceptions raised
     *                   by the delegated factory call propagate unchanged
     */
    public static LeaderRetrievalService createLeaderRetrievalService(Configuration configuration) throws Exception {
        final RecoveryMode mode = LeaderRetrievalUtils.getRecoveryMode(configuration);
        switch (mode) {
            case STANDALONE:
                return StandaloneUtils.createLeaderRetrievalService(configuration);
            case ZOOKEEPER:
                return ZooKeeperUtils.createLeaderRetrievalService(configuration);
            default:
                throw new Exception("Recovery mode " + mode + " is not supported.");
        }
    }

    /**
     * Creates a {@link LeaderRetrievalService} for the configured recovery mode, using the
     * given actor as the fixed leader when running in standalone mode.
     *
     * @param configuration configuration the recovery mode is read from
     * @param standaloneRef actor whose serialized path becomes the leader address in
     *                      standalone mode; it is not touched in ZooKeeper mode
     * @return a {@link StandaloneLeaderRetrievalService} bound to the actor's path, or the
     *         service created by {@link ZooKeeperUtils}
     * @throws Exception if the resolved recovery mode is not handled here; exceptions raised
     *                   while building the service propagate unchanged
     */
    public static LeaderRetrievalService createLeaderRetrievalService(Configuration configuration, ActorRef standaloneRef) throws Exception {
        final RecoveryMode mode = LeaderRetrievalUtils.getRecoveryMode(configuration);
        switch (mode) {
            case STANDALONE:
                // The serialization format of the actor path serves as the leader URL.
                return new StandaloneLeaderRetrievalService(standaloneRef.path().toSerializationFormat());
            case ZOOKEEPER:
                return ZooKeeperUtils.createLeaderRetrievalService(configuration);
            default:
                throw new Exception("Recovery mode " + mode + " is not supported.");
        }
    }

    /**
     * Starts the given retrieval service with a {@link LeaderGatewayListener} and blocks
     * until that listener's future yields an {@link ActorGateway} or the timeout elapses.
     * The service is stopped again before this method returns or throws.
     *
     * @param leaderRetrievalService service that is started for the lookup and stopped afterwards
     * @param actorSystem            actor system handed to the listener
     * @param timeout                upper bound for waiting on the listener's future; also
     *                               handed to the listener
     * @return the gateway the listener's future completed with
     * @throws LeaderRetrievalException wrapping any exception raised while starting the
     *                                  service or waiting for the result
     */
    public static ActorGateway retrieveLeaderGateway(LeaderRetrievalService leaderRetrievalService, ActorSystem actorSystem, FiniteDuration timeout) throws LeaderRetrievalException {
        final LeaderGatewayListener gatewayListener = new LeaderGatewayListener(actorSystem, timeout);
        try {
            leaderRetrievalService.start(gatewayListener);
            return Await.result(gatewayListener.getActorGatewayFuture(), timeout);
        } catch (Exception cause) {
            throw new LeaderRetrievalException("Could not retrieve the leader gateway", cause);
        } finally {
            // Best effort: a failing stop is only logged so it cannot mask the actual outcome.
            try {
                leaderRetrievalService.stop();
            } catch (Exception stopFailure) {
                LOG.warn("Could not stop the leader retrieval service.", stopFailure);
            }
        }
    }

    /**
     * Starts the given retrieval service with a {@link LeaderConnectionInfoListener} and
     * blocks until that listener's future yields a {@link LeaderConnectionInfo} or the
     * timeout elapses. The service is stopped again before this method returns or throws.
     *
     * @param leaderRetrievalService service that is started for the lookup and stopped afterwards
     * @param timeout                upper bound for waiting on the listener's future
     * @return the connection info the listener's future completed with
     * @throws LeaderRetrievalException wrapping any exception raised while starting the
     *                                  service or waiting for the result
     */
    public static LeaderConnectionInfo retrieveLeaderConnectionInfo(LeaderRetrievalService leaderRetrievalService, FiniteDuration timeout) throws LeaderRetrievalException {
        final LeaderConnectionInfoListener infoListener = new LeaderConnectionInfoListener();
        try {
            leaderRetrievalService.start(infoListener);
            return Await.result(infoListener.getLeaderConnectionInfoFuture(), timeout);
        } catch (Exception cause) {
            throw new LeaderRetrievalException("Could not retrieve the leader address and leader session ID.", cause);
        } finally {
            // Best effort: a failing stop is only logged so it cannot mask the actual outcome.
            try {
                leaderRetrievalService.stop();
            } catch (Exception stopFailure) {
                LOG.warn("Could not stop the leader retrieval service.", stopFailure);
            }
        }
    }

    /**
     * Starts the given retrieval service with a
     * {@link ConnectionUtils.LeaderConnectingAddressListener} and asks that listener for the
     * address to connect with. The service is stopped again before this method returns or throws.
     *
     * @param leaderRetrievalService service that is started for the lookup and stopped afterwards
     * @param timeout                duration passed to the listener's address lookup
     *                               (per the log message, the time spent trying to connect
     *                               before heuristics are used; the exact semantics live in
     *                               the listener)
     * @return the address reported by the listener
     * @throws LeaderRetrievalException wrapping any exception raised while starting the
     *                                  service or determining the address
     */
    public static InetAddress findConnectingAddress(LeaderRetrievalService leaderRetrievalService, FiniteDuration timeout) throws LeaderRetrievalException {
        final ConnectionUtils.LeaderConnectingAddressListener addressListener = new ConnectionUtils.LeaderConnectingAddressListener();
        try {
            leaderRetrievalService.start(addressListener);
            LOG.info("Trying to select the network interface and address to use by connecting to the leading JobManager.");
            LOG.info("TaskManager will try to connect for " + timeout + " before falling back to heuristics");
            return addressListener.findConnectingAddress(timeout);
        } catch (Exception cause) {
            throw new LeaderRetrievalException("Could not find the connecting address by connecting to the current leader.", cause);
        } finally {
            // Best effort: a failing stop is only logged so it cannot mask the actual outcome.
            try {
                leaderRetrievalService.stop();
            } catch (Exception stopFailure) {
                LOG.warn("Could not stop the leader retrieval service.", stopFailure);
            }
        }
    }

    /**
     * Resolves the {@link RecoveryMode} from the {@code recovery.mode} configuration entry.
     * The value is looked up with {@link ConfigConstants#DEFAULT_RECOVERY_MODE} as the
     * default and upper-cased (default locale) before matching, so the comparison ignores
     * the case of the configured value.
     *
     * @param config configuration to read {@code recovery.mode} from
     * @return {@link RecoveryMode#STANDALONE} or {@link RecoveryMode#ZOOKEEPER}
     * @throws IllegalConfigurationException if the upper-cased value matches neither mode
     */
    public static RecoveryMode getRecoveryMode(Configuration config) {
        final String configured = config.getString("recovery.mode", ConfigConstants.DEFAULT_RECOVERY_MODE);
        final String mode = configured.toUpperCase();
        if ("STANDALONE".equals(mode)) {
            return RecoveryMode.STANDALONE;
        }
        if ("ZOOKEEPER".equals(mode)) {
            return RecoveryMode.ZOOKEEPER;
        }
        throw new IllegalConfigurationException("The value for 'recovery.mode' is unknown: " + mode);
    }

    /**
     * Private constructor: this class only offers static helpers. It always throws a
     * {@link RuntimeException}, so no instance can be created even from inside the class.
     */
    private LeaderRetrievalUtils() {
        throw new RuntimeException();
    }

    public static class LeaderConnectionInfoListener
    implements LeaderRetrievalListener {
        private final Promise<LeaderConnectionInfo> connectionInfo = new Promise.DefaultPromise();

        public Future<LeaderConnectionInfo> getLeaderConnectionInfoFuture() {
            return this.connectionInfo.future();
        }

        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            if (leaderAddress != null && !leaderAddress.equals("") && !this.connectionInfo.isCompleted()) {
                this.connectionInfo.success((Object)new LeaderConnectionInfo(leaderAddress, leaderSessionID));
            }
        }

        @Override
        public void handleError(Exception exception) {
            if (!this.connectionInfo.isCompleted()) {
                this.connectionInfo.failure((Throwable)exception);
            }
        }
    }

    public static class LeaderGatewayListener
    implements LeaderRetrievalListener {
        private final ActorSystem actorSystem;
        private final FiniteDuration timeout;
        private final Object lock = new Object();
        private final Promise<ActorGateway> futureActorGateway = new Promise.DefaultPromise();

        public LeaderGatewayListener(ActorSystem actorSystem, FiniteDuration timeout) {
            this.actorSystem = actorSystem;
            this.timeout = timeout;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void completePromise(ActorGateway gateway) {
            Object object = this.lock;
            synchronized (object) {
                if (!this.futureActorGateway.isCompleted()) {
                    this.futureActorGateway.success((Object)gateway);
                }
            }
        }

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            if (leaderAddress != null && !leaderAddress.equals("") && !this.futureActorGateway.isCompleted()) {
                AkkaUtils.getActorRefFuture(leaderAddress, this.actorSystem, this.timeout).map((Function1)new Mapper<ActorRef, ActorGateway>(){

                    public ActorGateway apply(ActorRef ref) {
                        return new AkkaActorGateway(ref, leaderSessionID);
                    }
                }, (ExecutionContext)this.actorSystem.dispatcher()).onComplete((Function1)new OnComplete<ActorGateway>(){

                    public void onComplete(Throwable failure, ActorGateway success) throws Throwable {
                        if (failure == null) {
                            LeaderGatewayListener.this.completePromise(success);
                        } else {
                            LOG.debug("Could not retrieve the leader for address " + leaderAddress + ".", failure);
                        }
                    }
                }, (ExecutionContext)this.actorSystem.dispatcher());
            }
        }

        @Override
        public void handleError(Exception exception) {
            if (!this.futureActorGateway.isCompleted()) {
                this.futureActorGateway.failure((Throwable)exception);
            }
        }

        public Future<ActorGateway> getActorGatewayFuture() {
            return this.futureActorGateway.future();
        }
    }
}

