package org.apache.rocketmq.controller.impl;

import io.openmessaging.storage.dledger.AppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerLeaderElector;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerRequestHeader;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.impl.event.ControllerResult;
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.event.EventSerializer;
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

/* loaded from: input_file:org/apache/rocketmq/controller/impl/DLedgerController.class */
public class DLedgerController implements Controller {
    /** Logger shared by the controller and its nested helpers, obtained under the name "RocketmqController". */
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqController");
    /** DLedger server that every request is appended to; built in the constructor from {@link #dLedgerConfig}. */
    private final DLedgerServer dLedgerServer;
    /** Controller configuration supplied by the caller; source of the DLedger group/peers/selfId/store settings. */
    private final ControllerConfig controllerConfig;
    /** DLedger configuration populated in the constructor from {@link #controllerConfig}. */
    private final DLedgerConfig dLedgerConfig;
    /** Produces the {@code ControllerResult} for each broker request submitted through the scheduler. */
    private final ReplicasInfoManager replicasInfoManager;
    /** Queue-backed service thread that runs submitted events one at a time. */
    private final EventScheduler scheduler;
    /** Serializes {@code EventMessage}s to bytes before they are appended to DLedger. */
    private final EventSerializer eventSerializer;
    /** Role-change callback registered with the DLedger leader elector; tracks the current role. */
    private final RoleChangeHandler roleHandler;
    /** State machine registered with {@link #dLedgerServer} in the constructor. */
    private final DLedgerControllerStateMachine statemachine;
    /** Predicate handed to {@code alterSyncStateSet} and {@code electMaster}; replaceable via the setter. */
    private BiPredicate<String, String> brokerAlivePredicate;
    /** Set to true by {@code startScheduling()} and back to false by {@code stopScheduling()}. */
    private volatile boolean isScheduling;

    /* renamed from: org.apache.rocketmq.controller.impl.DLedgerController$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/controller/impl/DLedgerController$1.class */
    /**
     * Compiler-generated lookup table for a {@code switch} over {@link MemberState.Role}.
     * The array is indexed by the enum constant's ordinal and holds the case label
     * (1 = CANDIDATE, 2 = FOLLOWER, 3 = LEADER); slots that are never assigned stay 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$openmessaging$storage$dledger$MemberState$Role = new int[MemberState.Role.values().length];

        static {
            // Each constant is registered in its own try block so that one constant failing
            // with NoSuchFieldError does not prevent the remaining constants from being mapped.
            try {
                $SwitchMap$io$openmessaging$storage$dledger$MemberState$Role[MemberState.Role.CANDIDATE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot keeps its default value 0.
            }
            try {
                $SwitchMap$io$openmessaging$storage$dledger$MemberState$Role[MemberState.Role.FOLLOWER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot keeps its default value 0.
            }
            try {
                $SwitchMap$io$openmessaging$storage$dledger$MemberState$Role[MemberState.Role.LEADER.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot keeps its default value 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/controller/impl/DLedgerController$ControllerEventHandler.class */
    public class ControllerEventHandler<T> implements EventHandler<T> {
        private final String name;
        private final Supplier<ControllerResult<T>> supplier;
        private final CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
        private final boolean isWriteEvent;

        ControllerEventHandler(String str, Supplier<ControllerResult<T>> supplier, boolean z) {
            this.name = str;
            this.supplier = supplier;
            this.isWriteEvent = z;
        }

