/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.federation.store.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.TimeZone;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.shaded.com.zaxxer.hikari.HikariDataSource;
import org.apache.hadoop.shaded.org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SQLFederationStateStore
implements FederationStateStore {
    public static final Logger LOG = LoggerFactory.getLogger(SQLFederationStateStore.class);
    private static final String CALL_SP_REGISTER_SUBCLUSTER = "{call sp_registerSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
    private static final String CALL_SP_DEREGISTER_SUBCLUSTER = "{call sp_deregisterSubCluster(?, ?, ?)}";
    private static final String CALL_SP_GET_SUBCLUSTER = "{call sp_getSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
    private static final String CALL_SP_GET_SUBCLUSTERS = "{call sp_getSubClusters()}";
    private static final String CALL_SP_SUBCLUSTER_HEARTBEAT = "{call sp_subClusterHeartbeat(?, ?, ?, ?)}";
    private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER = "{call sp_addApplicationHomeSubCluster(?, ?, ?, ?)}";
    private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER = "{call sp_updateApplicationHomeSubCluster(?, ?, ?)}";
    private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER = "{call sp_deleteApplicationHomeSubCluster(?, ?)}";
    private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER = "{call sp_getApplicationHomeSubCluster(?, ?)}";
    private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER = "{call sp_getApplicationsHomeSubCluster(?, ?)}";
    private static final String CALL_SP_SET_POLICY_CONFIGURATION = "{call sp_setPolicyConfiguration(?, ?, ?, ?)}";
    private static final String CALL_SP_GET_POLICY_CONFIGURATION = "{call sp_getPolicyConfiguration(?, ?, ?)}";
    private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS = "{call sp_getPoliciesConfigurations()}";
    protected static final String CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER = "{call sp_addReservationHomeSubCluster(?, ?, ?, ?)}";
    protected static final String CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER = "{call sp_getReservationHomeSubCluster(?, ?)}";
    protected static final String CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER = "{call sp_getReservationsHomeSubCluster()}";
    protected static final String CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER = "{call sp_deleteReservationHomeSubCluster(?, ?)}";
    protected static final String CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER = "{call sp_updateReservationHomeSubCluster(?, ?, ?)}";
    private Calendar utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    private String userName;
    private String password;
    private String driverClass;
    private String url;
    private int maximumPoolSize;
    private HikariDataSource dataSource = null;
    private final Clock clock = new MonotonicClock();
    @VisibleForTesting
    Connection conn = null;
    private int maxAppsInStateStore;

    @Override
    public void init(Configuration conf) throws YarnException {
        this.driverClass = conf.get("yarn.federation.state-store.sql.jdbc-class", "org.apache.hadoop.shaded.org.hsqldb.jdbc.JDBCDataSource");
        this.maximumPoolSize = conf.getInt("yarn.federation.state-store.sql.max-connections", 1);
        this.userName = conf.get("yarn.federation.state-store.sql.username");
        this.password = conf.get("yarn.federation.state-store.sql.password");
        this.url = conf.get("yarn.federation.state-store.sql.url");
        try {
            Class.forName(this.driverClass);
        }
        catch (ClassNotFoundException e) {
            FederationStateStoreUtils.logAndThrowException(LOG, "Driver class not found.", e);
        }
        this.dataSource = new HikariDataSource();
        this.dataSource.setDataSourceClassName(this.driverClass);
        FederationStateStoreUtils.setUsername(this.dataSource, this.userName);
        FederationStateStoreUtils.setPassword(this.dataSource, this.password);
        FederationStateStoreUtils.setProperty(this.dataSource, "url", this.url);
        this.dataSource.setMaximumPoolSize(this.maximumPoolSize);
        LOG.info("Initialized connection pool to the Federation StateStore database at address: " + this.url);
        try {
            this.conn = this.getConnection();
            LOG.debug("Connection created");
        }
        catch (SQLException e) {
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Not able to get Connection", e);
        }
        this.maxAppsInStateStore = conf.getInt("yarn.federation.state-store.max-applications", 1000);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest registerSubClusterRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(registerSubClusterRequest);
        CallableStatement cstmt = null;
        SubClusterInfo subClusterInfo = registerSubClusterRequest.getSubClusterInfo();
        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
        try {
            String errMsg;
            cstmt = this.getCallableStatement(CALL_SP_REGISTER_SUBCLUSTER);
            cstmt.setString(1, subClusterId.getId());
            cstmt.setString(2, subClusterInfo.getAMRMServiceAddress());
            cstmt.setString(3, subClusterInfo.getClientRMServiceAddress());
            cstmt.setString(4, subClusterInfo.getRMAdminServiceAddress());
            cstmt.setString(5, subClusterInfo.getRMWebServiceAddress());
            cstmt.setString(6, subClusterInfo.getState().toString());
            cstmt.setLong(7, subClusterInfo.getLastStartTime());
            cstmt.setString(8, subClusterInfo.getCapability());
            cstmt.registerOutParameter(9, 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            if (cstmt.getInt(9) == 0) {
                errMsg = "SubCluster " + subClusterId + " was not registered into the StateStore";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            if (cstmt.getInt(9) != 1) {
                errMsg = "Wrong behavior during registration of SubCluster " + subClusterId + " into the StateStore";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            LOG.info("Registered the SubCluster " + subClusterId + " into the StateStore");
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to register the SubCluster " + subClusterId + " into the StateStore", e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return SubClusterRegisterResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest subClusterDeregisterRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(subClusterDeregisterRequest);
        CallableStatement cstmt = null;
        SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId();
        SubClusterState state = subClusterDeregisterRequest.getState();
        try {
            String errMsg;
            cstmt = this.getCallableStatement(CALL_SP_DEREGISTER_SUBCLUSTER);
            cstmt.setString(1, subClusterId.getId());
            cstmt.setString(2, state.toString());
            cstmt.registerOutParameter(3, 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            if (cstmt.getInt(3) == 0) {
                errMsg = "SubCluster " + subClusterId + " not found";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            if (cstmt.getInt(3) != 1) {
                errMsg = "Wrong behavior during deregistration of SubCluster " + subClusterId + " from the StateStore";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            LOG.info("Deregistered the SubCluster " + subClusterId + " state to " + state.toString());
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to deregister the sub-cluster " + subClusterId + " state to " + state.toString(), e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return SubClusterDeregisterResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest subClusterHeartbeatRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(subClusterHeartbeatRequest);
        CallableStatement cstmt = null;
        SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId();
        SubClusterState state = subClusterHeartbeatRequest.getState();
        try {
            String errMsg;
            cstmt = this.getCallableStatement(CALL_SP_SUBCLUSTER_HEARTBEAT);
            cstmt.setString(1, subClusterId.getId());
            cstmt.setString(2, state.toString());
            cstmt.setString(3, subClusterHeartbeatRequest.getCapability());
            cstmt.registerOutParameter(4, 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            if (cstmt.getInt(4) == 0) {
                errMsg = "SubCluster " + subClusterId.toString() + " does not exist; cannot heartbeat";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            if (cstmt.getInt(4) != 1) {
                errMsg = "Wrong behavior during the heartbeat of SubCluster " + subClusterId;
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            LOG.info("Heartbeated the StateStore for the specified SubCluster " + subClusterId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to heartbeat the StateStore for the specified SubCluster " + subClusterId, e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return SubClusterHeartbeatResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest subClusterRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(subClusterRequest);
        CallableStatement cstmt = null;
        SubClusterInfo subClusterInfo = null;
        SubClusterId subClusterId = subClusterRequest.getSubClusterId();
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_SUBCLUSTER);
            cstmt.setString(1, subClusterId.getId());
            cstmt.registerOutParameter(2, 12);
            cstmt.registerOutParameter(3, 12);
            cstmt.registerOutParameter(4, 12);
            cstmt.registerOutParameter(5, 12);
            cstmt.registerOutParameter(6, 93);
            cstmt.registerOutParameter(7, 12);
            cstmt.registerOutParameter(8, -5);
            cstmt.registerOutParameter(9, 12);
            long startTime = this.clock.getTime();
            cstmt.execute();
            long stopTime = this.clock.getTime();
            String amRMAddress = cstmt.getString(2);
            String clientRMAddress = cstmt.getString(3);
            String rmAdminAddress = cstmt.getString(4);
            String webAppAddress = cstmt.getString(5);
            if (amRMAddress == null || clientRMAddress == null) {
                LOG.warn("The queried SubCluster: {} does not exist.", (Object)subClusterId);
                GetSubClusterInfoResponse getSubClusterInfoResponse = null;
                return getSubClusterInfoResponse;
            }
            Timestamp heartBeatTimeStamp = cstmt.getTimestamp(6, this.utcCalendar);
            long lastHeartBeat = heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0L;
            SubClusterState state = SubClusterState.fromString(cstmt.getString(7));
            long lastStartTime = cstmt.getLong(8);
            String capability = cstmt.getString(9);
            subClusterInfo = SubClusterInfo.newInstance(subClusterId, amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state, lastStartTime, capability);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            try {
                FederationMembershipStateStoreInputValidator.checkSubClusterInfo(subClusterInfo);
            }
            catch (FederationStateStoreInvalidInputException e) {
                String errMsg = "SubCluster " + subClusterId.toString() + " does not exist";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            LOG.debug("Got the information about the specified SubCluster {}", (Object)subClusterInfo);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the SubCluster information for " + subClusterId, e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return GetSubClusterInfoResponse.newInstance(subClusterInfo);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest subClustersRequest) throws YarnException {
        CallableStatement cstmt = null;
        ResultSet rs = null;
        ArrayList<SubClusterInfo> subClusters = new ArrayList<SubClusterInfo>();
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_SUBCLUSTERS);
            long startTime = this.clock.getTime();
            rs = cstmt.executeQuery();
            long stopTime = this.clock.getTime();
            while (rs.next()) {
                String subClusterName = rs.getString(1);
                String amRMAddress = rs.getString(2);
                String clientRMAddress = rs.getString(3);
                String rmAdminAddress = rs.getString(4);
                String webAppAddress = rs.getString(5);
                long lastHeartBeat = rs.getTimestamp(6, this.utcCalendar).getTime();
                SubClusterState state = SubClusterState.fromString(rs.getString(7));
                long lastStartTime = rs.getLong(8);
                String capability = rs.getString(9);
                SubClusterId subClusterId = SubClusterId.newInstance(subClusterName);
                SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId, amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state, lastStartTime, capability);
                FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
                try {
                    FederationMembershipStateStoreInputValidator.checkSubClusterInfo(subClusterInfo);
                }
                catch (FederationStateStoreInvalidInputException e) {
                    String errMsg = "SubCluster " + subClusterId.toString() + " is not valid";
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
                }
                if (subClustersRequest.getFilterInactiveSubClusters() && !subClusterInfo.getState().isActive()) continue;
                subClusters.add(subClusterInfo);
            }
        }
        catch (SQLException e) {
            try {
                FederationStateStoreClientMetrics.failedStateStoreCall();
                FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the SubClusters ", e);
            }
            catch (Throwable throwable) {
                FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
                throw throwable;
            }
            FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        }
        FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        return GetSubClustersInfoResponse.newInstance(subClusters);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest request) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        String subClusterHome = null;
        ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId();
        SubClusterId subClusterId = request.getApplicationHomeSubCluster().getHomeSubCluster();
        try {
            cstmt = this.getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
            cstmt.setString(1, appId.toString());
            cstmt.setString(2, subClusterId.getId());
            cstmt.registerOutParameter(3, 12);
            cstmt.registerOutParameter(4, 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            subClusterHome = cstmt.getString(3);
            SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            if (subClusterId.equals(subClusterIdHome)) {
                if (cstmt.getInt(4) == 0) {
                    LOG.info("The application {} was not inserted in the StateStore because it was already present in SubCluster {}", (Object)appId, (Object)subClusterHome);
                } else if (cstmt.getInt(4) != 1) {
                    String errMsg = "Wrong behavior during the insertion of SubCluster " + subClusterId;
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
                }
                LOG.info("Insert into the StateStore the application: " + appId + " in SubCluster:  " + subClusterHome);
            } else {
                if (cstmt.getInt(4) != 0) {
                    String errMsg = "The application " + appId + " does exist but was overwritten";
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
                }
                LOG.info("Application: " + appId + " already present with SubCluster:  " + subClusterHome);
            }
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to insert the newly generated application " + request.getApplicationHomeSubCluster().getApplicationId(), e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return AddApplicationHomeSubClusterResponse.newInstance(SubClusterId.newInstance(subClusterHome));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(UpdateApplicationHomeSubClusterRequest request) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId();
        SubClusterId subClusterId = request.getApplicationHomeSubCluster().getHomeSubCluster();
        try {
            String errMsg;
            cstmt = this.getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
            cstmt.setString(1, appId.toString());
            cstmt.setString(2, subClusterId.getId());
            cstmt.registerOutParameter(3, 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            if (cstmt.getInt(3) == 0) {
                errMsg = "Application " + appId + " does not exist";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            if (cstmt.getInt(3) != 1) {
                errMsg = "Wrong behavior during the update of SubCluster " + subClusterId;
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            LOG.info("Update the SubCluster to {} for application {} in the StateStore", (Object)subClusterId, (Object)appId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to update the application " + request.getApplicationHomeSubCluster().getApplicationId(), e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return UpdateApplicationHomeSubClusterResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(GetApplicationHomeSubClusterRequest request) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        SubClusterId homeRM = null;
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
            cstmt.setString(1, request.getApplicationId().toString());
            cstmt.registerOutParameter(2, 12);
            long startTime = this.clock.getTime();
            cstmt.execute();
            long stopTime = this.clock.getTime();
            if (cstmt.getString(2) != null) {
                homeRM = SubClusterId.newInstance(cstmt.getString(2));
            } else {
                String errMsg = "Application " + request.getApplicationId() + " does not exist";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            LOG.debug("Got the information about the specified application {}. The AM is running in {}", (Object)request.getApplicationId(), (Object)homeRM);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the application information for the specified application " + request.getApplicationId(), e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return GetApplicationHomeSubClusterResponse.newInstance(request.getApplicationId(), homeRM);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(GetApplicationsHomeSubClusterRequest request) throws YarnException {
        if (request == null) {
            throw new YarnException("Missing getApplicationsHomeSubCluster request");
        }
        CallableStatement cstmt = null;
        ResultSet rs = null;
        ArrayList<ApplicationHomeSubCluster> appsHomeSubClusters = new ArrayList<ApplicationHomeSubCluster>();
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
            cstmt.setInt("limit_IN", this.maxAppsInStateStore);
            String homeSubClusterIN = "";
            SubClusterId subClusterId = request.getSubClusterId();
            if (subClusterId != null) {
                homeSubClusterIN = subClusterId.toString();
            }
            cstmt.setString("homeSubCluster_IN", homeSubClusterIN);
            long startTime = this.clock.getTime();
            rs = cstmt.executeQuery();
            long stopTime = this.clock.getTime();
            while (rs.next() && appsHomeSubClusters.size() <= this.maxAppsInStateStore) {
                String applicationId = rs.getString("applicationId");
                String homeSubCluster = rs.getString("homeSubCluster");
                appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance(ApplicationId.fromString((String)applicationId), SubClusterId.newInstance(homeSubCluster)));
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the applications ", e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        }
        return GetApplicationsHomeSubClusterResponse.newInstance(appsHomeSubClusters);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(DeleteApplicationHomeSubClusterRequest request) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        try {
            String errMsg;
            cstmt = this.getCallableStatement(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
            cstmt.setString(1, request.getApplicationId().toString());
            cstmt.registerOutParameter(2, 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            if (cstmt.getInt(2) == 0) {
                errMsg = "Application " + request.getApplicationId() + " does not exist";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            if (cstmt.getInt(2) != 1) {
                errMsg = "Wrong behavior during deleting the application " + request.getApplicationId();
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            LOG.info("Delete from the StateStore the application: {}", (Object)request.getApplicationId());
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to delete the application " + request.getApplicationId(), e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return DeleteApplicationHomeSubClusterResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(GetSubClusterPolicyConfigurationRequest request) throws YarnException {
        FederationPolicyStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        SubClusterPolicyConfiguration subClusterPolicyConfiguration = null;
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_POLICY_CONFIGURATION);
            cstmt.setString(1, request.getQueue());
            cstmt.registerOutParameter(2, 12);
            cstmt.registerOutParameter(3, -3);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            if (cstmt.getString(2) == null || cstmt.getBytes(3) == null) {
                LOG.warn("Policy for queue: {} does not exist.", (Object)request.getQueue());
                GetSubClusterPolicyConfigurationResponse getSubClusterPolicyConfigurationResponse = null;
                return getSubClusterPolicyConfigurationResponse;
            }
            subClusterPolicyConfiguration = SubClusterPolicyConfiguration.newInstance(request.getQueue(), cstmt.getString(2), ByteBuffer.wrap(cstmt.getBytes(3)));
            LOG.debug("Selected from StateStore the policy for the queue: {}", (Object)subClusterPolicyConfiguration);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to select the policy for the queue :" + request.getQueue(), e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return GetSubClusterPolicyConfigurationResponse.newInstance(subClusterPolicyConfiguration);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest request) throws YarnException {
        FederationPolicyStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        SubClusterPolicyConfiguration policyConf = request.getPolicyConfiguration();
        try {
            String errMsg;
            cstmt = this.getCallableStatement(CALL_SP_SET_POLICY_CONFIGURATION);
            cstmt.setString(1, policyConf.getQueue());
            cstmt.setString(2, policyConf.getType());
            cstmt.setBytes(3, SQLFederationStateStore.getByteArray(policyConf.getParams()));
            cstmt.registerOutParameter(4, 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            if (cstmt.getInt(4) == 0) {
                errMsg = "The policy " + policyConf.getQueue() + " was not insert into the StateStore";
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            if (cstmt.getInt(4) != 1) {
                errMsg = "Wrong behavior during insert the policy " + policyConf.getQueue();
                FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
            }
            LOG.info("Insert into the state store the policy for the queue: " + policyConf.getQueue());
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to insert the newly generated policy for the queue :" + policyConf.getQueue(), e);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return SetSubClusterPolicyConfigurationResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
        CallableStatement cstmt = null;
        ResultSet rs = null;
        ArrayList<SubClusterPolicyConfiguration> policyConfigurations = new ArrayList<SubClusterPolicyConfiguration>();
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_POLICIES_CONFIGURATIONS);
            long startTime = this.clock.getTime();
            rs = cstmt.executeQuery();
            long stopTime = this.clock.getTime();
            while (rs.next()) {
                String queue = rs.getString(1);
                String type = rs.getString(2);
                byte[] policyInfo = rs.getBytes(3);
                SubClusterPolicyConfiguration subClusterPolicyConfiguration = SubClusterPolicyConfiguration.newInstance(queue, type, ByteBuffer.wrap(policyInfo));
                policyConfigurations.add(subClusterPolicyConfiguration);
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            try {
                FederationStateStoreClientMetrics.failedStateStoreCall();
                FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the policy information for all the queues.", e);
            }
            catch (Throwable throwable) {
                FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
                throw throwable;
            }
            FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        }
        FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        return GetSubClusterPoliciesConfigurationsResponse.newInstance(policyConfigurations);
    }

    @Override
    public Version getCurrentVersion() {
        throw new NotImplementedException("Code is not implemented");
    }

    @Override
    public Version loadVersion() {
        throw new NotImplementedException("Code is not implemented");
    }

    @Override
    public void close() throws Exception {
        if (this.dataSource != null) {
            this.dataSource.close();
            LOG.debug("Connection closed");
            FederationStateStoreClientMetrics.decrConnections();
        }
    }

    @VisibleForTesting
    protected Connection getConnection() throws SQLException {
        FederationStateStoreClientMetrics.incrConnections();
        return this.dataSource.getConnection();
    }

    @VisibleForTesting
    protected CallableStatement getCallableStatement(String procedure) throws SQLException {
        return this.conn.prepareCall(procedure);
    }

    private static byte[] getByteArray(ByteBuffer bb) {
        byte[] ba = new byte[bb.limit()];
        bb.get(ba);
        return ba;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(AddReservationHomeSubClusterRequest request) throws YarnException {
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
        ReservationId reservationId = reservationHomeSubCluster.getReservationId();
        SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
        SubClusterId subClusterHomeId = null;
        try {
            cstmt = this.getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
            cstmt.setString("reservationId_IN", reservationId.toString());
            cstmt.setString("homeSubCluster_IN", subClusterId.getId());
            cstmt.registerOutParameter("storedHomeSubCluster_OUT", 12);
            cstmt.registerOutParameter("rowCount_OUT", 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            String subClusterHomeIdString = cstmt.getString("storedHomeSubCluster_OUT");
            subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
            int rowCount = cstmt.getInt("rowCount_OUT");
            if (subClusterId.equals(subClusterHomeId)) {
                if (rowCount == 0) {
                    LOG.info("The reservation {} was not inserted in the StateStore because it was already present in subCluster {}", (Object)reservationId, (Object)subClusterHomeId);
                } else if (rowCount != 1) {
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during the insertion of subCluster %s according to reservation %s. The database expects to insert 1 record, but the number of inserted changes is greater than 1, please check the records of the database.", subClusterId, reservationId);
                }
            } else {
                if (rowCount != 0) {
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "The reservation %s does exist but was overwritten.", reservationId);
                }
                LOG.info("Reservation: {} already present with subCluster: {}.", (Object)reservationId, (Object)subClusterHomeId);
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to insert the newly generated reservation %s to subCluster %s.", reservationId, subClusterId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(GetReservationHomeSubClusterRequest request) throws YarnException {
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        ReservationId reservationId = request.getReservationId();
        SubClusterId subClusterId = null;
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
            cstmt.setString("reservationId_IN", reservationId.toString());
            cstmt.registerOutParameter("homeSubCluster_OUT", 12);
            long startTime = this.clock.getTime();
            cstmt.execute();
            long stopTime = this.clock.getTime();
            String subClusterHomeIdString = cstmt.getString("homeSubCluster_OUT");
            if (StringUtils.isNotBlank((CharSequence)subClusterHomeIdString)) {
                subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
            } else {
                FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Reservation %s does not exist", reservationId);
            }
            LOG.info("Got the information about the specified reservation {} in subCluster = {}.", (Object)reservationId, (Object)subClusterId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            ReservationHomeSubCluster homeSubCluster = ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
            GetReservationHomeSubClusterResponse getReservationHomeSubClusterResponse = GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
            return getReservationHomeSubClusterResponse;
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to obtain the reservation information according to %s.", reservationId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        throw new YarnException("Unable to obtain the reservation information according to " + reservationId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(GetReservationsHomeSubClusterRequest request) throws YarnException {
        GetReservationsHomeSubClusterResponse getReservationsHomeSubClusterResponse;
        CallableStatement cstmt = null;
        ResultSet rs = null;
        ArrayList<ReservationHomeSubCluster> reservationsHomeSubClusters = new ArrayList<ReservationHomeSubCluster>();
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
            long startTime = this.clock.getTime();
            rs = cstmt.executeQuery();
            long stopTime = this.clock.getTime();
            while (rs.next()) {
                String dbReservationId = rs.getString("reservationId");
                String dbHomeSubCluster = rs.getString("homeSubCluster");
                ReservationId reservationId = ReservationId.parseReservationId((String)dbReservationId);
                SubClusterId homeSubCluster = SubClusterId.newInstance(dbHomeSubCluster);
                ReservationHomeSubCluster reservationHomeSubCluster = ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
                reservationsHomeSubClusters.add(reservationHomeSubCluster);
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            getReservationsHomeSubClusterResponse = GetReservationsHomeSubClusterResponse.newInstance(reservationsHomeSubClusters);
        }
        catch (Exception e) {
            try {
                FederationStateStoreClientMetrics.failedStateStoreCall();
                FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the reservations.", e);
            }
            catch (Throwable throwable) {
                FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
                throw throwable;
            }
            FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        }
        FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        return getReservationsHomeSubClusterResponse;
        throw new YarnException("Unable to obtain the information for all the reservations.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(DeleteReservationHomeSubClusterRequest request) throws YarnException {
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        ReservationId reservationId = request.getReservationId();
        try {
            cstmt = this.getCallableStatement(CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER);
            cstmt.setString("reservationId_IN", reservationId.toString());
            cstmt.registerOutParameter("rowCount_OUT", 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            int rowCount = cstmt.getInt("rowCount_OUT");
            if (rowCount == 0) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Reservation %s does not exist", reservationId);
            } else if (rowCount != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during deleting the reservation %s. The database is expected to delete 1 record, but the number of deleted records returned by the database is greater than 1, indicating that a duplicate reservationId occurred during the deletion process.", reservationId);
            }
            LOG.info("Delete from the StateStore the reservation: {}.", (Object)reservationId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            DeleteReservationHomeSubClusterResponse deleteReservationHomeSubClusterResponse = DeleteReservationHomeSubClusterResponse.newInstance();
            return deleteReservationHomeSubClusterResponse;
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to delete the reservation %s.", reservationId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        throw new YarnException("Unable to delete the reservation " + reservationId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(UpdateReservationHomeSubClusterRequest request) throws YarnException {
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
        ReservationId reservationId = reservationHomeSubCluster.getReservationId();
        SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
        try {
            cstmt = this.getCallableStatement(CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER);
            cstmt.setString("reservationId_IN", reservationId.toString());
            cstmt.setString("homeSubCluster_IN", subClusterId.getId());
            cstmt.registerOutParameter("rowCount_OUT", 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            int rowCount = cstmt.getInt("rowCount_OUT");
            if (rowCount == 0) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Reservation %s does not exist", reservationId);
            } else if (rowCount != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during update the subCluster %s according to reservation %s. The database is expected to update 1 record, but the number of database update records is greater than 1, the records of the database should be checked.", subClusterId, reservationId);
            }
            LOG.info("Update the subCluster to {} for reservation {} in the StateStore.", (Object)subClusterId, (Object)reservationId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            UpdateReservationHomeSubClusterResponse updateReservationHomeSubClusterResponse = UpdateReservationHomeSubClusterResponse.newInstance();
            return updateReservationHomeSubClusterResponse;
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to update the subCluster %s according to reservation %s.", subClusterId, reservationId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        throw new YarnException("Unable to update the subCluster " + subClusterId + " according to reservation" + reservationId);
    }

    @VisibleForTesting
    public Connection getConn() {
        return this.conn;
    }

    @Override
    public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException {
        throw new NotImplementedException("Code is not implemented");
    }

    @Override
    public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException {
        throw new NotImplementedException("Code is not implemented");
    }

    @Override
    public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) throws YarnException, IOException {
        throw new NotImplementedException("Code is not implemented");
    }
}

