/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerProcessContext;
import org.apache.flink.runtime.resourcemanager.ResourceManagerService;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResourceManagerServiceImpl
implements ResourceManagerService,
LeaderContender {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerServiceImpl.class);
    private final ResourceManagerFactory<?> resourceManagerFactory;
    private final ResourceManagerProcessContext rmProcessContext;
    private final LeaderElectionService leaderElectionService;
    private final FatalErrorHandler fatalErrorHandler;
    private final Executor ioExecutor;
    private final ExecutorService handleLeaderEventExecutor;
    private final CompletableFuture<Void> serviceTerminationFuture;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private boolean running;
    @Nullable
    @GuardedBy(value="lock")
    private ResourceManager<?> leaderResourceManager;
    @Nullable
    @GuardedBy(value="lock")
    private UUID leaderSessionID;
    @GuardedBy(value="lock")
    private CompletableFuture<Void> previousResourceManagerTerminationFuture;

    private ResourceManagerServiceImpl(ResourceManagerFactory<?> resourceManagerFactory, ResourceManagerProcessContext rmProcessContext) {
        this.resourceManagerFactory = (ResourceManagerFactory)Preconditions.checkNotNull(resourceManagerFactory);
        this.rmProcessContext = (ResourceManagerProcessContext)Preconditions.checkNotNull((Object)rmProcessContext);
        this.leaderElectionService = rmProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElectionService();
        this.fatalErrorHandler = rmProcessContext.getFatalErrorHandler();
        this.ioExecutor = rmProcessContext.getIoExecutor();
        this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();
        this.serviceTerminationFuture = new CompletableFuture();
        this.running = false;
        this.leaderResourceManager = null;
        this.leaderSessionID = null;
        this.previousResourceManagerTerminationFuture = FutureUtils.completedVoidFuture();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                LOG.debug("Resource manager service has already started.");
                return;
            }
            this.running = true;
        }
        LOG.info("Starting resource manager service.");
        this.leaderElectionService.start(this);
    }

    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return this.serviceTerminationFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
        Object object = this.lock;
        synchronized (object) {
            if (!this.running || this.leaderResourceManager == null) {
                return ResourceManagerServiceImpl.deregisterWithoutLeaderRm();
            }
            ResourceManager<?> currentLeaderRM = this.leaderResourceManager;
            return currentLeaderRM.getStartedFuture().thenCompose(ignore -> {
                Object object = this.lock;
                synchronized (object) {
                    if (this.isLeader(currentLeaderRM)) {
                        return ((ResourceManagerGateway)currentLeaderRM.getSelfGateway(ResourceManagerGateway.class)).deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
                    }
                    return ResourceManagerServiceImpl.deregisterWithoutLeaderRm();
                }
            });
        }
    }

    private static CompletableFuture<Void> deregisterWithoutLeaderRm() {
        LOG.warn("Cannot deregister application. Resource manager service is not available.");
        return FutureUtils.completedVoidFuture();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> closeAsync() {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                LOG.info("Stopping resource manager service.");
                this.running = false;
                this.stopLeaderElectionService();
                this.stopLeaderResourceManager();
            } else {
                LOG.debug("Resource manager service is not running.");
            }
            FutureUtils.forward(this.previousResourceManagerTerminationFuture, this.serviceTerminationFuture);
        }
        this.handleLeaderEventExecutor.shutdownNow();
        return this.serviceTerminationFuture;
    }

    @Override
    public void grantLeadership(UUID newLeaderSessionID) {
        this.handleLeaderEventExecutor.execute(() -> {
            Object object = this.lock;
            synchronized (object) {
                if (!this.running) {
                    LOG.info("Resource manager service is not running. Ignore granting leadership with session ID {}.", (Object)newLeaderSessionID);
                    return;
                }
                LOG.info("Resource manager service is granted leadership with session id {}.", (Object)newLeaderSessionID);
                try {
                    this.startNewLeaderResourceManager(newLeaderSessionID);
                }
                catch (Throwable t) {
                    this.fatalErrorHandler.onFatalError((Throwable)new FlinkException("Cannot start resource manager.", t));
                }
            }
        });
    }

    @Override
    public void revokeLeadership() {
        this.handleLeaderEventExecutor.execute(() -> {
            Object object = this.lock;
            synchronized (object) {
                if (!this.running) {
                    LOG.info("Resource manager service is not running. Ignore revoking leadership.");
                    return;
                }
                LOG.info("Resource manager service is revoked leadership with session id {}.", (Object)this.leaderSessionID);
                this.stopLeaderResourceManager();
                if (!this.resourceManagerFactory.supportMultiLeaderSession()) {
                    this.closeAsync();
                }
            }
        });
    }

    @Override
    public void handleError(Exception exception) {
        this.fatalErrorHandler.onFatalError((Throwable)new FlinkException("Exception during leader election of resource manager occurred.", (Throwable)exception));
    }

    @GuardedBy(value="lock")
    private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Exception {
        this.stopLeaderResourceManager();
        this.leaderSessionID = newLeaderSessionID;
        ResourceManager<?> newLeaderResourceManager = this.leaderResourceManager = this.resourceManagerFactory.createResourceManager(this.rmProcessContext, newLeaderSessionID);
        ((CompletableFuture)this.previousResourceManagerTerminationFuture.thenComposeAsync(ignore -> {
            Object object = this.lock;
            synchronized (object) {
                return this.startResourceManagerIfIsLeader(newLeaderResourceManager);
            }
        }, (Executor)this.handleLeaderEventExecutor)).thenAcceptAsync(isStillLeader -> {
            if (isStillLeader.booleanValue()) {
                this.leaderElectionService.confirmLeadership(newLeaderSessionID, newLeaderResourceManager.getAddress());
            }
        }, this.ioExecutor);
    }

    @GuardedBy(value="lock")
    private CompletableFuture<Boolean> startResourceManagerIfIsLeader(ResourceManager<?> resourceManager) {
        if (this.isLeader(resourceManager)) {
            resourceManager.start();
            this.forwardTerminationFuture(resourceManager);
            return resourceManager.getStartedFuture().thenApply(ignore -> true);
        }
        return CompletableFuture.completedFuture(false);
    }

    private void forwardTerminationFuture(ResourceManager<?> resourceManager) {
        resourceManager.getTerminationFuture().whenComplete((ignore, throwable) -> {
            Object object = this.lock;
            synchronized (object) {
                if (this.isLeader(resourceManager)) {
                    if (throwable != null) {
                        this.serviceTerminationFuture.completeExceptionally((Throwable)throwable);
                    } else {
                        this.serviceTerminationFuture.complete(null);
                    }
                }
            }
        });
    }

    @GuardedBy(value="lock")
    private boolean isLeader(ResourceManager<?> resourceManager) {
        return this.running && this.leaderResourceManager == resourceManager;
    }

    @GuardedBy(value="lock")
    private void stopLeaderResourceManager() {
        if (this.leaderResourceManager != null) {
            this.previousResourceManagerTerminationFuture = this.previousResourceManagerTerminationFuture.thenCombine((CompletionStage)this.leaderResourceManager.closeAsync(), (ignore1, ignore2) -> null);
            this.leaderResourceManager = null;
            this.leaderSessionID = null;
        }
    }

    private void stopLeaderElectionService() {
        try {
            this.leaderElectionService.stop();
        }
        catch (Exception e) {
            this.serviceTerminationFuture.completeExceptionally((Throwable)new FlinkException("Cannot stop leader election service.", (Throwable)e));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    @VisibleForTesting
    public ResourceManager<?> getLeaderResourceManager() {
        Object object = this.lock;
        synchronized (object) {
            return this.leaderResourceManager;
        }
    }

    public static ResourceManagerServiceImpl create(ResourceManagerFactory<?> resourceManagerFactory, Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl, MetricRegistry metricRegistry, String hostname, Executor ioExecutor) throws ConfigurationException {
        return new ResourceManagerServiceImpl(resourceManagerFactory, resourceManagerFactory.createResourceManagerProcessContext(configuration, resourceId, rpcService, highAvailabilityServices, heartbeatServices, fatalErrorHandler, clusterInformation, webInterfaceUrl, metricRegistry, hostname, ioExecutor));
    }
}