        @Override // org.apache.rocketmq.controller.impl.DLedgerController.EventHandler
        public void run() throws Throwable {
            byte[] serialize;
            ControllerResult<T> controllerResult = this.supplier.get();
            DLedgerController.log.info("Event queue run event {}, get the result {}", this.name, controllerResult);
            boolean z = true;
            if (this.isWriteEvent) {
                List<EventMessage> events = controllerResult.getEvents();
                ArrayList arrayList = new ArrayList(events.size());
                for (EventMessage eventMessage : events) {
                    if (eventMessage != null && (serialize = DLedgerController.this.eventSerializer.serialize(eventMessage)) != null && serialize.length > 0) {
                        arrayList.add(serialize);
                    }
                }
                if (!arrayList.isEmpty()) {
                    BatchAppendEntryRequest batchAppendEntryRequest = new BatchAppendEntryRequest();
                    batchAppendEntryRequest.setBatchMsgs(arrayList);
                    z = DLedgerController.this.appendToDledgerAndWait(batchAppendEntryRequest);
                }
            } else if (DLedgerController.this.controllerConfig.isProcessReadEvent()) {
                AppendEntryRequest appendEntryRequest = new AppendEntryRequest();
                appendEntryRequest.setBody(new byte[0]);
                z = DLedgerController.this.appendToDledgerAndWait(appendEntryRequest);
            }
            if (!z) {
                DLedgerController.log.error("Failed to append event to dledger, the response is {}, try cancel the future", controllerResult.getResponse());
                this.future.cancel(true);
                return;
            }
            RemotingCommand createResponseCommandWithHeader = RemotingCommand.createResponseCommandWithHeader(controllerResult.getResponseCode(), (CommandCustomHeader) controllerResult.getResponse());
            if (controllerResult.getBody() != null) {
                createResponseCommandWithHeader.setBody(controllerResult.getBody());
            }
            if (controllerResult.getRemark() != null) {
                createResponseCommandWithHeader.setRemark(controllerResult.getRemark());
            }
            this.future.complete(createResponseCommandWithHeader);
        }

        @Override // org.apache.rocketmq.controller.impl.DLedgerController.EventHandler
        public CompletableFuture<RemotingCommand> future() {
            return this.future;
        }

