package org.apache.kafka.controller;

import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.DelegationTokenRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterControllerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.AclControlManager;
import org.apache.kafka.controller.ClientQuotaControlManager;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.DelegationTokenControlManager;
import org.apache.kafka.controller.EventPerformanceMonitor;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.LogReplayTracker;
import org.apache.kafka.controller.OffsetControlManager;
import org.apache.kafka.controller.PeriodicTaskControlManager;
import org.apache.kafka.controller.ProducerIdControlManager;
import org.apache.kafka.controller.ReplicationControlManager;
import org.apache.kafka.controller.ScramControlManager;
import org.apache.kafka.controller.errors.ControllerExceptions;
import org.apache.kafka.controller.errors.EventHandlerExceptionInfo;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.util.RecordRedactor;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/controller/QuorumController.class */
public final class QuorumController implements Controller {
    private static final int DEFAULT_MAX_RECORDS_PER_BATCH = 10000;
    private static final int DEFAULT_MIN_SLOW_EVENT_TIME_MS = 200;
    static final int MAX_RECORDS_PER_USER_OP = 10000;
    private final FaultHandler nonFatalFaultHandler;
    private final FaultHandler fatalFaultHandler;
    private final Logger log;
    private final int nodeId;
    private final String clusterId;
    private final KafkaEventQueue queue;
    private final Time time;
    private final QuorumControllerMetrics controllerMetrics;
    private final SnapshotRegistry snapshotRegistry;
    private final DeferredEventQueue deferredEventQueue;
    private final OffsetControlManager offsetControl;
    private final ConfigurationControlManager configurationControl;
    private final ClientQuotaControlManager clientQuotaControlManager;
    private final PeriodicTaskControlManager periodicControl;
    private final ClusterControlManager clusterControl;
    private final FeatureControlManager featureControl;
    private final ProducerIdControlManager producerIdControlManager;
    private final ReplicationControlManager replicationControl;
    private final ScramControlManager scramControlManager;
    private final DelegationTokenControlManager delegationTokenControlManager;
    private final AclControlManager aclControlManager;
    private final LogReplayTracker logReplayTracker;
    private final RaftClient<ApiMessageAndVersion> raftClient;
    private final BootstrapMetadata bootstrapMetadata;
    private final int maxRecordsPerBatch;
    private final RecordRedactor recordRedactor;
    private final EventPerformanceMonitor performanceMonitor;
    private final Consumer<ConfigResource> resourceExists = new ConfigResourceExistenceChecker();
    private final QuorumClusterFeatureSupportDescriber clusterSupportDescriber = new QuorumClusterFeatureSupportDescriber();
    private final PeriodicTaskControlManagerQueueAccessor queueAccessor = new PeriodicTaskControlManagerQueueAccessor();
    private final QuorumMetaLogListener metaLogListener = new QuorumMetaLogListener();
    private volatile int curClaimEpoch = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.controller.QuorumController$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$config$ConfigResource$Type;

