/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.federation.store.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Comparator;
import java.util.List;
import java.util.TimeZone;
import java.util.stream.Collectors;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.shaded.org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.ZKFederationStateStoreOpDurations;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperFederationStateStore
implements FederationStateStore {
    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperFederationStateStore.class);
    private static final String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
    private static final String ROOT_ZNODE_NAME_APPLICATION = "applications";
    private static final String ROOT_ZNODE_NAME_POLICY = "policies";
    private static final String ROOT_ZNODE_NAME_RESERVATION = "reservation";
    private ZKCuratorManager zkManager;
    private String baseZNode;
    private String appsZNode;
    private String membershipZNode;
    private String policiesZNode;
    private String reservationsZNode;
    private int maxAppsInStateStore;
    private volatile Clock clock = SystemClock.getInstance();
    @VisibleForTesting
    private ZKFederationStateStoreOpDurations opDurations = ZKFederationStateStoreOpDurations.getInstance();

    @Override
    public void init(Configuration conf) throws YarnException {
        LOG.info("Initializing ZooKeeper connection");
        this.maxAppsInStateStore = conf.getInt("yarn.federation.state-store.max-applications", 1000);
        this.baseZNode = conf.get("yarn.federation.zk-state-store.parent-path", "/federationstore");
        try {
            this.zkManager = new ZKCuratorManager(conf);
            this.zkManager.start();
        }
        catch (IOException e) {
            LOG.error("Cannot initialize the ZK connection", (Throwable)e);
        }
        this.membershipZNode = ZKCuratorManager.getNodePath((String)this.baseZNode, (String)ROOT_ZNODE_NAME_MEMBERSHIP);
        this.appsZNode = ZKCuratorManager.getNodePath((String)this.baseZNode, (String)ROOT_ZNODE_NAME_APPLICATION);
        this.policiesZNode = ZKCuratorManager.getNodePath((String)this.baseZNode, (String)ROOT_ZNODE_NAME_POLICY);
        this.reservationsZNode = ZKCuratorManager.getNodePath((String)this.baseZNode, (String)ROOT_ZNODE_NAME_RESERVATION);
        try {
            List zkAcl = ZKCuratorManager.getZKAcls((Configuration)conf);
            this.zkManager.createRootDirRecursively(this.membershipZNode, zkAcl);
            this.zkManager.createRootDirRecursively(this.appsZNode, zkAcl);
            this.zkManager.createRootDirRecursively(this.policiesZNode, zkAcl);
            this.zkManager.createRootDirRecursively(this.reservationsZNode, zkAcl);
        }
        catch (Exception e) {
            String errMsg = "Cannot create base directories: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
    }

    @Override
    public void close() throws Exception {
        if (this.zkManager != null) {
            this.zkManager.close();
        }
    }

    @Override
    public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest request) throws YarnException {
        String errMsg;
        long start = this.clock.getTime();
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
        ApplicationId appId = app.getApplicationId();
        SubClusterId homeSubCluster = app.getHomeSubCluster();
        try {
            this.putApp(appId, homeSubCluster, false);
        }
        catch (Exception e) {
            errMsg = "Cannot add application home subcluster for " + appId;
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        try {
            homeSubCluster = this.getApp(appId);
        }
        catch (Exception e) {
            errMsg = "Cannot check app home subcluster for " + appId;
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addAppHomeSubClusterDuration(start, end);
        return AddApplicationHomeSubClusterResponse.newInstance(homeSubCluster);
    }

    @Override
    public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(UpdateApplicationHomeSubClusterRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
        ApplicationId appId = app.getApplicationId();
        SubClusterId homeSubCluster = this.getApp(appId);
        if (homeSubCluster == null) {
            String errMsg = "Application " + appId + " does not exist";
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        SubClusterId newSubClusterId = request.getApplicationHomeSubCluster().getHomeSubCluster();
        this.putApp(appId, newSubClusterId, true);
        long end = this.clock.getTime();
        this.opDurations.addUpdateAppHomeSubClusterDuration(start, end);
        return UpdateApplicationHomeSubClusterResponse.newInstance();
    }

    @Override
    public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(GetApplicationHomeSubClusterRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        ApplicationId appId = request.getApplicationId();
        SubClusterId homeSubCluster = this.getApp(appId);
        if (homeSubCluster == null) {
            String errMsg = "Application " + appId + " does not exist";
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addGetAppHomeSubClusterDuration(start, end);
        return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubCluster);
    }

    @Override
    public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(GetApplicationsHomeSubClusterRequest request) throws YarnException {
        if (request == null) {
            throw new YarnException("Missing getApplicationsHomeSubCluster request");
        }
        try {
            long start = this.clock.getTime();
            SubClusterId requestSC = request.getSubClusterId();
            List children = this.zkManager.getChildren(this.appsZNode);
            List<ApplicationHomeSubCluster> result = children.stream().map(child -> this.generateAppHomeSC((String)child)).sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed()).filter(appHomeSC -> FederationStateStoreUtils.filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster())).limit(this.maxAppsInStateStore).collect(Collectors.toList());
            long end = this.clock.getTime();
            this.opDurations.addGetAppsHomeSubClusterDuration(start, end);
            LOG.info("filterSubClusterId = {}, appCount = {}.", (Object)requestSC, (Object)result.size());
            return GetApplicationsHomeSubClusterResponse.newInstance(result);
        }
        catch (Exception e) {
            String errMsg = "Cannot get apps: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            throw new YarnException("Cannot get app by request");
        }
    }

    private ApplicationHomeSubCluster generateAppHomeSC(String appId) {
        try {
            ApplicationId applicationId = ApplicationId.fromString((String)appId);
            SubClusterId homeSubCluster = this.getApp(applicationId);
            ApplicationHomeSubCluster app = ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), homeSubCluster);
            return app;
        }
        catch (Exception ex) {
            LOG.error("get homeSubCluster by appId = {}.", (Object)appId);
            return null;
        }
    }

    @Override
    public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(DeleteApplicationHomeSubClusterRequest request) throws YarnException {
        String errMsg;
        long start = this.clock.getTime();
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        ApplicationId appId = request.getApplicationId();
        String appZNode = ZKCuratorManager.getNodePath((String)this.appsZNode, (String)appId.toString());
        boolean exists = false;
        try {
            exists = this.zkManager.exists(appZNode);
        }
        catch (Exception e) {
            errMsg = "Cannot check app: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        if (!exists) {
            String errMsg2 = "Application " + appId + " does not exist";
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg2);
        }
        try {
            this.zkManager.delete(appZNode);
        }
        catch (Exception e) {
            errMsg = "Cannot delete app: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addDeleteAppHomeSubClusterDuration(start, end);
        return DeleteApplicationHomeSubClusterResponse.newInstance();
    }

    @Override
    public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationMembershipStateStoreInputValidator.validate(request);
        SubClusterInfo subClusterInfo = request.getSubClusterInfo();
        SubClusterId subclusterId = subClusterInfo.getSubClusterId();
        long currentTime = ZookeeperFederationStateStore.getCurrentTime();
        subClusterInfo.setLastHeartBeat(currentTime);
        try {
            this.putSubclusterInfo(subclusterId, subClusterInfo, true);
        }
        catch (Exception e) {
            String errMsg = "Cannot register subcluster: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addRegisterSubClusterDuration(start, end);
        return SubClusterRegisterResponse.newInstance();
    }

    @Override
    public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationMembershipStateStoreInputValidator.validate(request);
        SubClusterId subClusterId = request.getSubClusterId();
        SubClusterState state = request.getState();
        SubClusterInfo subClusterInfo = this.getSubclusterInfo(subClusterId);
        if (subClusterInfo == null) {
            String errMsg = "SubCluster " + subClusterId + " not found";
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        } else {
            subClusterInfo.setState(state);
            this.putSubclusterInfo(subClusterId, subClusterInfo, true);
        }
        long end = this.clock.getTime();
        this.opDurations.addDeregisterSubClusterDuration(start, end);
        return SubClusterDeregisterResponse.newInstance();
    }

    @Override
    public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationMembershipStateStoreInputValidator.validate(request);
        SubClusterId subClusterId = request.getSubClusterId();
        SubClusterInfo subClusterInfo = this.getSubclusterInfo(subClusterId);
        if (subClusterInfo == null) {
            String errMsg = "SubCluster " + subClusterId + " does not exist; cannot heartbeat";
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long currentTime = ZookeeperFederationStateStore.getCurrentTime();
        subClusterInfo.setLastHeartBeat(currentTime);
        subClusterInfo.setState(request.getState());
        subClusterInfo.setCapability(request.getCapability());
        this.putSubclusterInfo(subClusterId, subClusterInfo, true);
        long end = this.clock.getTime();
        this.opDurations.addSubClusterHeartbeatDuration(start, end);
        return SubClusterHeartbeatResponse.newInstance();
    }

    @Override
    public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationMembershipStateStoreInputValidator.validate(request);
        SubClusterId subClusterId = request.getSubClusterId();
        SubClusterInfo subClusterInfo = null;
        try {
            subClusterInfo = this.getSubclusterInfo(subClusterId);
            if (subClusterInfo == null) {
                LOG.warn("The queried SubCluster: {} does not exist.", (Object)subClusterId);
                return null;
            }
        }
        catch (Exception e) {
            String errMsg = "Cannot get subcluster: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addGetSubClusterDuration(start, end);
        return GetSubClusterInfoResponse.newInstance(subClusterInfo);
    }

    @Override
    public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request) throws YarnException {
        long start = this.clock.getTime();
        ArrayList<SubClusterInfo> result = new ArrayList<SubClusterInfo>();
        try {
            for (String child : this.zkManager.getChildren(this.membershipZNode)) {
                SubClusterId subClusterId = SubClusterId.newInstance(child);
                SubClusterInfo info = this.getSubclusterInfo(subClusterId);
                if (request.getFilterInactiveSubClusters() && !info.getState().isActive()) continue;
                result.add(info);
            }
        }
        catch (Exception e) {
            String errMsg = "Cannot get subclusters: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addGetSubClustersDuration(start, end);
        return GetSubClustersInfoResponse.newInstance(result);
    }

    @Override
    public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(GetSubClusterPolicyConfigurationRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationPolicyStoreInputValidator.validate(request);
        String queue = request.getQueue();
        SubClusterPolicyConfiguration policy = null;
        try {
            policy = this.getPolicy(queue);
        }
        catch (Exception e) {
            String errMsg = "Cannot get policy: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        if (policy == null) {
            LOG.warn("Policy for queue: {} does not exist.", (Object)queue);
            return null;
        }
        long end = this.clock.getTime();
        this.opDurations.addGetPolicyConfigurationDuration(start, end);
        return GetSubClusterPolicyConfigurationResponse.newInstance(policy);
    }

    @Override
    public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationPolicyStoreInputValidator.validate(request);
        SubClusterPolicyConfiguration policy = request.getPolicyConfiguration();
        try {
            String queue = policy.getQueue();
            this.putPolicy(queue, policy, true);
        }
        catch (Exception e) {
            String errMsg = "Cannot set policy: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addSetPolicyConfigurationDuration(start, end);
        return SetSubClusterPolicyConfigurationResponse.newInstance();
    }

    @Override
    public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
        long start = this.clock.getTime();
        ArrayList<SubClusterPolicyConfiguration> result = new ArrayList<SubClusterPolicyConfiguration>();
        try {
            for (String child : this.zkManager.getChildren(this.policiesZNode)) {
                SubClusterPolicyConfiguration policy = this.getPolicy(child);
                if (policy == null) {
                    LOG.warn("Policy for queue: {} does not exist.", (Object)child);
                    continue;
                }
                result.add(policy);
            }
        }
        catch (Exception e) {
            String errMsg = "Cannot get policies: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addGetPoliciesConfigurationsDuration(start, end);
        return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
    }

    @Override
    public Version getCurrentVersion() {
        return null;
    }

    @Override
    public Version loadVersion() {
        return null;
    }

    private SubClusterId getApp(ApplicationId appId) throws YarnException {
        String appZNode = ZKCuratorManager.getNodePath((String)this.appsZNode, (String)appId.toString());
        SubClusterIdPBImpl subClusterId = null;
        byte[] data = this.get(appZNode);
        if (data != null) {
            try {
                subClusterId = new SubClusterIdPBImpl(YarnServerFederationProtos.SubClusterIdProto.parseFrom(data));
            }
            catch (InvalidProtocolBufferException e) {
                String errMsg = "Cannot parse application at " + appZNode;
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
        }
        return subClusterId;
    }

    private void putApp(ApplicationId appId, SubClusterId subClusterId, boolean update) throws YarnException {
        String appZNode = ZKCuratorManager.getNodePath((String)this.appsZNode, (String)appId.toString());
        YarnServerFederationProtos.SubClusterIdProto proto = ((SubClusterIdPBImpl)subClusterId).getProto();
        byte[] data = proto.toByteArray();
        this.put(appZNode, data, update);
    }

    private SubClusterInfo getSubclusterInfo(SubClusterId subclusterId) throws YarnException {
        String memberZNode = ZKCuratorManager.getNodePath((String)this.membershipZNode, (String)subclusterId.toString());
        SubClusterInfoPBImpl policy = null;
        byte[] data = this.get(memberZNode);
        if (data != null) {
            try {
                policy = new SubClusterInfoPBImpl(YarnServerFederationProtos.SubClusterInfoProto.parseFrom(data));
            }
            catch (InvalidProtocolBufferException e) {
                String errMsg = "Cannot parse subcluster info at " + memberZNode;
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
        }
        return policy;
    }

    private void putSubclusterInfo(SubClusterId subclusterId, SubClusterInfo subClusterInfo, boolean update) throws YarnException {
        String memberZNode = ZKCuratorManager.getNodePath((String)this.membershipZNode, (String)subclusterId.toString());
        YarnServerFederationProtos.SubClusterInfoProto proto = ((SubClusterInfoPBImpl)subClusterInfo).getProto();
        byte[] data = proto.toByteArray();
        this.put(memberZNode, data, update);
    }

    private SubClusterPolicyConfiguration getPolicy(String queue) throws YarnException {
        String policyZNode = ZKCuratorManager.getNodePath((String)this.policiesZNode, (String)queue);
        SubClusterPolicyConfigurationPBImpl policy = null;
        byte[] data = this.get(policyZNode);
        if (data != null) {
            try {
                policy = new SubClusterPolicyConfigurationPBImpl(YarnServerFederationProtos.SubClusterPolicyConfigurationProto.parseFrom(data));
            }
            catch (InvalidProtocolBufferException e) {
                String errMsg = "Cannot parse policy at " + policyZNode;
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
        }
        return policy;
    }

    private void putPolicy(String queue, SubClusterPolicyConfiguration policy, boolean update) throws YarnException {
        String policyZNode = ZKCuratorManager.getNodePath((String)this.policiesZNode, (String)queue);
        YarnServerFederationProtos.SubClusterPolicyConfigurationProto proto = ((SubClusterPolicyConfigurationPBImpl)policy).getProto();
        byte[] data = proto.toByteArray();
        this.put(policyZNode, data, update);
    }

    private byte[] get(String znode) throws YarnException {
        boolean exists = false;
        try {
            exists = this.zkManager.exists(znode);
        }
        catch (Exception e) {
            String errMsg = "Cannot find znode " + znode;
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        if (!exists) {
            LOG.error("{} does not exist", (Object)znode);
            return null;
        }
        byte[] data = null;
        try {
            data = this.zkManager.getData(znode);
        }
        catch (Exception e) {
            String errMsg = "Cannot get data from znode " + znode + ": " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        return data;
    }

    private void put(String znode, byte[] data, boolean update) throws YarnException {
        String errMsg;
        boolean created = false;
        try {
            created = this.zkManager.create(znode);
        }
        catch (Exception e) {
            errMsg = "Cannot create znode " + znode + ": " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        if (!created) {
            LOG.debug("{} not created", (Object)znode);
            if (!update) {
                LOG.info("{} already existed and we are not updating", (Object)znode);
                return;
            }
        }
        try {
            this.zkManager.setData(znode, data, -1);
        }
        catch (Exception e) {
            errMsg = "Cannot write data into znode " + znode + ": " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
    }

    private static long getCurrentTime() {
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        return cal.getTimeInMillis();
    }

    private void putReservation(ReservationId reservationId, SubClusterId subClusterId, boolean update) throws YarnException {
        String reservationZNode = ZKCuratorManager.getNodePath((String)this.reservationsZNode, (String)reservationId.toString());
        YarnServerFederationProtos.SubClusterIdProto proto = ((SubClusterIdPBImpl)subClusterId).getProto();
        byte[] data = proto.toByteArray();
        this.put(reservationZNode, data, update);
    }

    private SubClusterId getReservation(ReservationId reservationId) throws YarnException {
        String reservationIdZNode = ZKCuratorManager.getNodePath((String)this.reservationsZNode, (String)reservationId.toString());
        SubClusterIdPBImpl subClusterId = null;
        byte[] data = this.get(reservationIdZNode);
        if (data != null) {
            try {
                subClusterId = new SubClusterIdPBImpl(YarnServerFederationProtos.SubClusterIdProto.parseFrom(data));
            }
            catch (InvalidProtocolBufferException e) {
                String errMsg = "Cannot parse reservation at " + reservationId;
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
        }
        return subClusterId;
    }

    @VisibleForTesting
    public ZKFederationStateStoreOpDurations getOpDurations() {
        return this.opDurations;
    }

    @Override
    public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(AddReservationHomeSubClusterRequest request) throws YarnException {
        String errMsg;
        long start = this.clock.getTime();
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
        ReservationId reservationId = reservationHomeSubCluster.getReservationId();
        SubClusterId homeSubCluster = reservationHomeSubCluster.getHomeSubCluster();
        try {
            this.putReservation(reservationId, homeSubCluster, false);
        }
        catch (Exception e) {
            errMsg = "Cannot add reservation home subcluster for " + reservationId;
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        try {
            homeSubCluster = this.getReservation(reservationId);
        }
        catch (Exception e) {
            errMsg = "Cannot check app home subcluster for " + reservationId;
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addReservationHomeSubClusterDuration(start, end);
        return AddReservationHomeSubClusterResponse.newInstance(homeSubCluster);
    }

    @Override
    public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(GetReservationHomeSubClusterRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        ReservationId reservationId = request.getReservationId();
        SubClusterId homeSubCluster = this.getReservation(reservationId);
        if (homeSubCluster == null) {
            String errMsg = "Reservation " + reservationId + " does not exist";
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        ReservationHomeSubCluster reservationHomeSubCluster = ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
        long end = this.clock.getTime();
        this.opDurations.addGetReservationHomeSubClusterDuration(start, end);
        return GetReservationHomeSubClusterResponse.newInstance(reservationHomeSubCluster);
    }

    @Override
    public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(GetReservationsHomeSubClusterRequest request) throws YarnException {
        long start = this.clock.getTime();
        ArrayList<ReservationHomeSubCluster> result = new ArrayList<ReservationHomeSubCluster>();
        try {
            for (String child : this.zkManager.getChildren(this.reservationsZNode)) {
                ReservationId reservationId = ReservationId.parseReservationId((String)child);
                SubClusterId homeSubCluster = this.getReservation(reservationId);
                ReservationHomeSubCluster app = ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
                result.add(app);
            }
        }
        catch (Exception e) {
            String errMsg = "Cannot get apps: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addGetReservationsHomeSubClusterDuration(start, end);
        return GetReservationsHomeSubClusterResponse.newInstance(result);
    }

    @Override
    public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(DeleteReservationHomeSubClusterRequest request) throws YarnException {
        String errMsg;
        long start = this.clock.getTime();
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        ReservationId reservationId = request.getReservationId();
        String reservationZNode = ZKCuratorManager.getNodePath((String)this.reservationsZNode, (String)reservationId.toString());
        boolean exists = false;
        try {
            exists = this.zkManager.exists(reservationZNode);
        }
        catch (Exception e) {
            errMsg = "Cannot check reservation: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        if (!exists) {
            String errMsg2 = "Reservation " + reservationId + " does not exist";
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg2);
        }
        try {
            this.zkManager.delete(reservationZNode);
        }
        catch (Exception e) {
            errMsg = "Cannot delete reservation: " + e.getMessage();
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        long end = this.clock.getTime();
        this.opDurations.addDeleteReservationHomeSubClusterDuration(start, end);
        return DeleteReservationHomeSubClusterResponse.newInstance();
    }

    @Override
    public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(UpdateReservationHomeSubClusterRequest request) throws YarnException {
        long start = this.clock.getTime();
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
        ReservationId reservationId = reservationHomeSubCluster.getReservationId();
        SubClusterId homeSubCluster = this.getReservation(reservationId);
        if (homeSubCluster == null) {
            String errMsg = "Reservation " + reservationId + " does not exist";
            FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
        }
        SubClusterId newSubClusterId = reservationHomeSubCluster.getHomeSubCluster();
        this.putReservation(reservationId, newSubClusterId, true);
        long end = this.clock.getTime();
        this.opDurations.addUpdateReservationHomeSubClusterDuration(start, end);
        return UpdateReservationHomeSubClusterResponse.newInstance();
    }

    @Override
    public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException {
        throw new NotImplementedException("Code is not implemented");
    }

    @Override
    public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException {
        throw new NotImplementedException("Code is not implemented");
    }

    @Override
    public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) throws YarnException, IOException {
        throw new NotImplementedException("Code is not implemented");
    }
}

