/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.federation.store.impl;

import com.zaxxer.hikari.HikariDataSource;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.sql.FederationQueryRunner;
import org.apache.hadoop.yarn.server.federation.store.sql.FederationSQLOutParameter;
import org.apache.hadoop.yarn.server.federation.store.sql.RouterMasterKeyHandler;
import org.apache.hadoop.yarn.server.federation.store.sql.RouterStoreTokenHandler;
import org.apache.hadoop.yarn.server.federation.store.sql.RowCountHandler;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationRouterRMTokenInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SQLFederationStateStore
implements FederationStateStore {
    /** Logger used by every operation in this class and handed to the FederationStateStoreUtils helpers. */
    public static final Logger LOG = LoggerFactory.getLogger(SQLFederationStateStore.class);
    // JDBC escape-syntax call strings, one per stored procedure. Each "?" is a
    // parameter that the calling method binds or registers by name (*_IN / *_OUT).
    // Sub-cluster membership procedures.
    private static final String CALL_SP_REGISTER_SUBCLUSTER = "{call sp_registerSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
    private static final String CALL_SP_DEREGISTER_SUBCLUSTER = "{call sp_deregisterSubCluster(?, ?, ?)}";
    private static final String CALL_SP_GET_SUBCLUSTER = "{call sp_getSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
    private static final String CALL_SP_GET_SUBCLUSTERS = "{call sp_getSubClusters()}";
    private static final String CALL_SP_SUBCLUSTER_HEARTBEAT = "{call sp_subClusterHeartbeat(?, ?, ?, ?)}";
    // Application -> home sub-cluster mapping procedures.
    private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER = "{call sp_addApplicationHomeSubCluster(?, ?, ?, ?, ?)}";
    private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER = "{call sp_updateApplicationHomeSubCluster(?, ?, ?, ?)}";
    private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER = "{call sp_deleteApplicationHomeSubCluster(?, ?)}";
    private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER = "{call sp_getApplicationHomeSubCluster(?, ?, ?, ?)}";
    private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER = "{call sp_getApplicationsHomeSubCluster(?, ?)}";
    // Policy configuration procedures.
    private static final String CALL_SP_SET_POLICY_CONFIGURATION = "{call sp_setPolicyConfiguration(?, ?, ?, ?)}";
    private static final String CALL_SP_GET_POLICY_CONFIGURATION = "{call sp_getPolicyConfiguration(?, ?, ?)}";
    private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS = "{call sp_getPoliciesConfigurations()}";
    // Reservation -> home sub-cluster mapping procedures (protected, unlike the ones above).
    protected static final String CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER = "{call sp_addReservationHomeSubCluster(?, ?, ?, ?)}";
    protected static final String CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER = "{call sp_getReservationHomeSubCluster(?, ?)}";
    protected static final String CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER = "{call sp_getReservationsHomeSubCluster()}";
    protected static final String CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER = "{call sp_deleteReservationHomeSubCluster(?, ?)}";
    protected static final String CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER = "{call sp_updateReservationHomeSubCluster(?, ?, ?)}";
    // Master key and delegation token procedures.
    protected static final String CALL_SP_ADD_MASTERKEY = "{call sp_addMasterKey(?, ?, ?)}";
    protected static final String CALL_SP_GET_MASTERKEY = "{call sp_getMasterKey(?, ?)}";
    protected static final String CALL_SP_DELETE_MASTERKEY = "{call sp_deleteMasterKey(?, ?)}";
    protected static final String CALL_SP_ADD_DELEGATIONTOKEN = "{call sp_addDelegationToken(?, ?, ?, ?, ?)}";
    protected static final String CALL_SP_GET_DELEGATIONTOKEN = "{call sp_getDelegationToken(?, ?, ?, ?)}";
    protected static final String CALL_SP_UPDATE_DELEGATIONTOKEN = "{call sp_updateDelegationToken(?, ?, ?, ?, ?)}";
    protected static final String CALL_SP_DELETE_DELEGATIONTOKEN = "{call sp_deleteDelegationToken(?, ?)}";
    // Version procedures. Note the load constant targets sp_getVersion.
    private static final String CALL_SP_STORE_VERSION = "{call sp_storeVersion(?, ?, ?)}";
    private static final String CALL_SP_LOAD_VERSION = "{call sp_getVersion(?, ?)}";
    // UTC calendar passed to getTimestamp(...) when reading the lastHeartBeat values.
    // NOTE(review): java.util.Calendar is mutable and this instance is shared by all
    // calls on this object - confirm how concurrent callers are expected to use it.
    private Calendar utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    // Connection settings; all are populated from the Configuration in init().
    private String userName;
    private String password;
    private String driverClass;
    private String url;
    private int maximumPoolSize;
    // Hikari connection pool; created and configured in init().
    private HikariDataSource dataSource = null;
    // Clock sampled immediately before and after each stored-procedure execution;
    // the difference is reported to FederationStateStoreClientMetrics.
    private final Clock clock = new MonotonicClock();
    // Connection obtained from getConnection() at the end of init().
    @VisibleForTesting
    private Connection conn = null;
    // Read in init() from "yarn.federation.state-store.max-applications" (default 1000).
    private int maxAppsInStateStore;
    // Remaining pool tuning values, also read in init() and applied to dataSource.
    private int minimumIdle;
    private String dataSourcePoolName;
    private long maxLifeTime;
    private long idleTimeout;
    private long connectionTimeout;
    // Version 1.1 as built by Version.newInstance(1, 1); presumably the store version
    // this implementation reports - TODO confirm against the version methods.
    protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 1);

    /**
     * Reads the SQL state-store settings from {@code conf}, configures the Hikari
     * connection pool and obtains the connection stored in {@code conn}.
     *
     * @param conf configuration supplying the JDBC driver class, credentials, URL,
     *             pool tuning values and the maximum number of applications
     * @throws YarnException reported through {@code FederationStateStoreUtils} when the
     *                       driver class cannot be loaded or no connection can be obtained
     */
    @Override
    public void init(Configuration conf) throws YarnException {
        // Driver and pool sizing/naming.
        driverClass = conf.get("yarn.federation.state-store.sql.jdbc-class", "org.hsqldb.jdbc.JDBCDataSource");
        maximumPoolSize = conf.getInt("yarn.federation.state-store.sql.max-connections", 1);
        minimumIdle = conf.getInt("yarn.federation.state-store.sql.minimum-idle", 1);
        dataSourcePoolName = conf.get("yarn.federation.state-store.sql.pool-name", "YARN-Federation-DataBasePool");

        // Pool timing values, all normalised to milliseconds.
        maxLifeTime = conf.getTimeDuration("yarn.federation.state-store.sql.max-life-time", YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CONN_MAX_LIFE_TIME, TimeUnit.MILLISECONDS);
        idleTimeout = conf.getTimeDuration("yarn.federation.state-store.sql.idle-time-out", YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CONN_IDLE_TIMEOUT_TIME, TimeUnit.MILLISECONDS);
        connectionTimeout = conf.getTimeDuration("yarn.federation.state-store.sql.conn-time-out", YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CONNECTION_TIMEOUT_TIME, TimeUnit.MILLISECONDS);

        // Credentials and target database.
        userName = conf.get("yarn.federation.state-store.sql.username");
        password = conf.get("yarn.federation.state-store.sql.password");
        url = conf.get("yarn.federation.state-store.sql.url");

        // Fail early if the configured driver is not on the classpath.
        try {
            Class.forName(driverClass);
        }
        catch (ClassNotFoundException missingDriver) {
            FederationStateStoreUtils.logAndThrowException(LOG, "Driver class not found.", missingDriver);
        }

        // Build the pool; the field is assigned first so it is visible even if a setter fails.
        final HikariDataSource pool = new HikariDataSource();
        dataSource = pool;
        pool.setDataSourceClassName(driverClass);
        FederationStateStoreUtils.setUsername(pool, userName);
        FederationStateStoreUtils.setPassword(pool, password);
        FederationStateStoreUtils.setProperty(pool, "url", url);
        pool.setMaximumPoolSize(maximumPoolSize);
        pool.setPoolName(dataSourcePoolName);
        pool.setMinimumIdle(minimumIdle);
        pool.setMaxLifetime(maxLifeTime);
        pool.setIdleTimeout(idleTimeout);
        pool.setConnectionTimeout(connectionTimeout);
        LOG.info("Initialized connection pool to the Federation StateStore database at address: {}.", (Object) url);

        try {
            conn = getConnection();
            LOG.debug("Connection created");
        }
        catch (SQLException connectFailure) {
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Not able to get Connection", connectFailure);
        }

        maxAppsInStateStore = conf.getInt("yarn.federation.state-store.max-applications", 1000);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a sub-cluster by calling the {@code sp_registerSubCluster} stored procedure.
     *
     * <p>The request is validated first. After execution the {@code rowCount_OUT} value is
     * inspected: 0 and any value other than 1 are both reported as store errors. The
     * statement is handed back through {@code returnToPool} on every path.
     *
     * @param registerSubClusterRequest request carrying the {@code SubClusterInfo} to store
     * @return a new, empty {@code SubClusterRegisterResponse}
     * @throws YarnException raised by the validator or by the {@code FederationStateStoreUtils}
     *                       helpers on an unexpected row count or an {@code SQLException}
     */
    @Override
    public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest registerSubClusterRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(registerSubClusterRequest);

        final SubClusterInfo info = registerSubClusterRequest.getSubClusterInfo();
        final SubClusterId id = info.getSubClusterId();
        CallableStatement call = null;
        try {
            call = getCallableStatement(CALL_SP_REGISTER_SUBCLUSTER);

            // Bind the IN parameters by name.
            call.setString("subClusterId_IN", id.getId());
            call.setString("amRMServiceAddress_IN", info.getAMRMServiceAddress());
            call.setString("clientRMServiceAddress_IN", info.getClientRMServiceAddress());
            call.setString("rmAdminServiceAddress_IN", info.getRMAdminServiceAddress());
            call.setString("rmWebServiceAddress_IN", info.getRMWebServiceAddress());
            call.setString("state_IN", info.getState().toString());
            call.setLong("lastStartTime_IN", info.getLastStartTime());
            call.setString("capability_IN", info.getCapability());
            call.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

            // Time only the execution of the procedure itself.
            final long begin = clock.getTime();
            call.executeUpdate();
            final long elapsed = clock.getTime() - begin;

            final int affectedRows = call.getInt("rowCount_OUT");
            // 0 rows: the procedure did not add the sub-cluster.
            if (affectedRows == 0) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "SubCluster %s was not registered into the StateStore.", id);
            }
            // Anything other than exactly 1 row is unexpected.
            if (affectedRows != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during registration of SubCluster %s into the StateStore", id);
            }

            LOG.info("Registered the SubCluster {} into the StateStore.", (Object) id);
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to register the SubCluster %s into the StateStore.", id);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, call);
        }
        return SubClusterRegisterResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves a sub-cluster to the requested state by calling the
     * {@code sp_deregisterSubCluster} stored procedure.
     *
     * <p>A {@code rowCount_OUT} of 0 is reported as "not found"; any value other than 1 is
     * reported as wrong behavior. The statement is handed back through
     * {@code returnToPool} on every path.
     *
     * @param subClusterDeregisterRequest request carrying the sub-cluster id and the target state
     * @return a new, empty {@code SubClusterDeregisterResponse}
     * @throws YarnException raised by the validator or by the {@code FederationStateStoreUtils}
     *                       helpers on an unexpected row count or an {@code SQLException}
     */
    @Override
    public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest subClusterDeregisterRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(subClusterDeregisterRequest);

        final SubClusterId id = subClusterDeregisterRequest.getSubClusterId();
        final SubClusterState targetState = subClusterDeregisterRequest.getState();
        CallableStatement call = null;
        try {
            call = getCallableStatement(CALL_SP_DEREGISTER_SUBCLUSTER);
            call.setString("subClusterId_IN", id.getId());
            call.setString("state_IN", targetState.toString());
            call.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

            // Time only the execution of the procedure itself.
            final long begin = clock.getTime();
            call.executeUpdate();
            final long elapsed = clock.getTime() - begin;

            final int affectedRows = call.getInt("rowCount_OUT");
            // 0 rows: no sub-cluster with that id was updated.
            if (affectedRows == 0) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "SubCluster %s not found.", id);
            }
            // Anything other than exactly 1 row is unexpected.
            if (affectedRows != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during deregistration of SubCluster %s from the StateStore.", id);
            }

            LOG.info("Deregistered the SubCluster {} state to {}.", (Object) id, (Object) targetState.toString());
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to deregister the sub-cluster %s state to %s.", id, targetState.toString());
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, call);
        }
        return SubClusterDeregisterResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a heartbeat for a sub-cluster by calling the
     * {@code sp_subClusterHeartbeat} stored procedure with the reported state and capability.
     *
     * <p>A {@code rowCount_OUT} of 0 is reported as "does not exist"; any value other than 1
     * is reported as wrong behavior. The statement is handed back through
     * {@code returnToPool} on every path.
     *
     * @param subClusterHeartbeatRequest request carrying the sub-cluster id, state and capability
     * @return a new, empty {@code SubClusterHeartbeatResponse}
     * @throws YarnException raised by the validator or by the {@code FederationStateStoreUtils}
     *                       helpers on an unexpected row count or an {@code SQLException}
     */
    @Override
    public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest subClusterHeartbeatRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(subClusterHeartbeatRequest);

        final SubClusterId id = subClusterHeartbeatRequest.getSubClusterId();
        final SubClusterState reportedState = subClusterHeartbeatRequest.getState();
        CallableStatement call = null;
        try {
            call = getCallableStatement(CALL_SP_SUBCLUSTER_HEARTBEAT);
            call.setString("subClusterId_IN", id.getId());
            call.setString("state_IN", reportedState.toString());
            call.setString("capability_IN", subClusterHeartbeatRequest.getCapability());
            call.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

            // Time only the execution of the procedure itself.
            final long begin = clock.getTime();
            call.executeUpdate();
            final long elapsed = clock.getTime() - begin;

            final int affectedRows = call.getInt("rowCount_OUT");
            // 0 rows: there is no such sub-cluster to heartbeat.
            if (affectedRows == 0) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "SubCluster %s does not exist; cannot heartbeat.", id);
            }
            // Anything other than exactly 1 row is unexpected.
            if (affectedRows != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during the heartbeat of SubCluster %s.", id);
            }

            LOG.info("Heartbeated the StateStore for the specified SubCluster {}.", (Object) id);
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to heartbeat the StateStore for the specified SubCluster %s.", id);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, call);
        }
        return SubClusterHeartbeatResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Looks up one sub-cluster by calling the {@code sp_getSubCluster} stored procedure and
     * rebuilding a {@code SubClusterInfo} from its OUT parameters.
     *
     * <p>The statement is handed back through {@code returnToPool} on every path,
     * including the early {@code null} return.
     *
     * @param subClusterRequest request carrying the id of the sub-cluster to look up
     * @return a response wrapping the sub-cluster information, or {@code null} when the
     *         procedure returns no AM-RM or client-RM address (sub-cluster not present)
     * @throws YarnException raised by the validators or by the {@code FederationStateStoreUtils}
     *                       helpers on invalid stored data or an {@code SQLException}
     */
    @Override
    public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest subClusterRequest) throws YarnException {
        FederationMembershipStateStoreInputValidator.validate(subClusterRequest);

        final SubClusterId subClusterId = subClusterRequest.getSubClusterId();
        SubClusterInfo found = null;
        CallableStatement call = null;
        try {
            call = getCallableStatement(CALL_SP_GET_SUBCLUSTER);
            call.setString("subClusterId_IN", subClusterId.getId());

            // Every column of the sub-cluster row comes back as an OUT parameter.
            call.registerOutParameter("amRMServiceAddress_OUT", java.sql.Types.VARCHAR);
            call.registerOutParameter("clientRMServiceAddress_OUT", java.sql.Types.VARCHAR);
            call.registerOutParameter("rmAdminServiceAddress_OUT", java.sql.Types.VARCHAR);
            call.registerOutParameter("rmWebServiceAddress_OUT", java.sql.Types.VARCHAR);
            call.registerOutParameter("lastHeartBeat_OUT", java.sql.Types.TIMESTAMP);
            call.registerOutParameter("state_OUT", java.sql.Types.VARCHAR);
            call.registerOutParameter("lastStartTime_OUT", java.sql.Types.BIGINT);
            call.registerOutParameter("capability_OUT", java.sql.Types.VARCHAR);

            // Time only the execution of the procedure itself.
            final long begin = clock.getTime();
            call.execute();
            final long elapsed = clock.getTime() - begin;

            final String amRMAddress = call.getString("amRMServiceAddress_OUT");
            final String clientRMAddress = call.getString("clientRMServiceAddress_OUT");
            final String rmAdminAddress = call.getString("rmAdminServiceAddress_OUT");
            final String webAppAddress = call.getString("rmWebServiceAddress_OUT");

            // Missing addresses mean the procedure found no matching sub-cluster.
            if (amRMAddress == null || clientRMAddress == null) {
                LOG.warn("The queried SubCluster: {} does not exist.", (Object) subClusterId);
                return null;
            }

            // A NULL heartbeat column is mapped to 0.
            final Timestamp heartBeat = call.getTimestamp("lastHeartBeat_OUT", utcCalendar);
            final long lastHeartBeat;
            if (heartBeat == null) {
                lastHeartBeat = 0L;
            } else {
                lastHeartBeat = heartBeat.getTime();
            }

            final SubClusterState state = SubClusterState.fromString(call.getString("state_OUT"));
            final long lastStartTime = call.getLong("lastStartTime_OUT");
            final String capability = call.getString("capability_OUT");

            found = SubClusterInfo.newInstance(subClusterId, amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state, lastStartTime, capability);
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);

            // Re-validate what was read back from the database.
            try {
                FederationMembershipStateStoreInputValidator.checkSubClusterInfo(found);
            }
            catch (FederationStateStoreInvalidInputException invalid) {
                FederationStateStoreUtils.logAndThrowStoreException((Throwable) invalid, LOG, "SubCluster %s does not exist.", subClusterId);
            }
            LOG.debug("Got the information about the specified SubCluster {}", (Object) found);
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to obtain the SubCluster information for %s.", subClusterId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, call);
        }
        return GetSubClusterInfoResponse.newInstance(found);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Lists the sub-clusters returned by the {@code sp_getSubClusters} stored procedure.
     *
     * <p>Each row is converted to a {@code SubClusterInfo} and validated. When the request
     * asks to filter inactive sub-clusters, rows whose state is not active are dropped.
     * The statement and result set are released in a {@code finally} block, so they are
     * handed back exactly once on every path, including validation failures and
     * unexpected runtime exceptions.
     *
     * @param subClustersRequest request telling whether inactive sub-clusters are filtered out
     * @return a response wrapping the (possibly empty) list of sub-clusters
     * @throws YarnException raised by the {@code FederationStateStoreUtils} helpers when a
     *                       stored sub-cluster is invalid or an {@code SQLException} occurs
     */
    @Override
    public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest subClustersRequest) throws YarnException {
        CallableStatement cstmt = null;
        ResultSet rs = null;
        ArrayList<SubClusterInfo> subClusters = new ArrayList<SubClusterInfo>();
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_SUBCLUSTERS);

            // Time only the execution of the procedure itself.
            long startTime = this.clock.getTime();
            rs = cstmt.executeQuery();
            long stopTime = this.clock.getTime();

            while (rs.next()) {
                String subClusterName = rs.getString("subClusterId");
                String amRMAddress = rs.getString("amRMServiceAddress");
                String clientRMAddress = rs.getString("clientRMServiceAddress");
                String rmAdminAddress = rs.getString("rmAdminServiceAddress");
                String webAppAddress = rs.getString("rmWebServiceAddress");

                // getTimestamp returns null for SQL NULL; map it to 0 as getSubCluster does
                // instead of failing the whole listing with a NullPointerException.
                Timestamp heartBeatTimeStamp = rs.getTimestamp("lastHeartBeat", this.utcCalendar);
                long lastHeartBeat = heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0L;

                SubClusterState state = SubClusterState.fromString(rs.getString("state"));
                long lastStartTime = rs.getLong("lastStartTime");
                String capability = rs.getString("capability");
                SubClusterId subClusterId = SubClusterId.newInstance(subClusterName);
                SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId, amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state, lastStartTime, capability);

                // Re-validate what was read back from the database.
                try {
                    FederationMembershipStateStoreInputValidator.checkSubClusterInfo(subClusterInfo);
                }
                catch (FederationStateStoreInvalidInputException e) {
                    FederationStateStoreUtils.logAndThrowStoreException((Throwable) e, LOG, "SubCluster %s is not valid.", subClusterId);
                }

                // Keep the row unless inactive sub-clusters are filtered and this one is inactive.
                if (!subClustersRequest.getFilterInactiveSubClusters() || subClusterInfo.getState().isActive()) {
                    subClusters.add(subClusterInfo);
                }
            }

            // One success sample per call (also for an empty result), not one per row.
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the SubClusters ", e);
        }
        finally {
            // Single release point for the statement and the result set.
            FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        }
        return GetSubClustersInfoResponse.newInstance(subClusters);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores the home sub-cluster of an application by calling the
     * {@code sp_addApplicationHomeSubCluster} stored procedure.
     *
     * <p>The procedure returns the home sub-cluster that ends up stored for the application
     * together with a row count. Three outcomes are distinguished:
     * <ul>
     *   <li>stored home equals the requested one and one row was written: the mapping was
     *       inserted;</li>
     *   <li>stored home equals the requested one and no row was written: the same mapping
     *       was already present;</li>
     *   <li>stored home differs: the application already had another home, which is only
     *       acceptable when no row was written.</li>
     * </ul>
     * Any other combination is reported as a store error. The statement is handed back
     * through {@code returnToPool} on every path.
     *
     * @param request request carrying the application id, its requested home sub-cluster
     *                and, optionally, the application submission context
     * @return a response wrapping the home sub-cluster reported by the stored procedure
     * @throws YarnException raised by the validator or by the {@code FederationStateStoreUtils}
     *                       helpers on an unexpected row count or an {@code SQLException}
     */
    @Override
    public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest request) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        String subClusterHome = null;
        ApplicationHomeSubCluster applicationHomeSubCluster = request.getApplicationHomeSubCluster();
        ApplicationId appId = applicationHomeSubCluster.getApplicationId();
        SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster();
        ApplicationSubmissionContext appSubmissionContext = applicationHomeSubCluster.getApplicationSubmissionContext();
        try {
            cstmt = this.getCallableStatement(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
            cstmt.setString("applicationId_IN", appId.toString());
            cstmt.setString("homeSubCluster_IN", subClusterId.getId());
            // The submission context is optional; it is stored as a BLOB of its protobuf bytes.
            if (appSubmissionContext != null) {
                cstmt.setBlob("applicationContext_IN", (InputStream)new ByteArrayInputStream(((ApplicationSubmissionContextPBImpl)appSubmissionContext).getProto().toByteArray()));
            } else {
                cstmt.setNull("applicationContext_IN", java.sql.Types.BLOB);
            }
            cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR);
            cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

            // Time only the execution of the procedure itself.
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();

            subClusterHome = cstmt.getString("storedHomeSubCluster_OUT");
            SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);

            // Read the row count once and reuse it for every check below.
            int rowCount = cstmt.getInt("rowCount_OUT");
            if (subClusterId.equals(subClusterIdHome)) {
                if (rowCount == 0) {
                    LOG.info("The application {} was not inserted in the StateStore because it was already present in SubCluster {}", (Object)appId, (Object)subClusterHome);
                } else if (rowCount != 1) {
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during the insertion of SubCluster %s.", subClusterId);
                } else {
                    // Only claim an insertion when exactly one row was written.
                    LOG.info("Insert into the StateStore the application: {} in SubCluster: {}.", (Object)appId, (Object)subClusterHome);
                }
            } else {
                // Another home is stored; a non-zero row count would mean it was overwritten.
                if (rowCount != 0) {
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "The application %s does exist but was overwritten.", appId);
                }
                LOG.info("Application: {} already present with SubCluster: {}.", (Object)appId, (Object)subClusterHome);
            }
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to insert the newly generated application %s.", appId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return AddApplicationHomeSubClusterResponse.newInstance(SubClusterId.newInstance(subClusterHome));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Updates the home sub-cluster of an application by calling the
     * {@code sp_updateApplicationHomeSubCluster} stored procedure.
     *
     * <p>A {@code rowCount_OUT} of 0 is reported as "application does not exist"; any value
     * other than 1 is reported as wrong behavior. The row count is read once and reused
     * for both checks. The statement is handed back through {@code returnToPool} on
     * every path.
     *
     * @param request request carrying the application id, its new home sub-cluster and,
     *                optionally, the application submission context
     * @return a new, empty {@code UpdateApplicationHomeSubClusterResponse}
     * @throws YarnException raised by the validator or by the {@code FederationStateStoreUtils}
     *                       helpers on an unexpected row count or an {@code SQLException}
     */
    @Override
    public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(UpdateApplicationHomeSubClusterRequest request) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        ApplicationHomeSubCluster applicationHomeSubCluster = request.getApplicationHomeSubCluster();
        ApplicationId appId = applicationHomeSubCluster.getApplicationId();
        SubClusterId subClusterId = applicationHomeSubCluster.getHomeSubCluster();
        ApplicationSubmissionContext appSubmissionContext = applicationHomeSubCluster.getApplicationSubmissionContext();
        try {
            cstmt = this.getCallableStatement(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
            cstmt.setString("applicationId_IN", appId.toString());
            cstmt.setString("homeSubCluster_IN", subClusterId.getId());
            // The submission context is optional; it is stored as a BLOB of its protobuf bytes.
            if (appSubmissionContext != null) {
                cstmt.setBlob("applicationContext_IN", (InputStream)new ByteArrayInputStream(((ApplicationSubmissionContextPBImpl)appSubmissionContext).getProto().toByteArray()));
            } else {
                cstmt.setNull("applicationContext_IN", java.sql.Types.BLOB);
            }
            cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);

            // Time only the execution of the procedure itself.
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();

            int rowCount = cstmt.getInt("rowCount_OUT");
            // 0 rows: there is no such application to update.
            if (rowCount == 0) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Application %s does not exist.", appId);
            }
            // Anything other than exactly 1 row is unexpected.
            if (rowCount != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during the update of SubCluster %s.", subClusterId);
            }
            LOG.info("Update the SubCluster to {} for application {} in the StateStore", (Object)subClusterId, (Object)appId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to update the application %s.", appId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return UpdateApplicationHomeSubClusterResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Looks up the home sub-cluster, creation time and (optionally) the stored
     * submission context of an application through the "get application"
     * stored procedure.
     *
     * @param request identifies the application and states whether the
     *        submission context should be included in the response
     * @return the application's home sub-cluster, its creation time in
     *         milliseconds (0 when the database returns no timestamp) and the
     *         submission context, which stays {@code null} when it is absent
     *         or was not requested
     * @throws YarnException if request validation fails, no home sub-cluster is
     *         stored for the application, or the database call or the decoding
     *         of the stored context fails
     */
    @Override
    public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(GetApplicationHomeSubClusterRequest request) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        SubClusterId homeRM = null;
        Long createTime = 0L;
        ApplicationId applicationId = request.getApplicationId();
        ApplicationSubmissionContextPBImpl appSubmissionContext = null;
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
            cstmt.setString("applicationId_IN", applicationId.toString());
            cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
            cstmt.registerOutParameter("createTime_OUT", java.sql.Types.TIMESTAMP);
            cstmt.registerOutParameter("applicationContext_OUT", java.sql.Types.BLOB);
            long startTime = this.clock.getTime();
            cstmt.execute();
            long stopTime = this.clock.getTime();
            String homeSubCluster = cstmt.getString("homeSubCluster_OUT");
            if (homeSubCluster != null) {
                homeRM = SubClusterId.newInstance(homeSubCluster);
            } else {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Application %s does not exist.", applicationId);
            }
            Timestamp createTimeStamp = cstmt.getTimestamp("createTime_OUT", this.utcCalendar);
            createTime = createTimeStamp != null ? createTimeStamp.getTime() : 0L;
            Blob blobAppContextData = cstmt.getBlob("applicationContext_OUT");
            if (blobAppContextData != null && request.getContainsAppSubmissionContext()) {
                // Close the BLOB stream as soon as the protobuf has been parsed.
                try (InputStream contextStream = blobAppContextData.getBinaryStream()) {
                    appSubmissionContext = new ApplicationSubmissionContextPBImpl(YarnProtos.ApplicationSubmissionContextProto.parseFrom(contextStream));
                }
            }
            LOG.debug("Got the information about the specified application {}. The AM is running in {}", (Object)applicationId, (Object)homeRM);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException | IOException e) {
            // Database failures and context decoding failures are reported identically.
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to obtain the application information for the specified application %s.", applicationId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return GetApplicationHomeSubClusterResponse.newInstance(applicationId, homeRM, createTime, appSubmissionContext);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Lists application-to-home-sub-cluster mappings, optionally restricted to
     * one sub-cluster, returning at most {@code maxAppsInStateStore} entries.
     *
     * @param request may carry a sub-cluster id to filter on; when it carries
     *        none an empty string is passed to the stored procedure
     * @return the mappings read from the stored procedure's result set
     * @throws YarnException if {@code request} is {@code null} or the database
     *         call fails
     */
    @Override
    public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(GetApplicationsHomeSubClusterRequest request) throws YarnException {
        if (request == null) {
            throw new YarnException("Missing getApplicationsHomeSubCluster request");
        }
        CallableStatement cstmt = null;
        ResultSet rs = null;
        ArrayList<ApplicationHomeSubCluster> appsHomeSubClusters = new ArrayList<ApplicationHomeSubCluster>();
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
            cstmt.setInt("limit_IN", this.maxAppsInStateStore);
            String homeSubClusterIN = "";
            SubClusterId subClusterId = request.getSubClusterId();
            if (subClusterId != null) {
                homeSubClusterIN = subClusterId.toString();
            }
            cstmt.setString("homeSubCluster_IN", homeSubClusterIN);
            long startTime = this.clock.getTime();
            rs = cstmt.executeQuery();
            long stopTime = this.clock.getTime();
            // Strict comparison: stop once the configured cap has been reached
            // (a "<=" check would admit one entry beyond the cap).
            while (appsHomeSubClusters.size() < this.maxAppsInStateStore && rs.next()) {
                String applicationId = rs.getString("applicationId");
                String homeSubCluster = rs.getString("homeSubCluster");
                appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance(ApplicationId.fromString((String)applicationId), SubClusterId.newInstance(homeSubCluster)));
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the applications ", e);
        }
        finally {
            // Single release point for the statement and result set on every path.
            FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        }
        return GetApplicationsHomeSubClusterResponse.newInstance(appsHomeSubClusters);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Deletes the home sub-cluster mapping of an application through the
     * "delete application" stored procedure.
     *
     * @param request identifies the application whose mapping is removed
     * @return an empty response once the stored procedure reports exactly one
     *         deleted row
     * @throws YarnException if request validation fails, the stored procedure
     *         reports a row count other than one, or a {@link SQLException}
     *         occurs
     */
    @Override
    public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(DeleteApplicationHomeSubClusterRequest request) throws YarnException {
        FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        ApplicationId applicationId = request.getApplicationId();
        try {
            cstmt = this.getCallableStatement(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
            cstmt.setString("applicationId_IN", applicationId.toString());
            cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            // Read the OUT parameter once and reuse it for both checks.
            int rowCount = cstmt.getInt("rowCount_OUT");
            if (rowCount == 0) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Application %s does not exist.", applicationId);
            } else if (rowCount != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during deleting the application %s.", applicationId);
            }
            LOG.info("Delete from the StateStore the application: {}", (Object)request.getApplicationId());
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to delete the application %s.", applicationId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return DeleteApplicationHomeSubClusterResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the policy configuration stored for a queue.
     *
     * @param request names the queue whose policy is requested
     * @return a response wrapping the stored policy, or {@code null} when the
     *         stored procedure returns no policy type or no parameters for the
     *         queue
     * @throws YarnException if request validation fails or a
     *         {@link SQLException} occurs
     */
    @Override
    public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(GetSubClusterPolicyConfigurationRequest request) throws YarnException {
        FederationPolicyStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        SubClusterPolicyConfiguration subClusterPolicyConfiguration = null;
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_POLICY_CONFIGURATION);
            cstmt.setString("queue_IN", request.getQueue());
            cstmt.registerOutParameter("policyType_OUT", java.sql.Types.VARCHAR);
            cstmt.registerOutParameter("params_OUT", java.sql.Types.VARBINARY);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            String policyType = cstmt.getString("policyType_OUT");
            byte[] param = cstmt.getBytes("params_OUT");
            if (policyType == null || param == null) {
                // Missing policy is reported to the caller as a null response, not an exception.
                LOG.warn("Policy for queue: {} does not exist.", (Object)request.getQueue());
                return null;
            }
            subClusterPolicyConfiguration = SubClusterPolicyConfiguration.newInstance(request.getQueue(), policyType, ByteBuffer.wrap(param));
            LOG.debug("Selected from StateStore the policy for the queue: {}", (Object)subClusterPolicyConfiguration);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            // Pass the queue as a format argument; appending it to the format
            // string left the %s placeholder without a value.
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to select the policy for the queue : %s.", request.getQueue());
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        return GetSubClusterPolicyConfigurationResponse.newInstance(subClusterPolicyConfiguration);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores the policy configuration of a queue through the "set policy"
     * stored procedure.
     *
     * @param request carries the policy configuration (queue, type, parameters)
     * @return an empty response once the stored procedure reports exactly one
     *         affected row
     * @throws YarnException if request validation fails, the reported row count
     *         is not one, or a {@link SQLException} occurs
     */
    @Override
    public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest request) throws YarnException {
        FederationPolicyStoreInputValidator.validate(request);
        SubClusterPolicyConfiguration policy = request.getPolicyConfiguration();
        CallableStatement statement = null;
        try {
            statement = this.getCallableStatement(CALL_SP_SET_POLICY_CONFIGURATION);
            statement.setString("queue_IN", policy.getQueue());
            statement.setString("policyType_IN", policy.getType());
            byte[] serializedParams = SQLFederationStateStore.getByteArray(policy.getParams());
            statement.setBytes("params_IN", serializedParams);
            statement.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
            long begin = this.clock.getTime();
            statement.executeUpdate();
            long end = this.clock.getTime();
            int affectedRows = statement.getInt("rowCount_OUT");
            if (affectedRows != 1) {
                // Zero rows gets its own message before the generic one.
                if (affectedRows == 0) {
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "The policy %s was not insert into the StateStore.", policy.getQueue());
                }
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during insert the policy %s.", policy.getQueue());
            }
            LOG.info("Insert into the state store the policy for the queue: {}.", (Object)policy.getQueue());
            FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to insert the newly generated policy for the queue : %s.", policy.getQueue());
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, statement);
        }
        return SetSubClusterPolicyConfigurationResponse.newInstance();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads every stored queue policy configuration.
     *
     * @param request not inspected by this method
     * @return a response holding one policy configuration per row of the
     *         stored procedure's result set
     * @throws YarnException if a {@link SQLException} occurs
     */
    @Override
    public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
        CallableStatement cstmt = null;
        ResultSet rs = null;
        ArrayList<SubClusterPolicyConfiguration> policyConfigurations = new ArrayList<SubClusterPolicyConfiguration>();
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_POLICIES_CONFIGURATIONS);
            long startTime = this.clock.getTime();
            rs = cstmt.executeQuery();
            long stopTime = this.clock.getTime();
            while (rs.next()) {
                String queue = rs.getString("queue");
                String type = rs.getString("policyType");
                byte[] policyInfo = rs.getBytes("params");
                SubClusterPolicyConfiguration subClusterPolicyConfiguration = SubClusterPolicyConfiguration.newInstance(queue, type, ByteBuffer.wrap(policyInfo));
                policyConfigurations.add(subClusterPolicyConfiguration);
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the policy information for all the queues.", e);
        }
        finally {
            // Runs on every exit path, including unchecked exceptions raised
            // while reading rows, so the statement and result set never leak.
            FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        }
        return GetSubClusterPoliciesConfigurationsResponse.newInstance(policyConfigurations);
    }

    /**
     * Returns the version constant this store implementation is built with.
     *
     * @return {@code CURRENT_VERSION_INFO}
     */
    @Override
    public Version getCurrentVersion() {
        return CURRENT_VERSION_INFO;
    }

    /**
     * Loads the version recorded in the state store by delegating to
     * {@link #getVersion()}.
     *
     * @return whatever {@code getVersion()} returns
     * @throws Exception propagated from {@code getVersion()}
     */
    @Override
    public Version loadVersion() throws Exception {
        return getVersion();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the federation version persisted in the database through the
     * "load version" stored procedure.
     *
     * @return the decoded version, or {@code null} when the stored procedure
     *         returns no version comment or no version bytes
     * @throws Exception if the database call fails or the stored bytes cannot
     *         be parsed
     */
    public Version getVersion() throws Exception {
        CallableStatement statement = null;
        VersionPBImpl loaded = null;
        try {
            statement = this.getCallableStatement(CALL_SP_LOAD_VERSION);
            statement.registerOutParameter("fedVersion_OUT", java.sql.Types.VARBINARY);
            statement.registerOutParameter("versionComment_OUT", java.sql.Types.VARCHAR);
            long begin = this.clock.getTime();
            statement.executeUpdate();
            long end = this.clock.getTime();
            String comment = statement.getString("versionComment_OUT");
            byte[] rawVersion = statement.getBytes("fedVersion_OUT");
            boolean versionPresent = comment != null && rawVersion != null;
            if (versionPresent) {
                YarnServerCommonProtos.VersionProto proto = YarnServerCommonProtos.VersionProto.parseFrom(rawVersion);
                loaded = new VersionPBImpl(proto);
                // The success metric is only recorded when a version was found.
                FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
            }
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to select the version.", new Object[0]);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, statement);
        }
        return loaded;
    }

    /**
     * Persists {@code CURRENT_VERSION_INFO}: its protobuf bytes as the version
     * and its string form as the version comment.
     *
     * @throws Exception propagated from the two-argument {@code storeVersion}
     */
    @Override
    public void storeVersion() throws Exception {
        this.storeVersion(((VersionPBImpl)CURRENT_VERSION_INFO).getProto().toByteArray(), CURRENT_VERSION_INFO.toString());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes a version and its comment to the database through the
     * "store version" stored procedure.
     *
     * @param fedVersion serialized version bytes to store
     * @param versionComment human-readable description stored with the version
     * @throws YarnException if the stored procedure reports a row count other
     *         than one or a {@link SQLException} occurs
     */
    public void storeVersion(byte[] fedVersion, String versionComment) throws YarnException {
        CallableStatement statement = null;
        try {
            statement = this.getCallableStatement(CALL_SP_STORE_VERSION);
            statement.setBytes("fedVersion_IN", fedVersion);
            statement.setString("versionComment_IN", versionComment);
            statement.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
            long begin = this.clock.getTime();
            statement.executeUpdate();
            long end = this.clock.getTime();
            int affectedRows = statement.getInt("rowCount_OUT");
            if (affectedRows != 1) {
                // Zero rows gets its own message before the generic one.
                if (affectedRows == 0) {
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "The version %s was not insert into the StateStore.", versionComment);
                }
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during insert the version %s.", versionComment);
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
            LOG.info("Insert into the state store the version : {}.", (Object)versionComment);
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to insert the newly version : %s.", versionComment);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, statement);
        }
    }

    /**
     * Closes the data source, if one exists, and decrements the connection
     * metric. Does nothing when no data source is set.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    public void close() throws Exception {
        if (this.dataSource == null) {
            return;
        }
        this.dataSource.close();
        LOG.debug("Connection closed");
        FederationStateStoreClientMetrics.decrConnections();
    }

    /**
     * Increments the connection metric and obtains a connection from the data
     * source.
     *
     * @return a connection supplied by the data source
     * @throws SQLException if the data source cannot provide a connection
     */
    @VisibleForTesting
    protected Connection getConnection() throws SQLException {
        // The metric is bumped before the data source is asked for a connection.
        FederationStateStoreClientMetrics.incrConnections();
        Connection pooled = this.dataSource.getConnection();
        return pooled;
    }

    /**
     * Obtains a connection and sets its auto-commit mode.
     *
     * <p>If setting the auto-commit mode fails, the freshly obtained connection
     * is closed before the failure is rethrown so it is not leaked.
     *
     * @param isCommitted value handed to {@link Connection#setAutoCommit(boolean)}
     * @return the configured connection
     * @throws SQLException if no connection can be obtained or the auto-commit
     *         mode cannot be set
     */
    protected Connection getConnection(boolean isCommitted) throws SQLException {
        Connection dbConn = this.getConnection();
        try {
            dbConn.setAutoCommit(isCommitted);
        }
        catch (SQLException e) {
            try {
                dbConn.close();
                // Undo the increment done when the connection was obtained.
                // NOTE(review): assumes closing a connection pairs with decrConnections — confirm.
                FederationStateStoreClientMetrics.decrConnections();
            }
            catch (SQLException closeFailure) {
                // Keep the original failure primary; attach the close failure to it.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return dbConn;
    }

    /**
     * Prepares a callable statement for the given stored-procedure call on the
     * connection held in {@code conn}.
     *
     * @param procedure the call text handed to {@link Connection#prepareCall(String)}
     * @return the prepared callable statement
     * @throws SQLException if the statement cannot be prepared
     */
    @VisibleForTesting
    protected CallableStatement getCallableStatement(String procedure) throws SQLException {
        CallableStatement prepared = this.conn.prepareCall(procedure);
        return prepared;
    }

    /**
     * Copies the readable content of a buffer (from its position to its limit)
     * into a new array without disturbing the caller's buffer.
     *
     * <p>The copy is taken from a duplicate, so the position of {@code bb} is
     * left untouched and the method can be called repeatedly on the same
     * buffer. Sizing the array with {@code remaining()} avoids a
     * {@link java.nio.BufferUnderflowException} when the position is not zero.
     *
     * @param bb buffer to copy from; must not be {@code null}
     * @return a new array holding the bytes between position and limit
     */
    private static byte[] getByteArray(ByteBuffer bb) {
        ByteBuffer view = bb.duplicate();
        byte[] ba = new byte[view.remaining()];
        view.get(ba);
        return ba;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers the home sub-cluster of a reservation through the "add
     * reservation" stored procedure and reports which sub-cluster ends up
     * stored for it.
     *
     * @param request carries the reservation id and the proposed home
     *        sub-cluster
     * @return a response holding the sub-cluster returned by the stored
     *         procedure in {@code storedHomeSubCluster_OUT}
     * @throws YarnException if request validation fails, the reported row
     *         count is inconsistent with the stored sub-cluster, or a
     *         {@link SQLException} occurs
     */
    @Override
    public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(AddReservationHomeSubClusterRequest request) throws YarnException {
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement statement = null;
        ReservationHomeSubCluster mapping = request.getReservationHomeSubCluster();
        ReservationId reservationId = mapping.getReservationId();
        SubClusterId requestedHome = mapping.getHomeSubCluster();
        SubClusterId storedHome = null;
        try {
            statement = this.getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
            statement.setString("reservationId_IN", reservationId.toString());
            statement.setString("homeSubCluster_IN", requestedHome.getId());
            statement.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR);
            statement.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
            long begin = this.clock.getTime();
            statement.executeUpdate();
            long end = this.clock.getTime();
            storedHome = SubClusterId.newInstance(statement.getString("storedHomeSubCluster_OUT"));
            int insertedRows = statement.getInt("rowCount_OUT");
            if (!requestedHome.equals(storedHome)) {
                // A different sub-cluster is stored: nothing may have been written.
                if (insertedRows != 0) {
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "The reservation %s does exist but was overwritten.", reservationId);
                }
                LOG.info("Reservation: {} already present with subCluster: {}.", (Object)reservationId, (Object)storedHome);
            } else if (insertedRows == 0) {
                LOG.info("The reservation {} was not inserted in the StateStore because it was already present in subCluster {}", (Object)reservationId, (Object)storedHome);
            } else if (insertedRows != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during the insertion of subCluster %s according to reservation %s. The database expects to insert 1 record, but the number of inserted changes is greater than 1, please check the records of the database.", requestedHome, reservationId);
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to insert the newly generated reservation %s to subCluster %s.", reservationId, requestedHome);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, statement);
        }
        return AddReservationHomeSubClusterResponse.newInstance(storedHome);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Looks up the home sub-cluster stored for a reservation.
     *
     * @param request identifies the reservation
     * @return a response holding the reservation and its home sub-cluster
     * @throws YarnException if request validation fails, the stored procedure
     *         returns a blank sub-cluster for the reservation, or a
     *         {@link SQLException} occurs
     */
    @Override
    public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(GetReservationHomeSubClusterRequest request) throws YarnException {
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement statement = null;
        ReservationId reservationId = request.getReservationId();
        SubClusterId home = null;
        try {
            statement = this.getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
            statement.setString("reservationId_IN", reservationId.toString());
            statement.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
            long begin = this.clock.getTime();
            statement.execute();
            long end = this.clock.getTime();
            String storedHome = statement.getString("homeSubCluster_OUT");
            if (!StringUtils.isNotBlank((CharSequence)storedHome)) {
                FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Reservation %s does not exist", reservationId);
            } else {
                home = SubClusterId.newInstance(storedHome);
            }
            LOG.info("Got the information about the specified reservation {} in subCluster = {}.", (Object)reservationId, (Object)home);
            FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
            return GetReservationHomeSubClusterResponse.newInstance(ReservationHomeSubCluster.newInstance(reservationId, home));
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to obtain the reservation information according to %s.", reservationId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, statement);
        }
        // Only reached if the failure helper above returns instead of throwing.
        throw new YarnException("Unable to obtain the reservation information according to " + reservationId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(GetReservationsHomeSubClusterRequest request) throws YarnException {
        GetReservationsHomeSubClusterResponse getReservationsHomeSubClusterResponse;
        CallableStatement cstmt = null;
        ResultSet rs = null;
        ArrayList<ReservationHomeSubCluster> reservationsHomeSubClusters = new ArrayList<ReservationHomeSubCluster>();
        try {
            cstmt = this.getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
            long startTime = this.clock.getTime();
            rs = cstmt.executeQuery();
            long stopTime = this.clock.getTime();
            while (rs.next()) {
                String dbReservationId = rs.getString("reservationId");
                String dbHomeSubCluster = rs.getString("homeSubCluster");
                ReservationId reservationId = ReservationId.parseReservationId((String)dbReservationId);
                SubClusterId homeSubCluster = SubClusterId.newInstance(dbHomeSubCluster);
                ReservationHomeSubCluster reservationHomeSubCluster = ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
                reservationsHomeSubClusters.add(reservationHomeSubCluster);
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            getReservationsHomeSubClusterResponse = GetReservationsHomeSubClusterResponse.newInstance(reservationsHomeSubClusters);
        }
        catch (Exception e) {
            try {
                FederationStateStoreClientMetrics.failedStateStoreCall();
                FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the reservations.", e);
            }
            catch (Throwable throwable) {
                FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
                throw throwable;
            }
            FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        }
        FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
        return getReservationsHomeSubClusterResponse;
        throw new YarnException("Unable to obtain the information for all the reservations.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(DeleteReservationHomeSubClusterRequest request) throws YarnException {
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        ReservationId reservationId = request.getReservationId();
        try {
            cstmt = this.getCallableStatement(CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER);
            cstmt.setString("reservationId_IN", reservationId.toString());
            cstmt.registerOutParameter("rowCount_OUT", 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            int rowCount = cstmt.getInt("rowCount_OUT");
            if (rowCount == 0) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Reservation %s does not exist", reservationId);
            } else if (rowCount != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during deleting the reservation %s. The database is expected to delete 1 record, but the number of deleted records returned by the database is greater than 1, indicating that a duplicate reservationId occurred during the deletion process.", reservationId);
            }
            LOG.info("Delete from the StateStore the reservation: {}.", (Object)reservationId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            DeleteReservationHomeSubClusterResponse deleteReservationHomeSubClusterResponse = DeleteReservationHomeSubClusterResponse.newInstance();
            return deleteReservationHomeSubClusterResponse;
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to delete the reservation %s.", reservationId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        throw new YarnException("Unable to delete the reservation " + reservationId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(UpdateReservationHomeSubClusterRequest request) throws YarnException {
        FederationReservationHomeSubClusterStoreInputValidator.validate(request);
        CallableStatement cstmt = null;
        ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
        ReservationId reservationId = reservationHomeSubCluster.getReservationId();
        SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
        try {
            cstmt = this.getCallableStatement(CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER);
            cstmt.setString("reservationId_IN", reservationId.toString());
            cstmt.setString("homeSubCluster_IN", subClusterId.getId());
            cstmt.registerOutParameter("rowCount_OUT", 4);
            long startTime = this.clock.getTime();
            cstmt.executeUpdate();
            long stopTime = this.clock.getTime();
            int rowCount = cstmt.getInt("rowCount_OUT");
            if (rowCount == 0) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Reservation %s does not exist", reservationId);
            } else if (rowCount != 1) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during update the subCluster %s according to reservation %s. The database is expected to update 1 record, but the number of database update records is greater than 1, the records of the database should be checked.", subClusterId, reservationId);
            }
            LOG.info("Update the subCluster to {} for reservation {} in the StateStore.", (Object)subClusterId, (Object)reservationId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
            UpdateReservationHomeSubClusterResponse updateReservationHomeSubClusterResponse = UpdateReservationHomeSubClusterResponse.newInstance();
            return updateReservationHomeSubClusterResponse;
        }
        catch (SQLException e) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(e, LOG, "Unable to update the subCluster %s according to reservation %s.", subClusterId, reservationId);
        }
        finally {
            FederationStateStoreUtils.returnToPool(LOG, cstmt);
        }
        throw new YarnException("Unable to update the subCluster " + subClusterId + " according to reservation" + reservationId);
    }

    /**
     * Exposes the connection held in {@code conn}.
     *
     * @return the value of the {@code conn} field
     */
    @VisibleForTesting
    public Connection getConn() {
        return conn;
    }

    /**
     * Stores a new router master key through the "add master key" stored
     * procedure and then returns the key via
     * {@code getMasterKeyByDelegationKey}.
     *
     * @param request carries the master key to store
     * @return the response produced by {@code getMasterKeyByDelegationKey(request)}
     * @throws YarnException if request validation fails, the reported row count
     *         is not one, or a {@link SQLException} occurs
     * @throws IOException declared by the overridden method; may be raised
     *         while encoding the key or reading it back
     */
    @Override
    public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException {
        FederationRouterRMTokenInputValidator.validate(request);
        DelegationKey masterKey = FederationStateStoreUtils.convertMasterKeyToDelegationKey(request);
        int keyId = masterKey.getKeyId();
        String encodedKey = FederationStateStoreUtils.encodeWritable((Writable)masterKey);
        try {
            FederationSQLOutParameter<Integer> rowCountParam = new FederationSQLOutParameter<Integer>("rowCount_OUT", java.sql.Types.INTEGER, Integer.class);
            long begin = this.clock.getTime();
            Integer insertedRows = this.getRowCountByProcedureSQL(CALL_SP_ADD_MASTERKEY, keyId, encodedKey, rowCountParam);
            long end = this.clock.getTime();
            boolean exactlyOneRow = insertedRows == 1;
            if (!exactlyOneRow) {
                FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during the insertion of masterKey, keyId = %s. please check the records of the database.", String.valueOf(keyId));
            }
            FederationStateStoreClientMetrics.succeededStateStoreCall(end - begin);
        }
        catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to insert the newly masterKey, keyId = %s.", String.valueOf(keyId));
        }
        return this.getMasterKeyByDelegationKey(request);
    }

    /**
     * Deletes the master key identified by the key id in the request.
     *
     * <p>Calls the {@code CALL_SP_DELETE_MASTERKEY} procedure and inspects the
     * reported row count: 0 is reported as "does not exist", any other value
     * different from 1 is reported as wrong behavior.
     *
     * @param request request carrying the master key to delete.
     * @return a response wrapping the master key taken from the request.
     * @throws YarnException propagated from validation, raised for an unexpected row
     *     count, or raised when the SQL call fails.
     * @throws IOException declared by the interface; not thrown directly here.
     */
    @Override
    public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException {
        FederationRouterRMTokenInputValidator.validate(request);
        RouterMasterKey masterKey = request.getRouterMasterKey();
        int keyId = masterKey.getKeyId();
        try {
            long begin = clock.getTime();
            FederationSQLOutParameter<Integer> rowCountParam =
                new FederationSQLOutParameter<>("rowCount_OUT", java.sql.Types.INTEGER, Integer.class);
            int deleted = getRowCountByProcedureSQL(CALL_SP_DELETE_MASTERKEY, keyId, rowCountParam);
            long elapsed = clock.getTime() - begin;
            if (deleted != 1) {
                if (deleted == 0) {
                    // Nothing matched the key id.
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "masterKeyId = %s does not exist.", String.valueOf(keyId));
                } else {
                    // More than the single expected record was affected.
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during deleting the keyId %s. The database is expected to delete 1 record, but the number of deleted records returned by the database is greater than 1, indicating that a duplicate masterKey occurred during the deletion process.", keyId);
                }
            }
            LOG.info("Delete from the StateStore the keyId: {}.", keyId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);
            return RouterMasterKeyResponse.newInstance(masterKey);
        } catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to delete the keyId %s.", keyId);
            throw new YarnException("Unable to delete the masterKey, keyId = " + keyId);
        }
    }

    /**
     * Looks up the stored master key for the key id carried by the request.
     *
     * <p>Executes the {@code CALL_SP_GET_MASTERKEY} procedure on the shared
     * {@code conn} connection with a VARCHAR out parameter named
     * {@code masterKey_OUT}; the result is mapped by a {@code RouterMasterKeyHandler}.
     *
     * @param request request carrying the master key whose id is looked up.
     * @return a response wrapping the master key produced by the handler.
     * @throws YarnException propagated from validation or raised when the SQL call fails.
     * @throws IOException declared by the interface; not thrown directly here.
     */
    @Override
    public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) throws YarnException, IOException {
        FederationRouterRMTokenInputValidator.validate(request);
        int keyId = request.getRouterMasterKey().getKeyId();
        try {
            FederationQueryRunner queryRunner = new FederationQueryRunner();
            FederationSQLOutParameter<String> masterKeyParam =
                new FederationSQLOutParameter<>("masterKey_OUT", java.sql.Types.VARCHAR, String.class);
            long begin = clock.getTime();
            RouterMasterKey storedKey = queryRunner.execute(conn, CALL_SP_GET_MASTERKEY, new RouterMasterKeyHandler(), keyId, masterKeyParam);
            long elapsed = clock.getTime() - begin;
            LOG.info("Got the information about the specified masterKey = {} according to keyId = {}.", storedKey, keyId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);
            return RouterMasterKeyResponse.newInstance(storedKey);
        } catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to obtain the masterKey information according to %s.", String.valueOf(keyId));
            throw new YarnException("Unable to obtain the masterKey information according to " + keyId);
        }
    }

    /**
     * Stores a new delegation token and then reads it back.
     *
     * <p>Delegates the write to {@code addOrUpdateToken} with {@code isAdd = true},
     * records the returned duration as a successful store call, and finishes with
     * {@link #getTokenByRouterStoreToken(RouterRMTokenRequest)} for the same request.
     *
     * @param request request carrying the token to store.
     * @return the response of the follow-up lookup for the same request.
     * @throws YarnException propagated from validation or the helpers, or wrapping an
     *     {@link SQLException} raised by the write.
     * @throws IOException propagated from the helpers invoked here.
     */
    @Override
    public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) throws YarnException, IOException {
        FederationRouterRMTokenInputValidator.validate(request);
        final boolean insert = true;
        try {
            long elapsed = addOrUpdateToken(request, insert);
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);
        } catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            throw new YarnException(sqlFailure);
        }
        return getTokenByRouterStoreToken(request);
    }

    /**
     * Updates an already stored delegation token and then reads it back.
     *
     * <p>Delegates the write to {@code addOrUpdateToken} with {@code isAdd = false},
     * records the returned duration as a successful store call, and finishes with
     * {@link #getTokenByRouterStoreToken(RouterRMTokenRequest)} for the same request.
     *
     * @param request request carrying the token to update.
     * @return the response of the follow-up lookup for the same request.
     * @throws YarnException propagated from validation or the helpers, or wrapping an
     *     {@link SQLException} raised by the write.
     * @throws IOException propagated from the helpers invoked here.
     */
    @Override
    public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) throws YarnException, IOException {
        FederationRouterRMTokenInputValidator.validate(request);
        final boolean insert = false;
        try {
            long elapsed = addOrUpdateToken(request, insert);
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);
        } catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            throw new YarnException(sqlFailure);
        }
        return getTokenByRouterStoreToken(request);
    }

    /**
     * Writes a delegation token through either the add or the update procedure.
     *
     * <p>The token identifier is encoded to a string and passed, together with the
     * sequence number, token info and renew date, to
     * {@code CALL_SP_ADD_DELEGATIONTOKEN} (when {@code isAdd} is true) or
     * {@code CALL_SP_UPDATE_DELEGATIONTOKEN} (otherwise). The call must report exactly
     * one affected row; anything else, including a missing row count, is reported as
     * a store error whose message names the operation that was actually attempted.
     *
     * @param request request carrying the token to write.
     * @param isAdd {@code true} to insert a new token, {@code false} to update an
     *     existing one.
     * @return the time spent in the procedure call, as the difference of two
     *     {@code clock.getTime()} readings.
     * @throws IOException propagated from encoding the token identifier.
     * @throws SQLException if the procedure call fails.
     * @throws YarnException if the reported row count is not exactly 1.
     */
    private long addOrUpdateToken(RouterRMTokenRequest request, boolean isAdd) throws IOException, SQLException, YarnException {
        RouterStoreToken routerStoreToken = request.getRouterStoreToken();
        YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier();
        String tokenIdentifier = FederationStateStoreUtils.encodeWritable(identifier);
        String tokenInfo = routerStoreToken.getTokenInfo();
        long renewDate = routerStoreToken.getRenewDate();
        int sequenceNum = identifier.getSequenceNumber();
        FederationQueryRunner runner = new FederationQueryRunner();
        FederationSQLOutParameter<Integer> rowCountOUT =
            new FederationSQLOutParameter<>("rowCount_OUT", java.sql.Types.INTEGER, Integer.class);
        long startTime = this.clock.getTime();
        String procedure = isAdd ? CALL_SP_ADD_DELEGATIONTOKEN : CALL_SP_UPDATE_DELEGATIONTOKEN;
        Integer rowCount = runner.execute(this.conn, procedure, new RowCountHandler("rowCount_OUT"), sequenceNum, tokenIdentifier, tokenInfo, renewDate, rowCountOUT);
        long stopTime = this.clock.getTime();
        // The null check avoids a bare NullPointerException from unboxing when the
        // handler yields no row count; that case is reported as a store error instead.
        if (rowCount == null || rowCount != 1) {
            // Name the operation that really ran so an update failure is not logged
            // as an insertion failure; the add-path message is unchanged.
            String operation = isAdd ? "insertion" : "update";
            FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during the %s of delegationToken, tokenId = %s. Please check the records of the database.", operation, String.valueOf(sequenceNum));
        }
        return stopTime - startTime;
    }

    /**
     * Deletes the delegation token identified by the sequence number of the
     * token identifier in the request.
     *
     * <p>Calls the {@code CALL_SP_DELETE_DELEGATIONTOKEN} procedure and inspects the
     * reported row count: 0 is reported as "does not exist", any other value
     * different from 1 is reported as wrong behavior.
     *
     * @param request request carrying the token to delete.
     * @return a response wrapping the token taken from the request.
     * @throws YarnException propagated from validation, raised for an unexpected row
     *     count, or raised when the SQL call fails.
     * @throws IOException declared by the interface; not thrown directly here.
     */
    @Override
    public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request) throws YarnException, IOException {
        FederationRouterRMTokenInputValidator.validate(request);
        RouterStoreToken storeToken = request.getRouterStoreToken();
        int tokenId = storeToken.getTokenIdentifier().getSequenceNumber();
        try {
            FederationSQLOutParameter<Integer> rowCountParam =
                new FederationSQLOutParameter<>("rowCount_OUT", java.sql.Types.INTEGER, Integer.class);
            long begin = clock.getTime();
            int deleted = getRowCountByProcedureSQL(CALL_SP_DELETE_DELEGATIONTOKEN, tokenId, rowCountParam);
            long elapsed = clock.getTime() - begin;
            if (deleted != 1) {
                if (deleted == 0) {
                    // Nothing matched the token id.
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "TokenId %s does not exist", String.valueOf(tokenId));
                } else {
                    // More than the single expected record was affected.
                    FederationStateStoreUtils.logAndThrowStoreException(LOG, "Wrong behavior during deleting the delegationToken %s. The database is expected to delete 1 record, but the number of deleted records returned by the database is greater than 1, indicating that a duplicate tokenId occurred during the deletion process.", String.valueOf(tokenId));
                }
            }
            LOG.info("Delete from the StateStore the delegationToken, tokenId = {}.", tokenId);
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);
            return RouterRMTokenResponse.newInstance(storeToken);
        } catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to delete the delegationToken, tokenId = %s.", tokenId);
            throw new YarnException("Unable to delete the delegationToken, tokenId = " + tokenId);
        }
    }

    /**
     * Looks up the stored delegation token for the sequence number of the token
     * identifier in the request.
     *
     * <p>Executes the {@code CALL_SP_GET_DELEGATIONTOKEN} procedure on the shared
     * {@code conn} connection with three out parameters ({@code tokenIdent_OUT} and
     * {@code token_OUT} as VARCHAR, {@code renewDate_OUT} as BIGINT); the result is
     * mapped by a {@code RouterStoreTokenHandler}.
     *
     * @param request request carrying the token whose sequence number is looked up.
     * @return a response wrapping the token produced by the handler.
     * @throws YarnException propagated from validation or raised when the SQL call fails.
     * @throws IOException declared by the interface; not thrown directly here.
     */
    @Override
    public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) throws YarnException, IOException {
        FederationRouterRMTokenInputValidator.validate(request);
        int tokenId = request.getRouterStoreToken().getTokenIdentifier().getSequenceNumber();
        try {
            FederationQueryRunner queryRunner = new FederationQueryRunner();
            FederationSQLOutParameter<String> identifierParam =
                new FederationSQLOutParameter<>("tokenIdent_OUT", java.sql.Types.VARCHAR, String.class);
            FederationSQLOutParameter<String> tokenParam =
                new FederationSQLOutParameter<>("token_OUT", java.sql.Types.VARCHAR, String.class);
            FederationSQLOutParameter<Long> renewDateParam =
                new FederationSQLOutParameter<>("renewDate_OUT", java.sql.Types.BIGINT, Long.class);
            long begin = clock.getTime();
            RouterStoreToken storedToken = queryRunner.execute(conn, CALL_SP_GET_DELEGATIONTOKEN, new RouterStoreTokenHandler(), tokenId, identifierParam, tokenParam, renewDateParam);
            long elapsed = clock.getTime() - begin;
            FederationStateStoreClientMetrics.succeededStateStoreCall(elapsed);
            return RouterRMTokenResponse.newInstance(storedToken);
        } catch (SQLException sqlFailure) {
            FederationStateStoreClientMetrics.failedStateStoreCall();
            FederationStateStoreUtils.logAndThrowRetriableException(sqlFailure, LOG, "Unable to get the delegationToken, tokenId = %s.", String.valueOf(tokenId));
            throw new YarnException("Unable to get the delegationToken, tokenId = " + tokenId);
        }
    }

    /**
     * Executes a procedure on the shared {@code conn} connection and returns the
     * row count extracted by a {@code RowCountHandler} bound to {@code rowCount_OUT}.
     *
     * @param procedure the procedure call to execute.
     * @param params the arguments forwarded unchanged to the query runner.
     * @return the row count produced by the handler, unboxed to {@code int}.
     * @throws SQLException if the query runner fails to execute the call.
     */
    private int getRowCountByProcedureSQL(String procedure, Object ... params) throws SQLException {
        FederationQueryRunner queryRunner = new FederationQueryRunner();
        return queryRunner.execute(conn, procedure, new RowCountHandler("rowCount_OUT"), params);
    }

    /**
     * Queries the {@code YARN_ROUTER_SEQUENCE_NUM} sequence with the update flag set.
     * NOTE(review): presumably this advances the sequence and returns the new
     * value — confirm against {@code FederationQueryRunner}.
     *
     * @return the value reported by {@code querySequenceTable}.
     */
    @Override
    public int incrementDelegationTokenSeqNum() {
        final boolean update = true;
        return querySequenceTable("YARN_ROUTER_SEQUENCE_NUM", update);
    }

    /**
     * Queries the {@code YARN_ROUTER_SEQUENCE_NUM} sequence with the update flag
     * cleared.
     * NOTE(review): presumably a read-only lookup of the current value — confirm
     * against {@code FederationQueryRunner}.
     *
     * @return the value reported by {@code querySequenceTable}.
     */
    @Override
    public int getDelegationTokenSeqNum() {
        final boolean update = false;
        return querySequenceTable("YARN_ROUTER_SEQUENCE_NUM", update);
    }

    /**
     * Writes {@code seqNum} to the {@code YARN_ROUTER_SEQUENCE_NUM} sequence using a
     * connection obtained from {@code getConnection(false)}.
     *
     * <p>Any exception raised while obtaining the connection or updating the
     * sequence is rethrown wrapped in a {@link RuntimeException}. The connection is
     * always handed back through {@code returnToPool}; a failure there is only logged.
     *
     * @param seqNum the sequence number to store.
     */
    @Override
    public void setDelegationTokenSeqNum(int seqNum) {
        Connection pooled = null;
        try {
            pooled = getConnection(false);
            new FederationQueryRunner().updateSequenceTable(pooled, "YARN_ROUTER_SEQUENCE_NUM", seqNum);
        } catch (Exception cause) {
            throw new RuntimeException("Could not update sequence table!!", cause);
        } finally {
            try {
                // No statement to release here, only the connection.
                FederationStateStoreUtils.returnToPool(LOG, null, pooled);
            } catch (YarnException closeFailure) {
                LOG.error("close connection error.", closeFailure);
            }
        }
    }

    /**
     * Queries the {@code YARN_ROUTER_CURRENT_KEY_ID} sequence with the update flag
     * cleared.
     * NOTE(review): presumably a read-only lookup of the current value — confirm
     * against {@code FederationQueryRunner}.
     *
     * @return the value reported by {@code querySequenceTable}.
     */
    @Override
    public int getCurrentKeyId() {
        final boolean update = false;
        return querySequenceTable("YARN_ROUTER_CURRENT_KEY_ID", update);
    }

    /**
     * Queries the {@code YARN_ROUTER_CURRENT_KEY_ID} sequence with the update flag set.
     * NOTE(review): presumably this advances the sequence and returns the new
     * value — confirm against {@code FederationQueryRunner}.
     *
     * @return the value reported by {@code querySequenceTable}.
     */
    @Override
    public int incrementCurrentKeyId() {
        final boolean update = true;
        return querySequenceTable("YARN_ROUTER_CURRENT_KEY_ID", update);
    }

    /**
     * Runs {@code selectOrUpdateSequenceTable} for the named sequence on a
     * connection obtained from {@code getConnection(false)}.
     *
     * <p>Any exception raised while obtaining the connection or running the query
     * is rethrown wrapped in a {@link RuntimeException}. The connection is always
     * handed back through {@code returnToPool}; a failure there is only logged.
     *
     * @param sequenceName name of the sequence to query.
     * @param isUpdate flag forwarded unchanged to the query runner.
     * @return the value returned by the query runner.
     */
    private int querySequenceTable(String sequenceName, boolean isUpdate) {
        Connection pooled = null;
        try {
            pooled = getConnection(false);
            return new FederationQueryRunner().selectOrUpdateSequenceTable(pooled, sequenceName, isUpdate);
        } catch (Exception cause) {
            throw new RuntimeException("Could not query sequence table!!", cause);
        } finally {
            try {
                // No statement to release here, only the connection.
                FederationStateStoreUtils.returnToPool(LOG, null, pooled);
            } catch (YarnException closeFailure) {
                LOG.error("close connection error.", closeFailure);
            }
        }
    }

    /**
     * Exposes the {@link HikariDataSource} held in the {@code dataSource} field;
     * annotated as visible for testing only.
     *
     * @return the current value of the {@code dataSource} field.
     */
    @VisibleForTesting
    public HikariDataSource getDataSource() {
        return dataSource;
    }
}

