package org.apache.flink.runtime.leaderelection;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriver;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

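/**
 * Leader election service for a single {@link LeaderContender}: it receives leadership events from a
 * {@link LeaderElectionDriver}, forwards them to the registered contender via its leadership
 * operation executor and writes confirmed leader information back through the driver.
 *
 * <p>Decompiled from org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.class.
 */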
public class DefaultLeaderElectionService extends AbstractLeaderElectionService implements LeaderElectionEventHandler, MultipleComponentLeaderElectionDriver.Listener, AutoCloseable {
    /** Logger shared by all instances of this service. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
    /** Monitor guarding the mutable state below (see the {@code @GuardedBy} annotations). */
    private final Object lock;
    /** Factory used by {@code startLeaderElectionBackend()} to create the driver. */
    private final LeaderElectionDriverFactory leaderElectionDriverFactory;

    /** ID passed to {@code register}; reset to {@code null} by {@code remove}. */
    @GuardedBy("lock")
    private String contenderID;

    /** Contender passed to {@code register}; reset to {@code null} by {@code remove}. */
    @GuardedBy("lock")
    private LeaderContender leaderContender;

    /** Session ID set by a grant event; reset to {@code null} by a revoke event and by {@code close()}. */
    @GuardedBy("lock")
    @Nullable
    private UUID issuedLeaderSessionID;

    /**
     * Leader information recorded by {@code confirmLeadership}; reset to the empty instance whenever
     * a leadership loss is forwarded to the contender.
     */
    @GuardedBy("lock")
    private LeaderInformation confirmedLeaderInformation;

    /**
     * Set to {@code true} by {@code startLeaderElectionBackend()} and back to {@code false} by
     * {@code close()}; {@code runInLeaderEventThread} drops events while it is {@code false}.
     */
    @GuardedBy("lock")
    private boolean running;
    /**
     * Driver created in {@code startLeaderElectionBackend()}. Not annotated with {@code @GuardedBy}:
     * {@code close()} invokes {@code close()} on it after releasing the lock.
     */
    private LeaderElectionDriver leaderElectionDriver;
    /** Executor on which {@code runInLeaderEventThread} schedules events; shut down by {@code close()}. */
    private final ExecutorService leadershipOperationExecutor;

    /**
     * {@link FatalErrorHandler} that is handed to the driver factory and routes every fatal error to
     * {@link DefaultLeaderElectionService#forwardErrorToLeaderContender(Throwable)} of the enclosing
     * service instance.
     *
     * <p>Decompiler note: the access modifier of this class was changed from private.
     */
    public class LeaderElectionFatalErrorHandler implements FatalErrorHandler {
        private LeaderElectionFatalErrorHandler() {
            // Only the enclosing service creates instances.
        }

        /**
         * Passes the error on to the enclosing service.
         *
         * @param th the fatal error reported to this handler
         */
        public void onFatalError(Throwable th) {
            forwardErrorToLeaderContender(th);
        }
    }

    /**
     * Creates a service that runs its leadership events on a newly created single-thread executor
     * whose thread factory is built with the name
     * {@code "DefaultLeaderElectionService-leadershipOperationExecutor"}.
     *
     * @param leaderElectionDriverFactory factory for the driver created by
     *     {@code startLeaderElectionBackend()}
     */
    public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory) {
        this(leaderElectionDriverFactory, Executors.newSingleThreadExecutor(new ExecutorThreadFactory("DefaultLeaderElectionService-leadershipOperationExecutor")));
    }

    /**
     * Creates a service with an explicitly supplied executor for the leadership events.
     *
     * <p>The new instance is not running, has no contender or driver, no issued session ID and empty
     * confirmed leader information.
     *
     * @param leaderElectionDriverFactory factory for the driver; checked with
     *     {@code Preconditions.checkNotNull}
     * @param executorService executor that processes the leadership events; checked with
     *     {@code Preconditions.checkNotNull}
     */
    @VisibleForTesting
    DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory, ExecutorService executorService) {
        this.lock = new Object();
        this.leaderElectionDriverFactory = Preconditions.checkNotNull(leaderElectionDriverFactory);

        // Nothing is registered, granted or connected yet.
        this.leaderContender = null;
        this.issuedLeaderSessionID = null;
        this.leaderElectionDriver = null;
        this.confirmedLeaderInformation = LeaderInformation.empty();

        this.leadershipOperationExecutor = Preconditions.checkNotNull(executorService);
        this.running = false;
    }

    /**
     * Creates the leader election driver and marks this service as running.
     *
     * <p>May only be called once and only before a contender is registered; both conditions are
     * enforced with {@code Preconditions.checkState}. If the driver factory fails, the running flag
     * is rolled back so that the service does not appear started without a driver.
     *
     * @throws Exception if the driver factory fails to create the driver
     */
    public void startLeaderElectionBackend() throws Exception {
        synchronized (this.lock) {
            Preconditions.checkState(this.leaderContender == null, "No LeaderContender should have been registered, yet.");
            Preconditions.checkState(this.leaderElectionDriver == null, "This DefaultLeaderElectionService cannot be reused. Calling startLeaderElectionBackend can only be called once to establish the connection to the HA backend.");
            // The flag is raised before the driver exists; presumably so that events the driver emits
            // while it is being created are not dropped by runInLeaderEventThread.
            this.running = true;
            boolean driverCreated = false;
            try {
                this.leaderElectionDriver = this.leaderElectionDriverFactory.createLeaderElectionDriver(this, new LeaderElectionFatalErrorHandler());
                driverCreated = true;
            } finally {
                if (!driverCreated) {
                    // Without a driver the service must not look started (register() checks this flag).
                    this.running = false;
                }
            }
            LOG.info("Instantiating DefaultLeaderElectionService with {}.", this.leaderElectionDriver);
        }
    }

    /**
     * Registers the single contender of this service under the given ID.
     *
     * <p>If a leader session ID has already been issued, the grant notification is scheduled through
     * {@code runInLeaderEventThread} rather than delivered on the calling thread. The session ID is
     * captured at registration time: when leadership is revoked (or revoked and re-granted) before
     * the scheduled notification runs, {@code notifyLeaderContenderOfLeadership} receives the stale
     * ID and ignores it, instead of receiving {@code null} or a session ID that was already
     * announced by the grant event itself.
     *
     * <p>Decompiler note: the access modifier of this method was changed from protected.
     *
     * @param str ID of the contender; must not be {@code null}
     * @param leaderContender contender to register; must not be {@code null}
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService
    public void register(String str, LeaderContender leaderContender) throws Exception {
        Preconditions.checkNotNull(str, "ContenderID must not be null.");
        Preconditions.checkNotNull(leaderContender, "Contender must not be null.");
        synchronized (this.lock) {
            Preconditions.checkState(this.leaderContender == null, "Only one LeaderContender is allowed to be registered to this service.");
            Preconditions.checkState(this.contenderID == null, "The contenderID is only allowed to be set once.");
            Preconditions.checkState(this.running, "The DefaultLeaderElectionService should have established a connection to the backend before it's started.");
            this.leaderContender = leaderContender;
            this.contenderID = str;
            LOG.info("LeaderContender {} has been registered for {}.", leaderContender.getDescription(), this.leaderElectionDriver);

            // Snapshot the session ID now; the field may change before the scheduled task runs.
            final UUID sessionIdAtRegistration = this.issuedLeaderSessionID;
            if (sessionIdAtRegistration != null) {
                runInLeaderEventThread(() -> notifyLeaderContenderOfLeadership(sessionIdAtRegistration));
            }
        }
    }

    /**
     * Deregisters the currently registered contender, if there is one.
     *
     * <p>When a leader session ID is issued, the contender is told about the leadership loss and, if
     * the driver still reports leadership, empty leader information is written through the driver.
     * Note that the {@code str} argument is not consulted; whatever contender is registered gets
     * removed.
     *
     * <p>Decompiler note: the access modifier of this method was changed from protected.
     *
     * @param str ID of the contender to remove (unused by this implementation)
     */
    @Override // org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService
    public final void remove(String str) {
        synchronized (this.lock) {
            if (this.contenderID == null) {
                LOG.debug("The stop procedure was called on an already stopped DefaultLeaderElectionService instance. No action necessary.");
                return;
            }

            LOG.info("Stopping DefaultLeaderElectionService for {}.", this.contenderID);
            Preconditions.checkNotNull(this.leaderContender, "There should be a LeaderContender registered under the given contenderID '%s'.", this.contenderID);

            final boolean leadershipIssued = this.issuedLeaderSessionID != null;
            if (!leadershipIssued) {
                Preconditions.checkState(this.confirmedLeaderInformation.isEmpty(), "The confirmed leader information should have been cleared.");
                LOG.debug("DefaultLeaderElectionService is stopping while not having the leadership acquired. No cleanup necessary.");
            } else {
                notifyLeaderContenderOfLeadershipLoss();
                LOG.debug("DefaultLeaderElectionService is stopping while having the leadership acquired. The revoke event is forwarded to the LeaderContender.");
                if (this.leaderElectionDriver.hasLeadership()) {
                    this.leaderElectionDriver.writeLeaderInformation(LeaderInformation.empty());
                    LOG.debug("Leader information is cleaned up while stopping.");
                }
            }

            // Deregistration happens last, after the contender has been notified.
            this.contenderID = null;
            this.leaderContender = null;
        }
    }

    /**
     * Stops event processing, closes the driver and shuts down the leadership operation executor.
     *
     * <p>The driver must have been created and the contender must have been removed beforehand; both
     * conditions are enforced with {@code Preconditions.checkState}. The executor is shut down even
     * if closing the driver throws, so that its thread is not leaked.
     *
     * @throws Exception if closing the driver fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        synchronized (this.lock) {
            Preconditions.checkState(this.leaderElectionDriver != null, "The HA backend wasn't initialized.");
            Preconditions.checkState(this.leaderContender == null, "The DefaultLeaderElectionService should have been stopped before closing the instance.");
            this.issuedLeaderSessionID = null;
            if (this.running) {
                this.running = false;
            } else {
                LOG.debug("The HA backend connection isn't established. No actions taken.");
            }
        }

        try {
            // Closed outside of the lock, as in the original implementation; presumably to avoid
            // blocking event callbacks that need the lock while the driver shuts down.
            this.leaderElectionDriver.close();
        } finally {
            final List<Runnable> unprocessedEvents = this.leadershipOperationExecutor.shutdownNow();
            if (!unprocessedEvents.isEmpty()) {
                LOG.debug("The DefaultLeaderElectionService was closed with {} event(s) still not being processed. No further action necessary.", unprocessedEvents.size());
            }
        }
    }

    /**
     * Records the confirmation of an issued leader session and writes it through the driver.
     *
     * <p>The confirmation only takes effect if {@code hasLeadership(str, uuid)} holds; otherwise it
     * is logged and dropped. All reads of the guarded {@code contenderID} field happen while holding
     * the lock. The order of the argument checks matches the original implementation.
     *
     * <p>Decompiler note: the access modifier of this method was changed from protected.
     *
     * @param str ID of the confirming contender; must equal the registered contender ID
     * @param uuid session ID that is being confirmed; must not be {@code null}
     * @param str2 leader address stored together with the session ID
     */
    @Override // org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService
    public void confirmLeadership(String str, UUID uuid, String str2) {
        synchronized (this.lock) {
            Preconditions.checkArgument(str.equals(this.contenderID));
            LOG.debug("The leader session of {} is confirmed with session ID {} for and address {}.", this.contenderID, uuid, str2);
            Preconditions.checkNotNull(uuid);
            if (hasLeadership(str, uuid)) {
                Preconditions.checkState(this.confirmedLeaderInformation.isEmpty(), "No confirmation should have happened, yet.");
                this.confirmedLeaderInformation = LeaderInformation.known(uuid, str2);
                this.leaderElectionDriver.writeLeaderInformation(this.confirmedLeaderInformation);
            } else if (uuid.equals(this.issuedLeaderSessionID)) {
                LOG.warn("The leader session ID {} was confirmed even though the corresponding service was not elected as the leader or has been stopped already.", uuid);
            } else {
                LOG.debug("Receive an old confirmation call of leader session ID {}, current issued session ID is {}", uuid, this.issuedLeaderSessionID);
            }
        }
    }

    /**
     * Tells whether the given contender currently holds leadership under the given session ID.
     *
     * <p>Decompiler note: the access modifier of this method was changed from protected.
     *
     * @param str ID of the contender that asks
     * @param uuid session ID the contender believes to be current
     * @return {@code true} only if a driver exists, {@code str} equals the registered contender ID,
     *     the driver reports leadership and {@code uuid} equals the issued session ID
     */
    @Override // org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService
    public boolean hasLeadership(String str, UUID uuid) {
        synchronized (this.lock) {
            if (this.leaderElectionDriver == null) {
                LOG.debug("hasLeadership is called after the service is closed, returning false.");
                return false;
            }
            if (!str.equals(this.contenderID)) {
                LOG.debug("hasLeadership is called for contender ID '{}' while there is no contender registered under that ID in service, returning false.", str);
                return false;
            }
            return this.leaderElectionDriver.hasLeadership() && uuid.equals(this.issuedLeaderSessionID);
        }
    }

    /**
     * Returns the session ID of the confirmed leader information for the given contender.
     *
     * @param str ID of the contender
     * @return the session ID stored in the confirmed leader information, or {@code null} if
     *     {@code str} does not equal the registered contender ID
     */
    @VisibleForTesting
    @Nullable
    public UUID getLeaderSessionID(String str) {
        synchronized (this.lock) {
            if (!str.equals(this.contenderID)) {
                return null;
            }
            return this.confirmedLeaderInformation.getLeaderSessionID();
        }
    }

    /**
     * Schedules the handling of a leadership grant via {@code runInLeaderEventThread}.
     *
     * @param uuid session ID issued with the grant
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler
    public void onGrantLeadership(UUID uuid) {
        final Runnable grantEvent = () -> onGrantLeadershipInternal(uuid);
        runInLeaderEventThread(grantEvent);
    }

    /**
     * Stores the newly issued session ID and notifies the contender about the grant.
     *
     * @param uuid session ID issued with the grant; must not be {@code null}, and no other session
     *     ID may be issued at this point
     */
    @GuardedBy("lock")
    private void onGrantLeadershipInternal(UUID uuid) {
        Preconditions.checkNotNull(uuid);
        final boolean noSessionIssuedYet = this.issuedLeaderSessionID == null;
        Preconditions.checkState(noSessionIssuedYet, "The leadership should have been granted while not having the leadership acquired.");

        this.issuedLeaderSessionID = uuid;
        notifyLeaderContenderOfLeadership(uuid);
    }

    /**
     * Forwards a leadership grant to the registered contender.
     *
     * <p>The notification is skipped when no contender is registered or when {@code uuid} does not
     * match the currently issued session ID. A {@code null} session ID is treated as out-dated as
     * well instead of failing with a {@link NullPointerException}.
     *
     * @param uuid session ID the notification was triggered for
     */
    @GuardedBy("lock")
    private void notifyLeaderContenderOfLeadership(UUID uuid) {
        if (this.leaderContender == null) {
            LOG.debug("The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.", uuid, this.leaderElectionDriver);
            return;
        }
        // Null-safe comparison: a missing session ID can never be the current one.
        if (uuid == null || !uuid.equals(this.issuedLeaderSessionID)) {
            LOG.debug("An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.", uuid, this.issuedLeaderSessionID);
            return;
        }
        Preconditions.checkState(this.confirmedLeaderInformation.isEmpty(), "The leadership should have been granted while not having the leadership acquired.");
        LOG.debug("Granting leadership to contender {} with session ID {}.", this.leaderContender.getDescription(), this.issuedLeaderSessionID);
        this.leaderContender.grantLeadership(this.issuedLeaderSessionID);
    }

    /** Schedules the handling of a leadership revocation via {@code runInLeaderEventThread}. */
    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler
    public void onRevokeLeadership() {
        runInLeaderEventThread(() -> onRevokeLeadershipInternal());
    }

    /**
     * Handles a leadership revocation: informs the contender if one is registered and clears the
     * issued session ID in either case.
     */
    @GuardedBy("lock")
    private void onRevokeLeadershipInternal() {
        if (this.leaderContender == null) {
            LOG.debug("The revoke leadership for session {} notification is not forwarded because the DefaultLeaderElectionService({}) has no contender registered.", this.issuedLeaderSessionID, this.leaderElectionDriver);
        } else {
            notifyLeaderContenderOfLeadershipLoss();
        }
        this.issuedLeaderSessionID = null;
    }

    /**
     * Resets the confirmed leader information and tells the registered contender that it lost the
     * leadership. A contender must be registered when this method is called.
     */
    @GuardedBy("lock")
    private void notifyLeaderContenderOfLeadershipLoss() {
        Preconditions.checkState(this.leaderContender != null, "The LeaderContender should be always set when calling this method.");

        final boolean grantWasConfirmed = !this.confirmedLeaderInformation.isEmpty();
        if (grantWasConfirmed) {
            LOG.debug("Revoking leadership to contender {} for {}.", this.leaderContender.getDescription(), LeaderElectionUtils.convertToString(this.confirmedLeaderInformation));
        } else {
            LOG.debug("Revoking leadership to contender {} while a previous leadership grant wasn't confirmed, yet.", this.leaderContender.getDescription());
        }

        // The state is cleared before the contender is called.
        this.confirmedLeaderInformation = LeaderInformation.empty();
        this.leaderContender.revokeLeadership();
    }

    /**
     * Schedules the handling of changed leader information via {@code runInLeaderEventThread}.
     *
     * @param leaderInformation leader information reported as the new external state
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler
    public void onLeaderInformationChange(LeaderInformation leaderInformation) {
        final Runnable changeEvent = () -> onLeaderInformationChangeInternal(leaderInformation);
        runInLeaderEventThread(changeEvent);
    }

    /**
     * Re-writes the confirmed leader information through the driver when the reported information
     * is empty or differs from it.
     *
     * <p>Nothing is written if no contender is registered, if no leader information has been
     * confirmed yet, or if the reported information equals the confirmed one.
     *
     * @param leaderInformation leader information reported as the new external state
     */
    @GuardedBy("lock")
    private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) {
        if (this.leaderContender == null) {
            LOG.debug("Ignoring change notification since the {} has already been stopped.", this.leaderElectionDriver);
            return;
        }
        LOG.trace("Leader node changed while {} is the leader with {}. New leader information {}.", this.leaderContender.getDescription(), LeaderElectionUtils.convertToString(this.confirmedLeaderInformation), LeaderElectionUtils.convertToString(leaderInformation));

        final LeaderInformation confirmed = this.confirmedLeaderInformation;
        if (confirmed.isEmpty()) {
            return;
        }

        if (leaderInformation.isEmpty()) {
            LOG.debug("Writing leader information by {} since the external storage is empty.", this.leaderContender.getDescription());
        } else if (!leaderInformation.equals(confirmed)) {
            LOG.debug("Correcting leader information by {}.", this.leaderContender.getDescription());
        } else {
            // The reported information already matches the confirmed one.
            return;
        }
        this.leaderElectionDriver.writeLeaderInformation(confirmed);
    }

    /**
     * Submits the given event to the leadership operation executor, provided the service is running.
     *
     * <p>The submitted task acquires the lock and checks the running flag again before executing the
     * event. Failures of the task are passed to {@code forwardErrorToLeaderContender}.
     *
     * @param runnable event handler to execute
     */
    private void runInLeaderEventThread(Runnable runnable) {
        synchronized (this.lock) {
            if (!this.running) {
                LOG.debug("Leader event handling was triggered after the DefaultLeaderElectionService is closed. The event will be ignored.");
                return;
            }

            final Runnable guardedEvent = () -> {
                synchronized (this.lock) {
                    // The service may have been closed between submission and execution.
                    if (this.running) {
                        runnable.run();
                    }
                }
            };
            final CompletableFuture<Void> eventFuture = CompletableFuture.runAsync(guardedEvent, this.leadershipOperationExecutor);
            FutureUtils.handleUncaughtException(eventFuture, (thread, th) -> forwardErrorToLeaderContender(th));
        }
    }

    /**
     * Hands an error to the registered contender, wrapping it in a {@code LeaderElectionException}
     * unless it already is one. The error is only logged at debug level when no contender is
     * registered.
     *
     * <p>Decompiler note: the access modifier of this method was changed from private.
     *
     * @param th error to forward
     */
    public void forwardErrorToLeaderContender(Throwable th) {
        synchronized (this.lock) {
            if (this.leaderContender == null) {
                LOG.debug("Ignoring error notification since there's no contender registered.");
                return;
            }
            final LeaderElectionException electionError =
                    th instanceof LeaderElectionException
                            ? (LeaderElectionException) th
                            : new LeaderElectionException(th);
            this.leaderContender.handleError(electionError);
        }
    }

    /** Treats the driver callback as a leadership grant under a freshly generated random session ID. */
    @Override // org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriver.Listener
    public void isLeader() {
        final UUID newSessionId = UUID.randomUUID();
        onGrantLeadership(newSessionId);
    }

    /** Treats the driver callback as a leadership revocation. */
    @Override // org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriver.Listener
    public void notLeader() {
        this.onRevokeLeadership();
    }

    /**
     * Forwards a leader information change if it concerns the registered contender.
     *
     * <p>The guarded {@code contenderID} field is read while holding the lock. The lock is
     * reentrant, so the nested {@code runInLeaderEventThread} call can acquire it again.
     *
     * @param str ID of the contender the change belongs to
     * @param leaderInformation the changed leader information
     */
    @Override // org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriver.Listener
    public void notifyLeaderInformationChange(String str, LeaderInformation leaderInformation) {
        synchronized (this.lock) {
            if (str.equals(this.contenderID)) {
                onLeaderInformationChange(leaderInformation);
            }
        }
    }

    /**
     * Forwards the leader information stored for the registered contender, if the register has any.
     *
     * <p>The guarded {@code contenderID} field is read while holding the lock. Without a registered
     * contender nothing is looked up, so the call does not depend on how the register treats a
     * {@code null} ID.
     *
     * @param leaderInformationRegister register holding the leader information per contender ID
     */
    @Override // org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriver.Listener
    public void notifyAllKnownLeaderInformation(LeaderInformationRegister leaderInformationRegister) {
        synchronized (this.lock) {
            if (this.contenderID == null) {
                // No contender registered: there is nobody the information could be relevant for.
                return;
            }
            leaderInformationRegister.forContenderID(this.contenderID).ifPresent(this::onLeaderInformationChange);
        }
    }
}
