/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElector;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ActiveStandbyElectorBasedElectorService
extends AbstractService
implements EmbeddedElector,
ActiveStandbyElector.ActiveStandbyElectorCallback {
    private static final Logger LOG = LoggerFactory.getLogger((String)ActiveStandbyElectorBasedElectorService.class.getName());
    private static final HAServiceProtocol.StateChangeRequestInfo req = new HAServiceProtocol.StateChangeRequestInfo(HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
    private ResourceManager rm;
    private byte[] localActiveNodeInfo;
    private ActiveStandbyElector elector;
    private long zkSessionTimeout;
    private Timer zkDisconnectTimer;
    @VisibleForTesting
    final Object zkDisconnectLock = new Object();

    ActiveStandbyElectorBasedElectorService(ResourceManager rm) {
        super(ActiveStandbyElectorBasedElectorService.class.getName());
        this.rm = rm;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        String zkQuorum = (conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)).get("yarn.resourcemanager.zk-address");
        if (zkQuorum == null) {
            throw new YarnRuntimeException("Embedded automatic failover is enabled, but yarn.resourcemanager.zk-address is not set");
        }
        String rmId = HAUtil.getRMHAId((Configuration)conf);
        String clusterId = YarnConfiguration.getClusterId((Configuration)conf);
        this.localActiveNodeInfo = ActiveStandbyElectorBasedElectorService.createActiveNodeInfo(clusterId, rmId);
        String zkBasePath = conf.get("yarn.resourcemanager.ha.automatic-failover.zk-base-path", "/yarn-leader-election");
        String electionZNode = zkBasePath + "/" + clusterId;
        this.zkSessionTimeout = conf.getLong("yarn.resourcemanager.zk-timeout-ms", 10000L);
        List zkAcls = ZKCuratorManager.getZKAcls((Configuration)conf);
        List zkAuths = ZKCuratorManager.getZKAuths((Configuration)conf);
        int maxRetryNum = conf.getInt("yarn.resourcemanager.ha.failover-controller.active-standby-elector.zk.retries", conf.getInt("ha.failover-controller.active-standby-elector.zk.op.retries", 3));
        this.elector = new ActiveStandbyElector(zkQuorum, (int)this.zkSessionTimeout, electionZNode, zkAcls, zkAuths, (ActiveStandbyElector.ActiveStandbyElectorCallback)this, maxRetryNum, false);
        this.elector.ensureParentZNode();
        if (!this.isParentZnodeSafe(clusterId)) {
            this.notifyFatalError(String.format("invalid data in znode, %s, which may require the state store to be reformatted", electionZNode));
        }
        super.serviceInit(conf);
    }

    protected void serviceStart() throws Exception {
        this.elector.joinElection(this.localActiveNodeInfo);
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        if (this.elector != null) {
            this.elector.quitElection(false);
            this.elector.terminateConnection();
        }
        super.serviceStop();
    }

    public void becomeActive() throws ServiceFailedException {
        this.cancelDisconnectTimer();
        try {
            this.rm.getRMContext().getRMAdminService().transitionToActive(req);
        }
        catch (Exception e) {
            throw new ServiceFailedException("RM could not transition to Active", (Throwable)e);
        }
    }

    public void becomeStandby() {
        this.cancelDisconnectTimer();
        try {
            this.rm.getRMContext().getRMAdminService().transitionToStandby(req);
        }
        catch (Exception e) {
            LOG.error("RM could not transition to Standby", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cancelDisconnectTimer() {
        Object object = this.zkDisconnectLock;
        synchronized (object) {
            if (this.zkDisconnectTimer != null) {
                this.zkDisconnectTimer.cancel();
                this.zkDisconnectTimer = null;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void enterNeutralMode() {
        LOG.warn("Lost contact with Zookeeper. Transitioning to standby in " + this.zkSessionTimeout + " ms if connection is not reestablished.");
        Object object = this.zkDisconnectLock;
        synchronized (object) {
            if (this.zkDisconnectTimer == null) {
                this.zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
                this.zkDisconnectTimer.schedule(new TimerTask(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        Object object = ActiveStandbyElectorBasedElectorService.this.zkDisconnectLock;
                        synchronized (object) {
                            if (ActiveStandbyElectorBasedElectorService.this.zkDisconnectTimer != null) {
                                ActiveStandbyElectorBasedElectorService.this.becomeStandby();
                            }
                        }
                    }
                }, this.zkSessionTimeout);
            }
        }
    }

    public void notifyFatalError(String errorMessage) {
        this.rm.getRMContext().getDispatcher().getEventHandler().handle((Event)new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
    }

    public void fenceOldActive(byte[] oldActiveData) {
        LOG.debug("Request to fence old active being ignored, as embedded leader election doesn't support fencing");
    }

    private static byte[] createActiveNodeInfo(String clusterId, String rmId) throws IOException {
        return YarnServerResourceManagerServiceProtos.ActiveRMInfoProto.newBuilder().setClusterId(clusterId).setRmId(rmId).build().toByteArray();
    }

    private boolean isParentZnodeSafe(String clusterId) throws InterruptedException, IOException, KeeperException {
        YarnServerResourceManagerServiceProtos.ActiveRMInfoProto proto;
        byte[] data;
        try {
            data = this.elector.getActiveData();
        }
        catch (ActiveStandbyElector.ActiveNotFoundException e) {
            return true;
        }
        try {
            proto = YarnServerResourceManagerServiceProtos.ActiveRMInfoProto.parseFrom((byte[])data);
        }
        catch (InvalidProtocolBufferException e) {
            LOG.error("Invalid data in ZK: " + StringUtils.byteToHexString((byte[])data));
            return false;
        }
        if (!proto.getClusterId().equals(clusterId)) {
            LOG.error("Mismatched cluster! The other RM seems to be from a different cluster. Current cluster = " + clusterId + "Other RM's cluster = " + proto.getClusterId());
            return false;
        }
        return true;
    }

    @Override
    public void rejoinElection() {
        this.elector.quitElection(false);
        this.elector.joinElection(this.localActiveNodeInfo);
    }

    @Override
    public String getZookeeperConnectionState() {
        return this.elector.getHAZookeeperConnectionState();
    }
}