        static {
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.REGISTER_BROKER_RECORD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.UNREGISTER_BROKER_RECORD.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.TOPIC_RECORD.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.PARTITION_RECORD.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.CONFIG_RECORD.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.PARTITION_CHANGE_RECORD.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.FENCE_BROKER_RECORD.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.UNFENCE_BROKER_RECORD.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.REMOVE_TOPIC_RECORD.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.FEATURE_LEVEL_RECORD.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.CLIENT_QUOTA_RECORD.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.PRODUCER_IDS_RECORD.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.BROKER_REGISTRATION_CHANGE_RECORD.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.ACCESS_CONTROL_ENTRY_RECORD.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.REMOVE_ACCESS_CONTROL_ENTRY_RECORD.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.USER_SCRAM_CREDENTIAL_RECORD.ordinal()] = 16;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.REMOVE_USER_SCRAM_CREDENTIAL_RECORD.ordinal()] = 17;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.DELEGATION_TOKEN_RECORD.ordinal()] = 18;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.REMOVE_DELEGATION_TOKEN_RECORD.ordinal()] = 19;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.NO_OP_RECORD.ordinal()] = 20;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.ZK_MIGRATION_STATE_RECORD.ordinal()] = 21;
            } catch (NoSuchFieldError e21) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.BEGIN_TRANSACTION_RECORD.ordinal()] = 22;
            } catch (NoSuchFieldError e22) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.END_TRANSACTION_RECORD.ordinal()] = 23;
            } catch (NoSuchFieldError e23) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.ABORT_TRANSACTION_RECORD.ordinal()] = 24;
            } catch (NoSuchFieldError e24) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.REGISTER_CONTROLLER_RECORD.ordinal()] = 25;
            } catch (NoSuchFieldError e25) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$metadata$MetadataRecordType[MetadataRecordType.CLEAR_ELR_RECORD.ordinal()] = 26;
            } catch (NoSuchFieldError e26) {
            }
            $SwitchMap$org$apache$kafka$common$config$ConfigResource$Type = new int[ConfigResource.Type.values().length];
            try {
                $SwitchMap$org$apache$kafka$common$config$ConfigResource$Type[ConfigResource.Type.BROKER_LOGGER.ordinal()] = 1;
            } catch (NoSuchFieldError e27) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$config$ConfigResource$Type[ConfigResource.Type.BROKER.ordinal()] = 2;
            } catch (NoSuchFieldError e28) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$config$ConfigResource$Type[ConfigResource.Type.TOPIC.ordinal()] = 3;
            } catch (NoSuchFieldError e29) {
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$Builder.class */
    public static class Builder {
        private final int nodeId;
        private final String clusterId;
        private DelegationTokenCache tokenCache;
        private String tokenSecretKeyString;
        private long delegationTokenMaxLifeMs;
        private long delegationTokenExpiryTimeMs;
        private FaultHandler nonFatalFaultHandler = null;
        private FaultHandler fatalFaultHandler = null;
        private Time time = Time.SYSTEM;
        private String threadNamePrefix = null;
        private LogContext logContext = null;
        private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY;
        private RaftClient<ApiMessageAndVersion> raftClient = null;
        private QuorumFeatures quorumFeatures = null;
        private short defaultReplicationFactor = 3;
        private int defaultNumPartitions = 1;
        private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
        private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
        private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
        private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
        private OptionalLong fenceStaleBrokerIntervalNs = OptionalLong.empty();
        private QuorumControllerMetrics controllerMetrics = null;
        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
        private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
        private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
        private Map<String, Object> staticConfig = Collections.emptyMap();
        private BootstrapMetadata bootstrapMetadata = null;
        private int maxRecordsPerBatch = 10000;
        private long controllerPerformanceSamplePeriodMs = 60000;
        private long controllerPerformanceAlwaysLogThresholdMs = 2000;
        private long delegationTokenExpiryCheckIntervalMs = TimeUnit.MINUTES.toMillis(5);
        private long uncleanLeaderElectionCheckIntervalMs = TimeUnit.MINUTES.toMillis(5);
        private String interBrokerListenerName = "PLAINTEXT";

        public Builder(int i, String str) {
            this.nodeId = i;
            this.clusterId = str;
        }

        public Builder setNonFatalFaultHandler(FaultHandler faultHandler) {
            this.nonFatalFaultHandler = faultHandler;
            return this;
        }

        public Builder setFatalFaultHandler(FaultHandler faultHandler) {
            this.fatalFaultHandler = faultHandler;
            return this;
        }

        public int nodeId() {
            return this.nodeId;
        }

        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder setThreadNamePrefix(String str) {
            this.threadNamePrefix = str;
            return this;
        }

        public Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        public Builder setConfigSchema(KafkaConfigSchema kafkaConfigSchema) {
            this.configSchema = kafkaConfigSchema;
            return this;
        }

        public Builder setRaftClient(RaftClient<ApiMessageAndVersion> raftClient) {
            this.raftClient = raftClient;
            return this;
        }

        public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
            this.quorumFeatures = quorumFeatures;
            return this;
        }

        public Builder setDefaultReplicationFactor(short s) {
            this.defaultReplicationFactor = s;
            return this;
        }

        public Builder setDefaultNumPartitions(int i) {
            this.defaultNumPartitions = i;
            return this;
        }

        public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
            this.replicaPlacer = replicaPlacer;
            return this;
        }

        public Builder setLeaderImbalanceCheckIntervalNs(OptionalLong optionalLong) {
            this.leaderImbalanceCheckIntervalNs = optionalLong;
            return this;
        }

        public Builder setMaxIdleIntervalNs(OptionalLong optionalLong) {
            this.maxIdleIntervalNs = optionalLong;
            return this;
        }

        public Builder setSessionTimeoutNs(long j) {
            this.sessionTimeoutNs = j;
            return this;
        }

        public Builder setFenceStaleBrokerIntervalNs(long j) {
            this.fenceStaleBrokerIntervalNs = OptionalLong.of(j);
            return this;
        }

        public Builder setMetrics(QuorumControllerMetrics quorumControllerMetrics) {
            this.controllerMetrics = quorumControllerMetrics;
            return this;
        }

        public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
            this.bootstrapMetadata = bootstrapMetadata;
            return this;
        }

        public Builder setMaxRecordsPerBatch(int i) {
            this.maxRecordsPerBatch = i;
            return this;
        }

        public Builder setControllerPerformanceSamplePeriodMs(long j) {
            this.controllerPerformanceSamplePeriodMs = j;
            return this;
        }

        public Builder setControllerPerformanceAlwaysLogThresholdMs(long j) {
            this.controllerPerformanceAlwaysLogThresholdMs = j;
            return this;
        }

        public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> optional) {
            this.createTopicPolicy = optional;
            return this;
        }

        public Builder setAlterConfigPolicy(Optional<AlterConfigPolicy> optional) {
            this.alterConfigPolicy = optional;
            return this;
        }

        public Builder setConfigurationValidator(ConfigurationValidator configurationValidator) {
            this.configurationValidator = configurationValidator;
            return this;
        }

        public Builder setStaticConfig(Map<String, Object> map) {
            this.staticConfig = map;
            return this;
        }

        public Builder setDelegationTokenCache(DelegationTokenCache delegationTokenCache) {
            this.tokenCache = delegationTokenCache;
            return this;
        }

        public Builder setDelegationTokenSecretKey(String str) {
            this.tokenSecretKeyString = str;
            return this;
        }

        public Builder setDelegationTokenMaxLifeMs(long j) {
            this.delegationTokenMaxLifeMs = j;
            return this;
        }

        public Builder setDelegationTokenExpiryTimeMs(long j) {
            this.delegationTokenExpiryTimeMs = j;
            return this;
        }

        public Builder setDelegationTokenExpiryCheckIntervalMs(long j) {
            this.delegationTokenExpiryCheckIntervalMs = j;
            return this;
        }

        public Builder setUncleanLeaderElectionCheckIntervalMs(long j) {
            this.uncleanLeaderElectionCheckIntervalMs = j;
            return this;
        }

        public Builder setInterBrokerListenerName(String str) {
            this.interBrokerListenerName = str;
            return this;
        }

        public QuorumController build() throws Exception {
            if (this.raftClient == null) {
                throw new IllegalStateException("You must set a raft client.");
            }
            if (this.bootstrapMetadata == null) {
                throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
            }
            if (this.quorumFeatures == null) {
                throw new IllegalStateException("You must specify the quorum features");
            }
            if (this.nonFatalFaultHandler == null) {
                throw new IllegalStateException("You must specify a non-fatal fault handler.");
            }
            if (this.fatalFaultHandler == null) {
                throw new IllegalStateException("You must specify a fatal fault handler.");
            }
            if (this.threadNamePrefix == null) {
                this.threadNamePrefix = String.format("quorum-controller-%d-", Integer.valueOf(this.nodeId));
            }
            if (this.logContext == null) {
                this.logContext = new LogContext(String.format("[QuorumController id=%d] ", Integer.valueOf(this.nodeId)));
            }
            if (this.controllerMetrics == null) {
                this.controllerMetrics = new QuorumControllerMetrics(Optional.empty(), this.time);
            }
            KafkaEventQueue kafkaEventQueue = null;
            try {
                kafkaEventQueue = new KafkaEventQueue(this.time, this.logContext, this.threadNamePrefix);
                return new QuorumController(this.nonFatalFaultHandler, this.fatalFaultHandler, this.logContext, this.nodeId, this.clusterId, kafkaEventQueue, this.time, this.configSchema, this.raftClient, this.quorumFeatures, this.defaultReplicationFactor, this.defaultNumPartitions, this.replicaPlacer, this.leaderImbalanceCheckIntervalNs, this.maxIdleIntervalNs, this.sessionTimeoutNs, this.fenceStaleBrokerIntervalNs, this.controllerMetrics, this.createTopicPolicy, this.alterConfigPolicy, this.configurationValidator, this.staticConfig, this.bootstrapMetadata, this.maxRecordsPerBatch, this.tokenCache, this.tokenSecretKeyString, this.delegationTokenMaxLifeMs, this.delegationTokenExpiryTimeMs, this.delegationTokenExpiryCheckIntervalMs, this.uncleanLeaderElectionCheckIntervalMs, this.interBrokerListenerName, this.controllerPerformanceSamplePeriodMs, this.controllerPerformanceAlwaysLogThresholdMs);
            } catch (Exception e) {
                Utils.closeQuietly(kafkaEventQueue, "event queue");
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$CompleteActivationEvent.class */
    public class CompleteActivationEvent implements ControllerWriteOperation<Void> {
        CompleteActivationEvent() {
        }

        @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
        public ControllerResult<Void> generateRecordsAndResult() {
            try {
                Logger logger = QuorumController.this.log;
                Objects.requireNonNull(logger);
                return ActivationRecordsGenerator.generate(logger::warn, QuorumController.this.logReplayTracker.empty(), QuorumController.this.offsetControl.transactionStartOffset(), QuorumController.this.bootstrapMetadata, QuorumController.this.featureControl.metadataVersion(), QuorumController.this.configurationControl.getStaticallyConfiguredMinInsyncReplicas());
            } catch (Throwable th) {
                throw QuorumController.this.fatalFaultHandler.handleFault("exception while completing controller activation", th);
            }
        }

        @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
        public void processBatchEndOffset(long j) {
            QuorumController.this.periodicControl.activate();
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ConfigResourceExistenceChecker.class */
    class ConfigResourceExistenceChecker implements Consumer<ConfigResource> {
        ConfigResourceExistenceChecker() {
        }

        @Override // java.util.function.Consumer
        public void accept(ConfigResource configResource) {
            switch (AnonymousClass2.$SwitchMap$org$apache$kafka$common$config$ConfigResource$Type[configResource.type().ordinal()]) {
                case 1:
                default:
                    return;
                case 2:
                    if (configResource.name().isEmpty()) {
                        return;
                    }
                    try {
                        int parseInt = Integer.parseInt(configResource.name());
                        if (!QuorumController.this.clusterControl.brokerRegistrations().containsKey(Integer.valueOf(parseInt)) && !QuorumController.this.featureControl.isControllerId(parseInt)) {
                            throw new BrokerIdNotRegisteredException("No node with id " + parseInt + " found.");
                        }
                        return;
                    } catch (NumberFormatException e) {
                        throw new InvalidRequestException("Invalid broker name " + configResource.name());
                    }
                case 3:
                    if (QuorumController.this.replicationControl.getTopicId(configResource.name()) == null) {
                        throw new UnknownTopicOrPartitionException("The topic '" + configResource.name() + "' does not exist.");
                    }
                    return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerEvent.class */
    public class ControllerEvent implements EventQueue.Event {
        private final String name;
        private final Runnable handler;
        private final long eventCreatedTimeNs;
        private OptionalLong startProcessingTimeNs = OptionalLong.empty();

        ControllerEvent(String str, Runnable runnable) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.name = str;
            this.handler = runnable;
        }

        public void run() throws Exception {
            this.startProcessingTimeNs = OptionalLong.of(QuorumController.this.updateEventStartMetricsAndGetTime(OptionalLong.of(this.eventCreatedTimeNs)));
            QuorumController.this.log.debug("Executing {}.", this);
            this.handler.run();
            QuorumController.this.handleEventEnd(toString(), this.startProcessingTimeNs.getAsLong());
        }

        public void handleException(Throwable th) {
            QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, th);
        }

        public String toString() {
            return this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerOperationFlag.class */
    public enum ControllerOperationFlag {
        DOES_NOT_UPDATE_QUEUE_TIME
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerReadEvent.class */
    public class ControllerReadEvent<T> implements EventQueue.Event {
        private final String name;
        private final Supplier<T> handler;
        private final long eventCreatedTimeNs;
        private OptionalLong startProcessingTimeNs = OptionalLong.empty();
        private final CompletableFuture<T> future = new CompletableFuture<>();

        ControllerReadEvent(String str, Supplier<T> supplier) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.name = str;
            this.handler = supplier;
        }

        CompletableFuture<T> future() {
            return this.future;
        }

        public void run() throws Exception {
            this.startProcessingTimeNs = OptionalLong.of(QuorumController.this.updateEventStartMetricsAndGetTime(OptionalLong.of(this.eventCreatedTimeNs)));
            T t = this.handler.get();
            QuorumController.this.handleEventEnd(toString(), this.startProcessingTimeNs.getAsLong());
            this.future.complete(t);
        }

        public void handleException(Throwable th) {
            this.future.completeExceptionally(QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, th));
        }

        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerWriteEvent.class */
    public class ControllerWriteEvent<T> implements EventQueue.Event, DeferredEvent {
        private final String name;
        private final ControllerWriteOperation<T> op;
        private final long eventCreatedTimeNs;
        private final EnumSet<ControllerOperationFlag> flags;
        private OptionalLong startProcessingTimeNs = OptionalLong.empty();
        private final CompletableFuture<T> future = new CompletableFuture<>();
        private ControllerResultAndOffset<T> resultAndOffset = null;

        ControllerWriteEvent(String str, ControllerWriteOperation<T> controllerWriteOperation, EnumSet<ControllerOperationFlag> enumSet) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.name = str;
            this.op = controllerWriteOperation;
            this.flags = enumSet;
        }

        CompletableFuture<T> future() {
            return this.future;
        }

        public void run() throws Exception {
            this.startProcessingTimeNs = OptionalLong.of(QuorumController.this.updateEventStartMetricsAndGetTime(this.flags.contains(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME) ? OptionalLong.empty() : OptionalLong.of(this.eventCreatedTimeNs)));
            int i = QuorumController.this.curClaimEpoch;
            if (!QuorumController.isActiveController(i)) {
                throw ControllerExceptions.newWrongControllerException(QuorumController.this.latestController());
            }
            ControllerResult<T> generateRecordsAndResult = this.op.generateRecordsAndResult();
            if (generateRecordsAndResult.records().isEmpty()) {
                this.op.processBatchEndOffset(QuorumController.this.offsetControl.nextWriteOffset() - 1);
                OptionalLong highestPendingOffset = QuorumController.this.deferredEventQueue.highestPendingOffset();
                if (highestPendingOffset.isEmpty()) {
                    this.resultAndOffset = ControllerResultAndOffset.of(-1L, generateRecordsAndResult);
                    QuorumController.this.log.debug("Completing read-only operation {} immediately because the purgatory is empty.", this);
                    complete(null);
                } else {
                    this.resultAndOffset = ControllerResultAndOffset.of(highestPendingOffset.getAsLong(), generateRecordsAndResult);
                    QuorumController.this.log.debug("Read-only operation {} will be completed when the log reaches offset {}", this, Long.valueOf(this.resultAndOffset.offset()));
                }
            } else {
                long appendRecords = QuorumController.appendRecords(QuorumController.this.log, generateRecordsAndResult, QuorumController.this.maxRecordsPerBatch, list -> {
                    int i2 = 0;
                    long prepareAppend = QuorumController.this.raftClient.prepareAppend(i, list);
                    long size = (prepareAppend - list.size()) + 1;
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        ApiMessageAndVersion apiMessageAndVersion = (ApiMessageAndVersion) it.next();
                        long j = size + i2;
                        try {
                            QuorumController.this.replay(apiMessageAndVersion.message(), Optional.empty(), j);
                            i2++;
                        } catch (Throwable th) {
                            throw QuorumController.this.fatalFaultHandler.handleFault(String.format("Unable to apply %s record at offset %d on active controller, from the batch with baseOffset %d", apiMessageAndVersion.message().getClass().getSimpleName(), Long.valueOf(j), Long.valueOf(size)), th);
                        }
                    }
                    QuorumController.this.raftClient.schedulePreparedAppend();
                    QuorumController.this.offsetControl.handleScheduleAppend(prepareAppend);
                    return Long.valueOf(prepareAppend);
                });
                this.op.processBatchEndOffset(appendRecords);
                this.resultAndOffset = ControllerResultAndOffset.of(appendRecords, generateRecordsAndResult);
                QuorumController.this.log.debug("Read-write operation {} will be completed when the log reaches offset {}.", this, Long.valueOf(this.resultAndOffset.offset()));
            }
            if (this.future.isDone()) {
                return;
            }
            QuorumController.this.deferredEventQueue.add(this.resultAndOffset.offset(), this);
        }

        public void handleException(Throwable th) {
            complete(th);
        }

        public void complete(Throwable th) {
            if (th != null) {
                this.future.completeExceptionally(QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, th));
            } else {
                QuorumController.this.handleEventEnd(toString(), this.startProcessingTimeNs.getAsLong());
                this.future.complete(this.resultAndOffset.response());
            }
        }

        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$ControllerWriteOperation.class */
    public interface ControllerWriteOperation<T> {
        ControllerResult<T> generateRecordsAndResult() throws Exception;

        default void processBatchEndOffset(long j) {
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$PeriodicTaskControlManagerQueueAccessor.class */
    class PeriodicTaskControlManagerQueueAccessor implements PeriodicTaskControlManager.QueueAccessor {
        PeriodicTaskControlManagerQueueAccessor() {
        }

        @Override // org.apache.kafka.controller.PeriodicTaskControlManager.QueueAccessor
        public void scheduleDeferred(String str, long j, Supplier<ControllerResult<Void>> supplier) {
            EnumSet of = EnumSet.of(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME);
            KafkaEventQueue kafkaEventQueue = QuorumController.this.queue;
            EventQueue.EarliestDeadlineFunction earliestDeadlineFunction = new EventQueue.EarliestDeadlineFunction(j);
            QuorumController quorumController = QuorumController.this;
            Objects.requireNonNull(supplier);
            kafkaEventQueue.scheduleDeferred(str, earliestDeadlineFunction, new ControllerWriteEvent(str, supplier::get, of));
        }

        @Override // org.apache.kafka.controller.PeriodicTaskControlManager.QueueAccessor
        public void cancelDeferred(String str) {
            QuorumController.this.queue.cancelDeferred(str);
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$QuorumClusterFeatureSupportDescriber.class */
    class QuorumClusterFeatureSupportDescriber implements ClusterFeatureSupportDescriber {
        QuorumClusterFeatureSupportDescriber() {
        }

        @Override // org.apache.kafka.controller.ClusterFeatureSupportDescriber
        public Iterator<Map.Entry<Integer, Map<String, VersionRange>>> brokerSupported() {
            return QuorumController.this.clusterControl.brokerSupportedFeatures();
        }

        @Override // org.apache.kafka.controller.ClusterFeatureSupportDescriber
        public Iterator<Map.Entry<Integer, Map<String, VersionRange>>> controllerSupported() {
            return QuorumController.this.clusterControl.controllerSupportedFeatures();
        }
    }

    /* loaded from: input_file:org/apache/kafka/controller/QuorumController$QuorumMetaLogListener.class */
    class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> {
        QuorumMetaLogListener() {
        }

        public void handleCommit(BatchReader<ApiMessageAndVersion> batchReader) {
            appendRaftEvent("handleCommit[baseOffset=" + batchReader.baseOffset() + "]", () -> {
                RuntimeException handleFault;
                try {
                    boolean isActiveController = QuorumController.this.isActiveController();
                    while (batchReader.hasNext()) {
                        Batch<ApiMessageAndVersion> batch = (Batch) batchReader.next();
                        long lastOffset = batch.lastOffset();
                        int epoch = batch.epoch();
                        List records = batch.records();
                        if (records.isEmpty()) {
                            QuorumController.this.log.debug("Skipping handling commit for batch with no data records with offset {} and epoch {}.", Long.valueOf(lastOffset), Integer.valueOf(epoch));
                            QuorumController.this.offsetControl.handleCommitBatchMetrics(batch);
                        } else if (isActiveController) {
                            QuorumController.this.log.debug("Completing purgatory items up to offset {} and epoch {}.", Long.valueOf(lastOffset), Integer.valueOf(epoch));
                            QuorumController.this.offsetControl.handleCommitBatch(batch);
                            QuorumController.this.deferredEventQueue.completeUpTo(QuorumController.this.offsetControl.lastStableOffset());
                        } else {
                            if (QuorumController.this.log.isDebugEnabled()) {
                                QuorumController.this.log.debug("Replaying commits from the active node up to offset {} and epoch {}.", Long.valueOf(lastOffset), Integer.valueOf(epoch));
                            }
                            int i = 0;
                            Iterator it = records.iterator();
                            while (it.hasNext()) {
                                try {
                                    QuorumController.this.replay(((ApiMessageAndVersion) it.next()).message(), Optional.empty(), batch.baseOffset() + i);
                                    i++;
                                } finally {
                                }
                            }
                            QuorumController.this.offsetControl.handleCommitBatch(batch);
                        }
                    }
                } finally {
                    batchReader.close();
                }
            });
        }

        public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> snapshotReader) {
            appendRaftEvent(String.format("handleLoadSnapshot[snapshotId=%s]", snapshotReader.snapshotId()), () -> {
                try {
                    try {
                        String filenameFromSnapshotId = Snapshots.filenameFromSnapshotId(snapshotReader.snapshotId());
                        if (QuorumController.this.isActiveController()) {
                            throw QuorumController.this.fatalFaultHandler.handleFault("Asked to load snapshot " + filenameFromSnapshotId + ", but we are the active controller at epoch " + QuorumController.this.curClaimEpoch);
                        }
                        QuorumController.this.offsetControl.beginLoadSnapshot(snapshotReader.snapshotId());
                        while (snapshotReader.hasNext()) {
                            Batch batch = (Batch) snapshotReader.next();
                            long lastOffset = batch.lastOffset();
                            List<ApiMessageAndVersion> records = batch.records();
                            QuorumController.this.log.debug("Replaying snapshot {} batch with last offset of {}", filenameFromSnapshotId, Long.valueOf(lastOffset));
                            int i = 1;
                            for (ApiMessageAndVersion apiMessageAndVersion : records) {
                                try {
                                    QuorumController.this.replay(apiMessageAndVersion.message(), Optional.of(snapshotReader.snapshotId()), snapshotReader.lastContainedLogOffset());
                                    i++;
                                } catch (Throwable th) {
                                    throw QuorumController.this.fatalFaultHandler.handleFault(String.format("Unable to apply %s record from snapshot %s on standby controller, which was %d of %d record(s) in the batch with baseOffset %d.", apiMessageAndVersion.message().getClass().getSimpleName(), snapshotReader.snapshotId(), Integer.valueOf(i), Integer.valueOf(records.size()), Long.valueOf(batch.baseOffset())), th);
                                }
                            }
                        }
                        QuorumController.this.offsetControl.endLoadSnapshot(snapshotReader.lastContainedLogTimestamp());
                        snapshotReader.close();
                    } catch (FaultHandlerException e) {
                        throw e;
                    } catch (Throwable th2) {
                        throw QuorumController.this.fatalFaultHandler.handleFault("Error while loading snapshot " + String.valueOf(snapshotReader.snapshotId()), th2);
                    }
                } catch (Throwable th3) {
                    snapshotReader.close();
                    throw th3;
                }
            });
        }

        public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            appendRaftEvent("handleLeaderChange[" + leaderAndEpoch.epoch() + "]", () -> {
                String valueOf = leaderAndEpoch.leaderId().isPresent() ? String.valueOf(leaderAndEpoch.leaderId().getAsInt()) : "(none)";
                if (leaderAndEpoch.leaderId().isPresent()) {
                    QuorumController.this.controllerMetrics.incrementNewActiveControllers();
                }
                if (QuorumController.this.isActiveController()) {
                    if (leaderAndEpoch.isLeader(QuorumController.this.nodeId)) {
                        QuorumController.this.log.warn("We were the leader in epoch {}, and are still the leader in the new epoch {}.", Integer.valueOf(QuorumController.this.curClaimEpoch), Integer.valueOf(leaderAndEpoch.epoch()));
                        QuorumController.this.curClaimEpoch = leaderAndEpoch.epoch();
                        return;
                    } else {
                        QuorumController.this.log.warn("Renouncing the leadership due to a metadata log event. We were the leader at epoch {}, but in the new epoch {}, the leader is {}. Reverting to last stable offset {}.", new Object[]{Integer.valueOf(QuorumController.this.curClaimEpoch), Integer.valueOf(leaderAndEpoch.epoch()), valueOf, Long.valueOf(QuorumController.this.offsetControl.lastStableOffset())});
                        QuorumController.this.renounce();
                        return;
                    }
                }
                if (!leaderAndEpoch.isLeader(QuorumController.this.nodeId)) {
                    QuorumController.this.log.info("In the new epoch {}, the leader is {}.", Integer.valueOf(leaderAndEpoch.epoch()), valueOf);
                    return;
                }
                long logEndOffset = QuorumController.this.raftClient.logEndOffset();
                QuorumController.this.log.info("Becoming the active controller at epoch {}, next write offset {}.", Integer.valueOf(leaderAndEpoch.epoch()), Long.valueOf(logEndOffset));
                QuorumController.this.claim(leaderAndEpoch.epoch(), logEndOffset);
            });
        }

        public void beginShutdown() {
            QuorumController.this.queue.beginShutdown("MetaLogManager.Listener");
        }

        private void appendRaftEvent(String str, Runnable runnable) {
            QuorumController.this.appendControlEvent(str, () -> {
                if (this != QuorumController.this.metaLogListener) {
                    QuorumController.this.log.debug("Ignoring {} raft event from an old registration", str);
                } else {
                    runnable.run();
                }
            });
        }
    }

    private OptionalInt latestController() {
        return this.raftClient.leaderAndEpoch().leaderId();
    }

    private void handleEventEnd(String str, long j) {
        long nanoseconds = this.time.nanoseconds() - j;
        this.log.debug("Processed {} in {} us", str, Long.valueOf(TimeUnit.MICROSECONDS.convert(nanoseconds, TimeUnit.NANOSECONDS)));
        this.performanceMonitor.observeEvent(str, nanoseconds);
        this.controllerMetrics.updateEventQueueProcessingTime(TimeUnit.NANOSECONDS.toMillis(nanoseconds));
    }

    private Throwable handleEventException(String str, OptionalLong optionalLong, Throwable th) {
        OptionalLong empty;
        if (optionalLong.isPresent()) {
            long nanoseconds = this.time.nanoseconds() - optionalLong.getAsLong();
            this.performanceMonitor.observeEvent(str, nanoseconds);
            this.controllerMetrics.updateEventQueueProcessingTime(TimeUnit.NANOSECONDS.toMillis(nanoseconds));
            empty = OptionalLong.of(TimeUnit.MICROSECONDS.convert(nanoseconds, TimeUnit.NANOSECONDS));
        } else {
            empty = OptionalLong.empty();
        }
        EventHandlerExceptionInfo fromInternal = EventHandlerExceptionInfo.fromInternal(th, this::latestController);
        int i = this.curClaimEpoch;
        if (i == -1) {
            i = this.offsetControl.lastCommittedEpoch();
        }
        String failureMessage = fromInternal.failureMessage(i, empty, isActiveController(), this.offsetControl.lastCommittedOffset());
        if (fromInternal.isTimeoutException() && empty.isEmpty()) {
            this.controllerMetrics.incrementOperationsTimedOut();
        }
        if (fromInternal.isFault()) {
            this.nonFatalFaultHandler.handleFault(str + ": " + failureMessage, th);
        } else {
            this.log.info("{}: {}", str, failureMessage);
        }
        if (fromInternal.causesFailover() && isActiveController()) {
            renounce();
        }
        return fromInternal.effectiveExternalException();
    }

    private long updateEventStartMetricsAndGetTime(OptionalLong optionalLong) {
        long nanoseconds = this.time.nanoseconds();
        this.controllerMetrics.incrementOperationsStarted();
        if (optionalLong.isPresent()) {
            this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(nanoseconds - optionalLong.getAsLong()));
        }
        return nanoseconds;
    }

    void appendControlEvent(String str, Runnable runnable) {
        this.queue.append(new ControllerEvent(str, runnable));
    }

    void appendControlEventWithDeadline(String str, Runnable runnable, long j) {
        this.queue.appendWithDeadline(j, new ControllerEvent(str, runnable));
    }

    OffsetControlManager offsetControl() {
        return this.offsetControl;
    }

    ReplicationControlManager replicationControl() {
        return this.replicationControl;
    }

    ClusterControlManager clusterControl() {
        return this.clusterControl;
    }

    FeatureControlManager featureControl() {
        return this.featureControl;
    }

    ConfigurationControlManager configurationControl() {
        return this.configurationControl;
    }

    <T> CompletableFuture<T> appendReadEvent(String str, OptionalLong optionalLong, Supplier<T> supplier) {
        ControllerReadEvent controllerReadEvent = new ControllerReadEvent(str, supplier);
        if (optionalLong.isPresent()) {
            this.queue.appendWithDeadline(optionalLong.getAsLong(), controllerReadEvent);
        } else {
            this.queue.append(controllerReadEvent);
        }
        return controllerReadEvent.future();
    }

    static long appendRecords(Logger logger, ControllerResult<?> controllerResult, int i, Function<List<ApiMessageAndVersion>, Long> function) {
        try {
            List<ApiMessageAndVersion> records = controllerResult.records();
            if (controllerResult.isAtomic()) {
                if (records.size() > i) {
                    throw new IllegalStateException("Attempted to atomically commit " + records.size() + " records, but maxRecordsPerBatch is " + i);
                }
                long longValue = function.apply(records).longValue();
                if (logger.isTraceEnabled()) {
                    logger.trace("Atomically appended {} record(s) ending with offset {}.", Integer.valueOf(records.size()), Long.valueOf(longValue));
                }
                return longValue;
            }
            int i2 = 0;
            int i3 = 0;
            while (true) {
                i3++;
                int i4 = i2 + i;
                if (i4 > records.size()) {
                    break;
                }
                function.apply(records.subList(i2, i4));
                i2 += i;
            }
            long longValue2 = function.apply(records.subList(i2, records.size())).longValue();
            if (logger.isTraceEnabled()) {
                logger.trace("Appended {} record(s) in {} batch(es), ending with offset {}.", new Object[]{Integer.valueOf(records.size()), Integer.valueOf(i3), Long.valueOf(longValue2)});
            }
            return longValue2;
        } catch (ApiException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    <T> CompletableFuture<T> appendWriteEvent(String str, OptionalLong optionalLong, ControllerWriteOperation<T> controllerWriteOperation) {
        return appendWriteEvent(str, optionalLong, controllerWriteOperation, EnumSet.noneOf(ControllerOperationFlag.class));
    }

    <T> CompletableFuture<T> appendWriteEvent(String str, OptionalLong optionalLong, ControllerWriteOperation<T> controllerWriteOperation, EnumSet<ControllerOperationFlag> enumSet) {
        ControllerWriteEvent controllerWriteEvent = new ControllerWriteEvent(str, controllerWriteOperation, enumSet);
        if (optionalLong.isPresent()) {
            this.queue.appendWithDeadline(optionalLong.getAsLong(), controllerWriteEvent);
        } else {
            this.queue.append(controllerWriteEvent);
        }
        return controllerWriteEvent.future();
    }

    private boolean isActiveController() {
        return isActiveController(this.curClaimEpoch);
    }

    private static boolean isActiveController(int i) {
        return i != -1;
    }

    private void claim(int i, long j) {
        try {
            if (this.curClaimEpoch != -1) {
                throw new RuntimeException("Cannot claim leadership because we are already the active controller.");
            }
            this.curClaimEpoch = i;
            this.offsetControl.activate(j);
            this.clusterControl.activate();
            this.queue.prepend(new ControllerWriteEvent("completeActivation[" + i + "]", new CompleteActivationEvent(), EnumSet.of(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME)));
        } catch (Throwable th) {
            this.fatalFaultHandler.handleFault("exception while claiming leadership", th);
        }
    }

    void renounce() {
        try {
            if (this.curClaimEpoch == -1) {
                throw new RuntimeException("Cannot renounce leadership because we are not the current leader.");
            }
            this.raftClient.resign(this.curClaimEpoch);
            this.curClaimEpoch = -1;
            this.deferredEventQueue.failAll(ControllerExceptions.newWrongControllerException(OptionalInt.empty()));
            this.offsetControl.deactivate();
            this.clusterControl.deactivate();
            this.periodicControl.deactivate();
        } catch (Throwable th) {
            this.fatalFaultHandler.handleFault("exception while renouncing leadership", th);
        }
    }

    private void replay(ApiMessage apiMessage, Optional<OffsetAndEpoch> optional, long j) {
        if (this.log.isTraceEnabled()) {
            if (optional.isPresent()) {
                this.log.trace("Replaying snapshot {} record {}", Snapshots.filenameFromSnapshotId(optional.get()), this.recordRedactor.toLoggableString(apiMessage));
            } else {
                this.log.trace("Replaying log record {} with offset {}", this.recordRedactor.toLoggableString(apiMessage), Long.valueOf(j));
            }
        }
        this.logReplayTracker.replay(apiMessage);
        MetadataRecordType fromId = MetadataRecordType.fromId(apiMessage.apiKey());
        switch (fromId) {
            case REGISTER_BROKER_RECORD:
                this.clusterControl.replay((RegisterBrokerRecord) apiMessage, j);
                return;
            case UNREGISTER_BROKER_RECORD:
                this.clusterControl.replay((UnregisterBrokerRecord) apiMessage);
                return;
            case TOPIC_RECORD:
                this.replicationControl.replay((TopicRecord) apiMessage);
                return;
            case PARTITION_RECORD:
                this.replicationControl.replay((PartitionRecord) apiMessage);
                return;
            case CONFIG_RECORD:
                this.configurationControl.replay((ConfigRecord) apiMessage);
                return;
            case PARTITION_CHANGE_RECORD:
                this.replicationControl.replay((PartitionChangeRecord) apiMessage);
                return;
            case FENCE_BROKER_RECORD:
                this.clusterControl.replay((FenceBrokerRecord) apiMessage);
                return;
            case UNFENCE_BROKER_RECORD:
                this.clusterControl.replay((UnfenceBrokerRecord) apiMessage);
                return;
            case REMOVE_TOPIC_RECORD:
                this.replicationControl.replay((RemoveTopicRecord) apiMessage);
                return;
            case FEATURE_LEVEL_RECORD:
                this.featureControl.replay((FeatureLevelRecord) apiMessage);
                return;
            case CLIENT_QUOTA_RECORD:
                this.clientQuotaControlManager.replay((ClientQuotaRecord) apiMessage);
                return;
            case PRODUCER_IDS_RECORD:
                this.producerIdControlManager.replay((ProducerIdsRecord) apiMessage);
                return;
            case BROKER_REGISTRATION_CHANGE_RECORD:
                this.clusterControl.replay((BrokerRegistrationChangeRecord) apiMessage);
                return;
            case ACCESS_CONTROL_ENTRY_RECORD:
                this.aclControlManager.replay((AccessControlEntryRecord) apiMessage);
                return;
            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
                this.aclControlManager.replay((RemoveAccessControlEntryRecord) apiMessage);
                return;
            case USER_SCRAM_CREDENTIAL_RECORD:
                this.scramControlManager.replay((UserScramCredentialRecord) apiMessage);
                return;
            case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
                this.scramControlManager.replay((RemoveUserScramCredentialRecord) apiMessage);
                return;
            case DELEGATION_TOKEN_RECORD:
                this.delegationTokenControlManager.replay((DelegationTokenRecord) apiMessage);
                return;
            case REMOVE_DELEGATION_TOKEN_RECORD:
                this.delegationTokenControlManager.replay((RemoveDelegationTokenRecord) apiMessage);
                return;
            case NO_OP_RECORD:
            case ZK_MIGRATION_STATE_RECORD:
                return;
            case BEGIN_TRANSACTION_RECORD:
                this.offsetControl.replay((BeginTransactionRecord) apiMessage, j);
                return;
            case END_TRANSACTION_RECORD:
                this.offsetControl.replay((EndTransactionRecord) apiMessage, j);
                return;
            case ABORT_TRANSACTION_RECORD:
                this.offsetControl.replay((AbortTransactionRecord) apiMessage, j);
                return;
            case REGISTER_CONTROLLER_RECORD:
                this.clusterControl.replay((RegisterControllerRecord) apiMessage);
                return;
            case CLEAR_ELR_RECORD:
                this.replicationControl.replay((ClearElrRecord) apiMessage);
                return;
            default:
                throw new RuntimeException("Unhandled record type " + String.valueOf(fromId));
        }
    }

    private QuorumController(FaultHandler faultHandler, FaultHandler faultHandler2, LogContext logContext, int i, String str, KafkaEventQueue kafkaEventQueue, Time time, KafkaConfigSchema kafkaConfigSchema, RaftClient<ApiMessageAndVersion> raftClient, QuorumFeatures quorumFeatures, short s, int i2, ReplicaPlacer replicaPlacer, OptionalLong optionalLong, OptionalLong optionalLong2, long j, OptionalLong optionalLong3, QuorumControllerMetrics quorumControllerMetrics, Optional<CreateTopicPolicy> optional, Optional<AlterConfigPolicy> optional2, ConfigurationValidator configurationValidator, Map<String, Object> map, BootstrapMetadata bootstrapMetadata, int i3, DelegationTokenCache delegationTokenCache, String str2, long j2, long j3, long j4, long j5, String str3, long j6, long j7) {
        this.nonFatalFaultHandler = faultHandler;
        this.fatalFaultHandler = faultHandler2;
        this.log = logContext.logger(QuorumController.class);
        this.nodeId = i;
        this.clusterId = str;
        this.queue = kafkaEventQueue;
        this.time = time;
        this.controllerMetrics = quorumControllerMetrics;
        this.snapshotRegistry = new SnapshotRegistry(logContext);
        this.deferredEventQueue = new DeferredEventQueue(logContext);
        this.clientQuotaControlManager = new ClientQuotaControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).build();
        this.periodicControl = new PeriodicTaskControlManager.Builder().setLogContext(logContext).setTime(time).setQueueAccessor(this.queueAccessor).build();
        this.featureControl = new FeatureControlManager.Builder().setLogContext(logContext).setQuorumFeatures(quorumFeatures).setSnapshotRegistry(this.snapshotRegistry).setClusterFeatureSupportDescriber(this.clusterSupportDescriber).build();
        this.clusterControl = new ClusterControlManager.Builder().setLogContext(logContext).setClusterId(str).setTime(time).setSnapshotRegistry(this.snapshotRegistry).setSessionTimeoutNs(j).setReplicaPlacer(replicaPlacer).setFeatureControlManager(this.featureControl).setBrokerShutdownHandler(this::handleBrokerShutdown).setInterBrokerListenerName(str3).build();
        this.configurationControl = new ConfigurationControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).setKafkaConfigSchema(kafkaConfigSchema).setExistenceChecker(this.resourceExists).setAlterConfigPolicy(optional2).setValidator(configurationValidator).setStaticConfig(map).setNodeId(i).setFeatureControl(this.featureControl).build();
        this.producerIdControlManager = new ProducerIdControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).setClusterControlManager(this.clusterControl).build();
        this.replicationControl = new ReplicationControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setLogContext(logContext).setDefaultReplicationFactor(s).setDefaultNumPartitions(i2).setMaxElectionsPerImbalance(1000).setConfigurationControl(this.configurationControl).setClusterControl(this.clusterControl).setCreateTopicPolicy(optional).setFeatureControl(this.featureControl).build();
        this.scramControlManager = new ScramControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).build();
        this.delegationTokenControlManager = new DelegationTokenControlManager.Builder().setLogContext(logContext).setTokenCache(delegationTokenCache).setDelegationTokenSecretKey(str2).setDelegationTokenMaxLifeMs(j2).setDelegationTokenExpiryTimeMs(j3).build();
        this.aclControlManager = new AclControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).build();
        this.logReplayTracker = new LogReplayTracker.Builder().setLogContext(logContext).build();
        this.raftClient = raftClient;
        this.bootstrapMetadata = bootstrapMetadata;
        this.maxRecordsPerBatch = i3;
        this.recordRedactor = new RecordRedactor(kafkaConfigSchema);
        this.performanceMonitor = new EventPerformanceMonitor.Builder().setLogContext(logContext).setPeriodNs(TimeUnit.MILLISECONDS.toNanos(j6)).setAlwaysLogThresholdNs(TimeUnit.MILLISECONDS.toNanos(j7)).build();
        if (optionalLong2.isPresent()) {
            registerWriteNoOpRecord(optionalLong2.getAsLong());
        }
        if (optionalLong3.isPresent()) {
            registerMaybeFenceStaleBroker(optionalLong3.getAsLong());
        } else {
            registerMaybeFenceStaleBroker(maybeFenceStaleBrokerPeriodNs(j));
        }
        if (optionalLong.isPresent()) {
            registerElectPreferred(optionalLong.getAsLong());
        }
        registerElectUnclean(TimeUnit.MILLISECONDS.toNanos(j5));
        registerExpireDelegationTokens(TimeUnit.MILLISECONDS.toNanos(j4));
        registerGeneratePeriodicPerformanceMessage();
        this.offsetControl = new OffsetControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).setMetrics(quorumControllerMetrics).setTime(time).build();
        this.log.info("Creating new QuorumController with clusterId {}", str);
        this.raftClient.register(this.metaLogListener);
    }

    private void registerWriteNoOpRecord(long j) {
        this.periodicControl.registerTask(new PeriodicTask("writeNoOpRecord", () -> {
            return ControllerResult.of(List.of(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)), false);
        }, j, EnumSet.noneOf(PeriodicTaskFlag.class)));
    }

    static long maybeFenceStaleBrokerPeriodNs(long j) {
        return Math.max(TimeUnit.MILLISECONDS.toNanos(1L), j / 8);
    }

    private void registerMaybeFenceStaleBroker(long j) {
        PeriodicTaskControlManager periodicTaskControlManager = this.periodicControl;
        ReplicationControlManager replicationControlManager = this.replicationControl;
        Objects.requireNonNull(replicationControlManager);
        periodicTaskControlManager.registerTask(new PeriodicTask("maybeFenceStaleBroker", replicationControlManager::maybeFenceOneStaleBroker, j, EnumSet.noneOf(PeriodicTaskFlag.class)));
    }

    private void registerElectPreferred(long j) {
        PeriodicTaskControlManager periodicTaskControlManager = this.periodicControl;
        ReplicationControlManager replicationControlManager = this.replicationControl;
        Objects.requireNonNull(replicationControlManager);
        periodicTaskControlManager.registerTask(new PeriodicTask("electPreferred", replicationControlManager::maybeBalancePartitionLeaders, j, EnumSet.of(PeriodicTaskFlag.VERBOSE)));
    }

    private void registerElectUnclean(long j) {
        PeriodicTaskControlManager periodicTaskControlManager = this.periodicControl;
        ReplicationControlManager replicationControlManager = this.replicationControl;
        Objects.requireNonNull(replicationControlManager);
        periodicTaskControlManager.registerTask(new PeriodicTask("electUnclean", replicationControlManager::maybeElectUncleanLeaders, j, EnumSet.of(PeriodicTaskFlag.VERBOSE)));
    }

    private void registerGeneratePeriodicPerformanceMessage() {
        this.periodicControl.registerTask(new PeriodicTask("generatePeriodicPerformanceMessage", () -> {
            this.performanceMonitor.generatePeriodicPerformanceMessage();
            return ControllerResult.of(Collections.emptyList(), false);
        }, this.performanceMonitor.periodNs(), EnumSet.noneOf(PeriodicTaskFlag.class)));
    }

    private void registerExpireDelegationTokens(long j) {
        PeriodicTaskControlManager periodicTaskControlManager = this.periodicControl;
        DelegationTokenControlManager delegationTokenControlManager = this.delegationTokenControlManager;
        Objects.requireNonNull(delegationTokenControlManager);
        periodicTaskControlManager.registerTask(new PeriodicTask("expireDelegationTokens", delegationTokenControlManager::sweepExpiredDelegationTokens, j, EnumSet.of(PeriodicTaskFlag.VERBOSE)));
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterPartitionResponseData> alterPartition(ControllerRequestContext controllerRequestContext, AlterPartitionRequestData alterPartitionRequestData) {
        return alterPartitionRequestData.topics().isEmpty() ? CompletableFuture.completedFuture(new AlterPartitionResponseData()) : appendWriteEvent("alterPartition", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.alterPartition(controllerRequestContext, alterPartitionRequestData);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterUserScramCredentialsResponseData> alterUserScramCredentials(ControllerRequestContext controllerRequestContext, AlterUserScramCredentialsRequestData alterUserScramCredentialsRequestData) {
        return (alterUserScramCredentialsRequestData.deletions().isEmpty() && alterUserScramCredentialsRequestData.upsertions().isEmpty()) ? CompletableFuture.completedFuture(new AlterUserScramCredentialsResponseData()) : appendWriteEvent("alterUserScramCredentials", controllerRequestContext.deadlineNs(), () -> {
            return this.scramControlManager.alterCredentials(alterUserScramCredentialsRequestData, this.featureControl.metadataVersion());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<CreateDelegationTokenResponseData> createDelegationToken(ControllerRequestContext controllerRequestContext, CreateDelegationTokenRequestData createDelegationTokenRequestData) {
        return appendWriteEvent("createDelegationToken", controllerRequestContext.deadlineNs(), () -> {
            return this.delegationTokenControlManager.createDelegationToken(controllerRequestContext, createDelegationTokenRequestData, this.featureControl.metadataVersion());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<RenewDelegationTokenResponseData> renewDelegationToken(ControllerRequestContext controllerRequestContext, RenewDelegationTokenRequestData renewDelegationTokenRequestData) {
        return appendWriteEvent("renewDelegationToken", controllerRequestContext.deadlineNs(), () -> {
            return this.delegationTokenControlManager.renewDelegationToken(controllerRequestContext, renewDelegationTokenRequestData, this.featureControl.metadataVersion());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<ExpireDelegationTokenResponseData> expireDelegationToken(ControllerRequestContext controllerRequestContext, ExpireDelegationTokenRequestData expireDelegationTokenRequestData) {
        return appendWriteEvent("expireDelegationToken", controllerRequestContext.deadlineNs(), () -> {
            return this.delegationTokenControlManager.expireDelegationToken(controllerRequestContext, expireDelegationTokenRequestData, this.featureControl.metadataVersion());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<CreateTopicsResponseData> createTopics(ControllerRequestContext controllerRequestContext, CreateTopicsRequestData createTopicsRequestData, Set<String> set) {
        return createTopicsRequestData.topics().isEmpty() ? CompletableFuture.completedFuture(new CreateTopicsResponseData()) : appendWriteEvent("createTopics", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.createTopics(controllerRequestContext, createTopicsRequestData, set);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Void> unregisterBroker(ControllerRequestContext controllerRequestContext, int i) {
        return appendWriteEvent("unregisterBroker", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.unregisterBroker(i);
        }, EnumSet.noneOf(ControllerOperationFlag.class));
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(ControllerRequestContext controllerRequestContext, Collection<String> collection) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendReadEvent("findTopicIds", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.findTopicIds(this.offsetControl.lastStableOffset(), collection);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<String, Uuid>> findAllTopicIds(ControllerRequestContext controllerRequestContext) {
        return appendReadEvent("findAllTopicIds", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.findAllTopicIds(this.offsetControl.lastStableOffset());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(ControllerRequestContext controllerRequestContext, Collection<Uuid> collection) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendReadEvent("findTopicNames", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.findTopicNames(this.offsetControl.lastStableOffset(), collection);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(ControllerRequestContext controllerRequestContext, Collection<Uuid> collection) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendWriteEvent("deleteTopics", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.deleteTopics(controllerRequestContext, collection);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(ControllerRequestContext controllerRequestContext, Map<ConfigResource, Collection<String>> map) {
        return appendReadEvent("describeConfigs", controllerRequestContext.deadlineNs(), () -> {
            return this.configurationControl.describeConfigs(this.offsetControl.lastStableOffset(), map);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<ElectLeadersResponseData> electLeaders(ControllerRequestContext controllerRequestContext, ElectLeadersRequestData electLeadersRequestData) {
        return (electLeadersRequestData.topicPartitions() == null || !electLeadersRequestData.topicPartitions().isEmpty()) ? appendWriteEvent("electLeaders", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.electLeaders(electLeadersRequestData);
        }) : CompletableFuture.completedFuture(new ElectLeadersResponseData());
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(ControllerRequestContext controllerRequestContext) {
        return appendReadEvent("getFinalizedFeatures", controllerRequestContext.deadlineNs(), () -> {
            return this.featureControl.finalizedFeatures(this.offsetControl.lastStableOffset());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(ControllerRequestContext controllerRequestContext, Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> map, boolean z) {
        return map.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendWriteEvent("incrementalAlterConfigs", controllerRequestContext.deadlineNs(), () -> {
            ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs = this.configurationControl.incrementalAlterConfigs(map, false);
            return z ? incrementalAlterConfigs.withoutRecords() : incrementalAlterConfigs;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(ControllerRequestContext controllerRequestContext, AlterPartitionReassignmentsRequestData alterPartitionReassignmentsRequestData) {
        return alterPartitionReassignmentsRequestData.topics().isEmpty() ? CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData()) : appendWriteEvent("alterPartitionReassignments", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.alterPartitionReassignments(alterPartitionReassignmentsRequestData);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<ListPartitionReassignmentsResponseData> listPartitionReassignments(ControllerRequestContext controllerRequestContext, ListPartitionReassignmentsRequestData listPartitionReassignmentsRequestData) {
        return (listPartitionReassignmentsRequestData.topics() == null || !listPartitionReassignmentsRequestData.topics().isEmpty()) ? appendReadEvent("listPartitionReassignments", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.listPartitionReassignments(listPartitionReassignmentsRequestData.topics(), this.offsetControl.lastStableOffset());
        }) : CompletableFuture.completedFuture(new ListPartitionReassignmentsResponseData().setErrorMessage((String) null));
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(ControllerRequestContext controllerRequestContext, Map<ConfigResource, Map<String, String>> map, boolean z) {
        return map.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendWriteEvent("legacyAlterConfigs", controllerRequestContext.deadlineNs(), () -> {
            ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs = this.configurationControl.legacyAlterConfigs(map, false);
            return z ? legacyAlterConfigs.withoutRecords() : legacyAlterConfigs;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(ControllerRequestContext controllerRequestContext, final BrokerHeartbeatRequestData brokerHeartbeatRequestData) {
        if (this.clusterControl.trackBrokerHeartbeat(brokerHeartbeatRequestData.brokerId(), brokerHeartbeatRequestData.brokerEpoch())) {
            return appendWriteEvent("processBrokerHeartbeat", controllerRequestContext.deadlineNs(), new ControllerWriteOperation<BrokerHeartbeatReply>() { // from class: org.apache.kafka.controller.QuorumController.1
                private final int brokerId;
                private boolean inControlledShutdown = false;

                {
                    this.brokerId = brokerHeartbeatRequestData.brokerId();
                }

                @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
                public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
                    OptionalLong registerBrokerRecordOffset = QuorumController.this.clusterControl.registerBrokerRecordOffset(this.brokerId);
                    if (registerBrokerRecordOffset.isEmpty()) {
                        throw new StaleBrokerEpochException(String.format("Receive a heartbeat from broker %d before registration", Integer.valueOf(this.brokerId)));
                    }
                    ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat = QuorumController.this.replicationControl.processBrokerHeartbeat(brokerHeartbeatRequestData, registerBrokerRecordOffset.getAsLong());
                    this.inControlledShutdown = processBrokerHeartbeat.response().inControlledShutdown();
                    return processBrokerHeartbeat;
                }

                @Override // org.apache.kafka.controller.QuorumController.ControllerWriteOperation
                public void processBatchEndOffset(long j) {
                    if (this.inControlledShutdown) {
                        QuorumController.this.clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(this.brokerId, j);
                    }
                }
            }, EnumSet.noneOf(ControllerOperationFlag.class)).whenComplete((brokerHeartbeatReply, th) -> {
                if (ControllerExceptions.isTimeoutException(th)) {
                    this.replicationControl.processExpiredBrokerHeartbeat(brokerHeartbeatRequestData);
                    this.controllerMetrics.incrementTimedOutHeartbeats();
                }
            });
        }
        throw ControllerExceptions.newWrongControllerException(latestController());
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<BrokerRegistrationReply> registerBroker(ControllerRequestContext controllerRequestContext, BrokerRegistrationRequestData brokerRegistrationRequestData) {
        return appendWriteEvent("registerBroker", controllerRequestContext.deadlineNs(), () -> {
            HashMap hashMap = new HashMap(this.featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
            hashMap.put("kraft.version", Short.valueOf(this.raftClient.kraftVersion().featureLevel()));
            return this.clusterControl.registerBroker(brokerRegistrationRequestData, this.offsetControl.nextWriteOffset(), new FinalizedControllerFeatures(hashMap, Long.MAX_VALUE), controllerRequestContext.requestHeader().requestApiVersion() >= 3);
        }, EnumSet.noneOf(ControllerOperationFlag.class));
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(ControllerRequestContext controllerRequestContext, Collection<ClientQuotaAlteration> collection, boolean z) {
        return collection.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyMap()) : appendWriteEvent("alterClientQuotas", controllerRequestContext.deadlineNs(), () -> {
            ControllerResult<Map<ClientQuotaEntity, ApiError>> alterClientQuotas = this.clientQuotaControlManager.alterClientQuotas(collection);
            return z ? alterClientQuotas.withoutRecords() : alterClientQuotas;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(ControllerRequestContext controllerRequestContext, AllocateProducerIdsRequestData allocateProducerIdsRequestData) {
        return appendWriteEvent("allocateProducerIds", controllerRequestContext.deadlineNs(), () -> {
            return this.producerIdControlManager.generateNextProducerId(allocateProducerIdsRequestData.brokerId(), allocateProducerIdsRequestData.brokerEpoch());
        }).thenApply(producerIdsBlock -> {
            return new AllocateProducerIdsResponseData().setProducerIdStart(producerIdsBlock.firstProducerId()).setProducerIdLen(producerIdsBlock.size());
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(ControllerRequestContext controllerRequestContext, UpdateFeaturesRequestData updateFeaturesRequestData) {
        return appendWriteEvent("updateFeatures", controllerRequestContext.deadlineNs(), () -> {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            updateFeaturesRequestData.featureUpdates().forEach(featureUpdateKey -> {
                String feature = featureUpdateKey.feature();
                hashMap2.put(feature, FeatureUpdate.UpgradeType.fromCode(featureUpdateKey.upgradeType()));
                hashMap.put(feature, Short.valueOf(featureUpdateKey.maxVersionLevel()));
            });
            return this.configurationControl.updateFeatures(hashMap, hashMap2, updateFeaturesRequestData.validateOnly());
        }).thenApply(apiError -> {
            UpdateFeaturesResponseData updateFeaturesResponseData = new UpdateFeaturesResponseData();
            if (apiError != ApiError.NONE) {
                updateFeaturesResponseData.setErrorCode(apiError.error().code());
                updateFeaturesResponseData.setErrorMessage("The update failed for all features since the following feature had an error: " + apiError.message());
            } else {
                updateFeaturesResponseData.setErrorCode(apiError.error().code());
                updateFeaturesResponseData.setErrorMessage(apiError.message());
                if (controllerRequestContext.requestHeader().requestApiVersion() <= 1) {
                    updateFeaturesResponseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(updateFeaturesRequestData.featureUpdates().size()));
                    updateFeaturesRequestData.featureUpdates().forEach(featureUpdateKey -> {
                        updateFeaturesResponseData.results().add(new UpdateFeaturesResponseData.UpdatableFeatureResult().setFeature(featureUpdateKey.feature()).setErrorCode(apiError.error().code()).setErrorMessage(apiError.error().message()));
                    });
                }
            }
            return updateFeaturesResponseData;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitions(ControllerRequestContext controllerRequestContext, List<CreatePartitionsRequestData.CreatePartitionsTopic> list, boolean z) {
        return list.isEmpty() ? CompletableFuture.completedFuture(Collections.emptyList()) : appendWriteEvent("createPartitions", controllerRequestContext.deadlineNs(), () -> {
            ControllerResult<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitions = this.replicationControl.createPartitions(controllerRequestContext, list);
            if (z) {
                this.log.debug("Validate-only CreatePartitions result(s): {}", createPartitions.response());
                return createPartitions.withoutRecords();
            }
            this.log.debug("CreatePartitions result(s): {}", createPartitions.response());
            return createPartitions;
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Void> registerController(ControllerRequestContext controllerRequestContext, ControllerRegistrationRequestData controllerRegistrationRequestData) {
        return appendWriteEvent("registerController", controllerRequestContext.deadlineNs(), () -> {
            return this.clusterControl.registerController(controllerRegistrationRequestData);
        }, EnumSet.noneOf(ControllerOperationFlag.class));
    }

    @Override // org.apache.kafka.metadata.authorizer.AclMutator
    public CompletableFuture<List<AclCreateResult>> createAcls(ControllerRequestContext controllerRequestContext, List<AclBinding> list) {
        return appendWriteEvent("createAcls", controllerRequestContext.deadlineNs(), () -> {
            return this.aclControlManager.createAcls(list);
        });
    }

    @Override // org.apache.kafka.metadata.authorizer.AclMutator
    public CompletableFuture<List<AclDeleteResult>> deleteAcls(ControllerRequestContext controllerRequestContext, List<AclBindingFilter> list) {
        return appendWriteEvent("deleteAcls", controllerRequestContext.deadlineNs(), () -> {
            return this.aclControlManager.deleteAcls(list);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<AssignReplicasToDirsResponseData> assignReplicasToDirs(ControllerRequestContext controllerRequestContext, AssignReplicasToDirsRequestData assignReplicasToDirsRequestData) {
        return appendWriteEvent("assignReplicasToDirs", controllerRequestContext.deadlineNs(), () -> {
            return this.replicationControl.handleAssignReplicasToDirs(assignReplicasToDirsRequestData);
        });
    }

    @Override // org.apache.kafka.controller.Controller
    public CompletableFuture<Void> waitForReadyBrokers(int i) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        appendControlEvent("waitForReadyBrokers", () -> {
            this.clusterControl.addReadyBrokersFuture(completableFuture, i);
        });
        return completableFuture;
    }

    @Override // org.apache.kafka.controller.Controller
    public void beginShutdown() {
        this.queue.beginShutdown("QuorumController#beginShutdown");
    }

    public int nodeId() {
        return this.nodeId;
    }

    public String clusterId() {
        return this.clusterId;
    }

    @Override // org.apache.kafka.controller.Controller
    public int curClaimEpoch() {
        return this.curClaimEpoch;
    }

    @Override // org.apache.kafka.controller.Controller, java.lang.AutoCloseable
    public void close() throws InterruptedException {
        this.queue.close();
        this.controllerMetrics.close();
    }

    Time time() {
        return this.time;
    }

    QuorumControllerMetrics controllerMetrics() {
        return this.controllerMetrics;
    }

    void handleBrokerShutdown(int i, boolean z, List<ApiMessageAndVersion> list) {
        this.replicationControl.handleBrokerShutdown(i, z, list);
    }
}