        @Override // org.apache.rocketmq.controller.impl.DLedgerController.EventHandler
        public void handleException(Throwable th) {
            DLedgerController.log.error("Error happen when handle event {}", this.name, th);
            this.future.completeExceptionally(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/controller/impl/DLedgerController$EventHandler.class */
    /**
     * Unit of work executed by the event scheduler thread.
     *
     * @param <T> type of the response carried by the event's result
     */
    public interface EventHandler<T> {
        /**
         * Executes the event.
         *
         * @throws Throwable on any failure; the scheduler passes it to {@link #handleException(Throwable)}
         */
        void run() throws Throwable;

        /** Returns the future through which the event's response is published. */
        CompletableFuture<RemotingCommand> future();

        /** Invoked by the scheduler with whatever {@link #run()} threw. */
        void handleException(Throwable th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/controller/impl/DLedgerController$EventScheduler.class */
    /**
     * Service thread that runs queued {@link EventHandler}s one at a time, in submission order.
     * The queue is bounded to 1024 pending events.
     */
    public class EventScheduler extends ServiceThread {
        private final BlockingQueue<EventHandler<?>> eventQueue = new LinkedBlockingQueue<>(1024);

        public EventScheduler() {
        }

        /** Returns the fully qualified class name of this scheduler. */
        public String getServiceName() {
            return EventScheduler.class.getName();
        }

        /**
         * Polls the queue until the service is stopped. Each handler is run on this thread and
         * anything it throws is handed back to the handler's own {@code handleException}.
         */
        public void run() {
            DLedgerController.log.info("Start event scheduler.");
            while (!isStopped()) {
                final EventHandler<?> handler;
                try {
                    handler = this.eventQueue.poll(5L, TimeUnit.SECONDS);
                } catch (InterruptedException ignored) {
                    // Deliberately not propagated: the loop condition re-checks isStopped(),
                    // so an interrupt that accompanies a stop request ends the loop.
                    continue;
                }
                if (handler == null) {
                    // Poll timed out with nothing queued.
                    continue;
                }
                try {
                    handler.run();
                } catch (Throwable th) {
                    handler.handleException(th);
                }
            }
        }

        /**
         * Queues an event for execution on the scheduler thread.
         *
         * @param str      event name, used in log messages
         * @param supplier computes the result when the event is eventually run
         * @param z        true for a write event, false for a read event
         * @param <T>      response type of the result
         * @return the event's future; an already-completed future holding a code-2007
         *         "not in leader state" response when the scheduler is stopped or this node is
         *         not leader; or {@code null} when the calling thread was interrupted more than
         *         three times while waiting for queue space (its interrupt flag is then restored)
         */
        public <T> CompletableFuture<RemotingCommand> appendEvent(String str, Supplier<ControllerResult<T>> supplier, boolean z) {
            if (isStopped() || !DLedgerController.this.roleHandler.isLeaderState()) {
                final RemotingCommand notLeader = RemotingCommand.createResponseCommand(2007, "The controller is not in leader state");
                return CompletableFuture.completedFuture(notLeader);
            }

            final EventHandler<T> event = new ControllerEventHandler<>(str, supplier, z);
            int interruptCount = 0;
            while (true) {
                try {
                    // The blocking offer must sit inside the try so that an interrupt is
                    // counted and retried instead of escaping the method.
                    if (this.eventQueue.offer(event, 5L, TimeUnit.SECONDS)) {
                        return event.future();
                    }
                    // Queue still full after the timeout: keep waiting.
                } catch (InterruptedException e) {
                    DLedgerController.log.error("Error happen in EventScheduler when append event", e);
                    interruptCount++;
                    if (interruptCount > 3) {
                        // Giving up: hand the interrupt back to the caller's thread.
                        Thread.currentThread().interrupt();
                        return null;
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/controller/impl/DLedgerController$RoleChangeHandler.class */
    /**
     * Reacts to DLedger role changes. Every change is processed on a dedicated single-thread
     * executor, so changes are handled one at a time in the order they were submitted.
     */
    public class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
        private final String selfId;
        private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
        /** Role last applied by this handler; starts as FOLLOWER. */
        private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER;

        /**
         * @param str id of this controller node, used in log messages
         */
        public RoleChangeHandler(String str) {
            this.selfId = str;
        }

        /**
         * Schedules the handling of a role change.
         *
         * <p>CANDIDATE and FOLLOWER record the role and stop event scheduling. LEADER first
         * commits an empty entry to DLedger and only then records the role and starts
         * scheduling. Any other role is ignored.
         *
         * @param j    value passed by the leader elector (not used here)
         * @param role the role this node has switched to
         */
        public void handle(long j, MemberState.Role role) {
            this.executorService.submit(() -> {
                switch (role) {
                    case CANDIDATE:
                        this.currentRole = MemberState.Role.CANDIDATE;
                        DLedgerController.log.info("Controller {} change role to candidate", this.selfId);
                        DLedgerController.this.stopScheduling();
                        break;
                    case FOLLOWER:
                        this.currentRole = MemberState.Role.FOLLOWER;
                        DLedgerController.log.info("Controller {} change role to Follower, leaderId:{}", this.selfId, DLedgerController.this.getMemberState().getLeaderId());
                        DLedgerController.this.stopScheduling();
                        break;
                    case LEADER:
                        becomeLeader();
                        break;
                    default:
                        break;
                }
            });
        }

        /**
         * Appends an empty entry to DLedger, retrying until it succeeds, then marks this node
         * as leader and starts event scheduling.
         */
        private void becomeLeader() {
            DLedgerController.log.info("Controller {} change role to leader, try process a initial proposal", this.selfId);
            int failures = 0;
            // NOTE(review): the only exit from this loop is a successful append; a later role
            // change queued on the same executor cannot run until then.
            while (true) {
                final AppendEntryRequest request = new AppendEntryRequest();
                request.setBody(new byte[0]);
                try {
                    // The append has to be inside the try so that a failed attempt is logged
                    // and retried rather than aborting the task.
                    if (DLedgerController.this.appendToDledgerAndWait(request)) {
                        this.currentRole = MemberState.Role.LEADER;
                        DLedgerController.this.startScheduling();
                        return;
                    }
                } catch (Throwable th) {
                    DLedgerController.log.error("Error happen when controller leader append initial request to dledger", th);
                    failures++;
                    if (failures % 3 == 0) {
                        DLedgerController.log.warn("Controller leader append initial log failed too many times, please wait a while");
                    }
                }
            }
        }

        /** No start-up work is required. */
        public void startup() {
        }

        /** Stops event scheduling if this node is currently leader, then shuts the executor down. */
        public void shutdown() {
            if (this.currentRole == MemberState.Role.LEADER) {
                DLedgerController.this.stopScheduling();
            }
            this.executorService.shutdown();
        }

        /** Returns true once the LEADER transition has completed its initial append. */
        public boolean isLeaderState() {
            return this.currentRole == MemberState.Role.LEADER;
        }
    }

    /**
     * Creates a controller, passing {@code null} for the Netty server config, Netty client
     * config and channel event listener of the full constructor.
     *
     * @param controllerConfig controller configuration
     * @param biPredicate      broker-alive predicate passed on to the replicas manager
     */
    public DLedgerController(ControllerConfig controllerConfig, BiPredicate<String, String> biPredicate) {
        this(controllerConfig, biPredicate, null, null, null);
    }

    public DLedgerController(ControllerConfig controllerConfig, BiPredicate<String, String> biPredicate, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
        this.isScheduling = false;
        this.controllerConfig = controllerConfig;
        this.eventSerializer = new EventSerializer();
        this.scheduler = new EventScheduler();
        this.brokerAlivePredicate = biPredicate;
        this.dLedgerConfig = new DLedgerConfig();
        this.dLedgerConfig.setGroup(controllerConfig.getControllerDLegerGroup());
        this.dLedgerConfig.setPeers(controllerConfig.getControllerDLegerPeers());
        this.dLedgerConfig.setSelfId(controllerConfig.getControllerDLegerSelfId());
        this.dLedgerConfig.setStoreBaseDir(controllerConfig.getControllerStorePath());
        this.dLedgerConfig.setMappedFileSizeForEntryData(controllerConfig.getMappedFileSize());
        this.roleHandler = new RoleChangeHandler(this.dLedgerConfig.getSelfId());
        this.replicasInfoManager = new ReplicasInfoManager(controllerConfig);
        this.statemachine = new DLedgerControllerStateMachine(this.replicasInfoManager, this.eventSerializer, this.dLedgerConfig.getSelfId());
        this.dLedgerServer = new DLedgerServer(this.dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
        this.dLedgerServer.registerStateMachine(this.statemachine);
        this.dLedgerServer.getdLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
    }

    /** Starts the underlying DLedger server. */
    @Override // org.apache.rocketmq.controller.Controller
    public void startup() {
        this.dLedgerServer.startup();
    }

    /** Shuts down the underlying DLedger server. */
    @Override // org.apache.rocketmq.controller.Controller
    public void shutdown() {
        this.dLedgerServer.shutdown();
    }

    /**
     * Starts the event scheduler thread unless scheduling is already flagged as active.
     * Calling it while scheduling is active has no effect.
     */
    @Override // org.apache.rocketmq.controller.Controller
    public void startScheduling() {
        if (!this.isScheduling) {
            log.info("Start scheduling controller events");
            this.isScheduling = true;
            this.scheduler.start();
        }
    }

    /**
     * Stops the event scheduler thread if scheduling is flagged as active.
     * Calling it while scheduling is inactive has no effect.
     */
    @Override // org.apache.rocketmq.controller.Controller
    public void stopScheduling() {
        if (!this.isScheduling) {
            return;
        }
        log.info("Stop scheduling controller events");
        this.isScheduling = false;
        // The boolean argument is forwarded to ServiceThread.shutdown unchanged.
        this.scheduler.shutdown(true);
    }

    /** Returns whether the role-change handler currently records this node as leader. */
    @Override // org.apache.rocketmq.controller.Controller
    public boolean isLeaderState() {
        return this.roleHandler.isLeaderState();
    }

    /** Returns the controller configuration this instance was constructed with. */
    public ControllerConfig getControllerConfig() {
        return this.controllerConfig;
    }

    /**
     * Submits an "alterSyncStateSet" write event to the scheduler. The replicas manager is
     * only invoked when the event is run, using the broker-alive predicate current at that time.
     *
     * @return the future returned by the scheduler's {@code appendEvent}
     */
    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> alterSyncStateSet(AlterSyncStateSetRequestHeader alterSyncStateSetRequestHeader, SyncStateSet syncStateSet) {
        return this.scheduler.appendEvent(
            "alterSyncStateSet",
            () -> this.replicasInfoManager.alterSyncStateSet(alterSyncStateSetRequestHeader, syncStateSet, this.brokerAlivePredicate),
            true);
    }

    /**
     * Submits an "electMaster" write event to the scheduler. The replicas manager is only
     * invoked when the event is run, using the broker-alive predicate current at that time.
     *
     * @return the future returned by the scheduler's {@code appendEvent}
     */
    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> electMaster(ElectMasterRequestHeader electMasterRequestHeader) {
        return this.scheduler.appendEvent(
            "electMaster",
            () -> this.replicasInfoManager.electMaster(electMasterRequestHeader, this.brokerAlivePredicate),
            true);
    }

    /**
     * Submits a "registerBroker" write event to the scheduler; the replicas manager is only
     * invoked when the event is run.
     *
     * @return the future returned by the scheduler's {@code appendEvent}
     */
    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> registerBroker(RegisterBrokerToControllerRequestHeader registerBrokerToControllerRequestHeader) {
        return this.scheduler.appendEvent(
            "registerBroker",
            () -> this.replicasInfoManager.registerBroker(registerBrokerToControllerRequestHeader),
            true);
    }

    /**
     * Submits a "getReplicaInfo" read event to the scheduler; the replicas manager is only
     * invoked when the event is run.
     *
     * @return the future returned by the scheduler's {@code appendEvent}
     */
    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> getReplicaInfo(GetReplicaInfoRequestHeader getReplicaInfoRequestHeader) {
        return this.scheduler.appendEvent(
            "getReplicaInfo",
            () -> this.replicasInfoManager.getReplicaInfo(getReplicaInfoRequestHeader),
            false);
    }

    /**
     * Submits a "getSyncStateData" read event to the scheduler; the replicas manager is only
     * invoked when the event is run.
     *
     * @param list argument forwarded unchanged to {@code ReplicasInfoManager.getSyncStateData}
     * @return the future returned by the scheduler's {@code appendEvent}
     */
    @Override // org.apache.rocketmq.controller.Controller
    public CompletableFuture<RemotingCommand> getSyncStateData(List<String> list) {
        return this.scheduler.appendEvent(
            "getSyncStateData",
            () -> this.replicasInfoManager.getSyncStateData(list),
            false);
    }

    /**
     * Builds a response describing this controller group from the current DLedger member state.
     *
     * <p>The peer map is flattened to {@code key:value;} segments, one per entry, each
     * terminated by a semicolon (so a non-empty list ends with a trailing {@code ;}).
     *
     * @return a response with code 0 whose header carries the group, leader id, leader
     *         address, whether this node is leader, and the flattened peer list
     */
    @Override // org.apache.rocketmq.controller.Controller
    public RemotingCommand getControllerMetadata() {
        final MemberState memberState = getMemberState();
        final Map<String, String> peerMap = memberState.getPeerMap();
        final StringBuilder peers = new StringBuilder();
        for (Map.Entry<String, String> peer : peerMap.entrySet()) {
            peers.append(peer.getKey()).append(':').append(peer.getValue()).append(';');
        }
        final GetMetaDataResponseHeader header = new GetMetaDataResponseHeader(
            memberState.getGroup(),
            memberState.getLeaderId(),
            memberState.getLeaderAddr(),
            memberState.isLeader(),
            peers.toString());
        return RemotingCommand.createResponseCommandWithHeader(0, header);
    }

    /** Returns the remoting server owned by the underlying DLedger server. */
    @Override // org.apache.rocketmq.controller.Controller
    public RemotingServer getRemotingServer() {
        return this.dLedgerServer.getRemotingServer();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Appends a request to DLedger and waits up to five seconds for the append future to complete.
     *
     * <p>The request's group and remote id are overwritten with this node's DLedger group and
     * self id before it is submitted.
     *
     * @param appendEntryRequest the entry to append; {@code null} is rejected
     * @return {@code false} when the request is null or DLedger reports position -1 for the
     *         append; {@code true} once the append future has completed within the timeout
     * @throws Throwable whatever the DLedger append or the wait on its future raises,
     *                   including a timeout of the five-second wait
     */
    public boolean appendToDledgerAndWait(AppendEntryRequest appendEntryRequest) throws Throwable {
        if (appendEntryRequest == null) {
            return false;
        }
        appendEntryRequest.setGroup(this.dLedgerConfig.getGroup());
        appendEntryRequest.setRemoteId(this.dLedgerConfig.getSelfId());

        // Explicit downcast: getPos() is only available on AppendFuture.
        // NOTE(review): assumes handleAppend is declared to return a CompletableFuture - confirm
        // against the DLedger version in use; the cast is a no-op if it already returns AppendFuture.
        final AppendFuture<?> appendFuture = (AppendFuture<?>) this.dLedgerServer.handleAppend(appendEntryRequest);
        if (appendFuture.getPos() == -1) {
            return false;
        }
        appendFuture.get(5L, TimeUnit.SECONDS);
        return true;
    }

    /** Returns the member state reported by the underlying DLedger server. */
    public MemberState getMemberState() {
        return this.dLedgerServer.getMemberState();
    }

    /**
     * Replaces the broker-alive predicate. Events read the field when they run, so events
     * that have been queued but not yet run will see the new predicate.
     */
    public void setBrokerAlivePredicate(BiPredicate<String, String> biPredicate) {
        this.brokerAlivePredicate = biPredicate;
    }
}
