package org.apache.rocketmq.controller;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

/* loaded from: input_file:org/apache/rocketmq/controller/ControllerManager.class */
public class ControllerManager {
    private static final InternalLogger log;
    private final ControllerConfig controllerConfig;
    private final NettyServerConfig nettyServerConfig;
    private final NettyClientConfig nettyClientConfig;
    private final BrokerHousekeepingService brokerHousekeepingService = new BrokerHousekeepingService(this);
    private final Configuration configuration;
    private final RemotingClient remotingClient;
    private Controller controller;
    private BrokerHeartbeatManager heartbeatManager;
    private ExecutorService controllerRequestExecutor;
    private BlockingQueue<Runnable> controllerRequestThreadPoolQueue;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
        this.controllerConfig = controllerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.configuration = new Configuration(log, new Object[]{this.controllerConfig, this.nettyServerConfig});
        this.configuration.setStorePathFromConfig(this.controllerConfig, "configStorePath");
        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
    }

    public boolean initialize() {
        this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
        this.controllerRequestExecutor = new ThreadPoolExecutor(this.controllerConfig.getControllerThreadPoolNums(), this.controllerConfig.getControllerThreadPoolNums(), 60000L, TimeUnit.MILLISECONDS, this.controllerRequestThreadPoolQueue, new ThreadFactoryImpl("ControllerRequestExecutorThread_")) { // from class: org.apache.rocketmq.controller.ControllerManager.1
            @Override // java.util.concurrent.AbstractExecutorService
            protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T t) {
                return new FutureTaskExt(runnable, t);
            }
        };
        this.heartbeatManager = new DefaultBrokerHeartbeatManager(this.controllerConfig);
        this.controller = new DLedgerController(this.controllerConfig, (str, str2) -> {
            return this.heartbeatManager.isBrokerActive(str, str2);
        }, this.nettyServerConfig, this.nettyClientConfig, this.brokerHousekeepingService);
        this.heartbeatManager.addBrokerLifecycleListener(new BrokerHeartbeatManager.BrokerLifecycleListener() { // from class: org.apache.rocketmq.controller.ControllerManager.2
            @Override // org.apache.rocketmq.controller.BrokerHeartbeatManager.BrokerLifecycleListener
            public void onBrokerInactive(String str3, String str4, String str5, long j) {
                if (j == 0) {
                    if (!ControllerManager.this.controller.isLeaderState()) {
                        ControllerManager.log.info("Broker{}' master shutdown", str4);
                        return;
                    }
                    try {
                        ElectMasterResponseHeader readCustomHeader = ControllerManager.this.controller.electMaster(new ElectMasterRequestHeader(str4)).get(5L, TimeUnit.SECONDS).readCustomHeader();
                        if (readCustomHeader != null) {
                            ControllerManager.log.info("Broker {}'s master {} shutdown, elect a new master done, result:{}", new Object[]{str4, str5, readCustomHeader});
                            if (StringUtils.isNotEmpty(readCustomHeader.getNewMasterAddress())) {
                                ControllerManager.this.heartbeatManager.changeBrokerMetadata(str3, readCustomHeader.getNewMasterAddress(), 0L);
                            }
                            if (ControllerManager.this.controllerConfig.isNotifyBrokerRoleChanged()) {
                                ControllerManager.this.notifyBrokerRoleChanged(readCustomHeader, str3);
                            }
                        }
                    } catch (Exception e) {
                    }
                }
            }
        });
        registerProcessor();
        return true;
    }

    public void notifyBrokerRoleChanged(ElectMasterResponseHeader electMasterResponseHeader, String str) {
        BrokerMemberGroup brokerMemberGroup = electMasterResponseHeader.getBrokerMemberGroup();
        if (brokerMemberGroup != null) {
            String newMasterAddress = electMasterResponseHeader.getNewMasterAddress();
            if (StringUtils.isNoneEmpty(new CharSequence[]{newMasterAddress}) && this.heartbeatManager.isBrokerActive(str, newMasterAddress)) {
                doNotifyBrokerRoleChanged(newMasterAddress, 0L, electMasterResponseHeader);
            }
            for (Map.Entry entry : brokerMemberGroup.getBrokerAddrs().entrySet()) {
                if (!((String) entry.getValue()).equals(newMasterAddress) && this.heartbeatManager.isBrokerActive(str, (String) entry.getValue())) {
                    doNotifyBrokerRoleChanged((String) entry.getValue(), (Long) entry.getKey(), electMasterResponseHeader);
                }
            }
        }
    }

    public void doNotifyBrokerRoleChanged(String str, Long l, ElectMasterResponseHeader electMasterResponseHeader) {
        if (StringUtils.isNoneEmpty(new CharSequence[]{str})) {
            log.info("Try notify broker {} with id {} that role changed, responseHeader:{}", new Object[]{str, l, electMasterResponseHeader});
            try {
                this.remotingClient.invokeOneway(str, RemotingCommand.createRequestCommand(1008, new NotifyBrokerRoleChangedRequestHeader(electMasterResponseHeader.getNewMasterAddress(), electMasterResponseHeader.getMasterEpoch(), electMasterResponseHeader.getSyncStateSetEpoch(), l.longValue())), 3000L);
            } catch (Exception e) {
                log.error("Failed to notify broker {} with id {} that role changed", new Object[]{str, l, e});
            }
        }
    }

    public void registerProcessor() {
        ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this);
        RemotingServer remotingServer = this.controller.getRemotingServer();
        if (!$assertionsDisabled && remotingServer == null) {
            throw new AssertionError();
        }
        remotingServer.registerProcessor(1001, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1002, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1003, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1004, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1005, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(1006, controllerRequestProcessor, this.controllerRequestExecutor);
        remotingServer.registerProcessor(904, controllerRequestProcessor, this.controllerRequestExecutor);
    }

    public void start() {
        this.heartbeatManager.start();
        this.controller.startup();
        this.remotingClient.start();
    }

    public void shutdown() {
        this.heartbeatManager.shutdown();
        this.controllerRequestExecutor.shutdown();
        this.controller.shutdown();
        this.remotingClient.shutdown();
    }

    public BrokerHeartbeatManager getHeartbeatManager() {
        return this.heartbeatManager;
    }

    public ControllerConfig getControllerConfig() {
        return this.controllerConfig;
    }

    public Controller getController() {
        return this.controller;
    }

    public NettyServerConfig getNettyServerConfig() {
        return this.nettyServerConfig;
    }

    public NettyClientConfig getNettyClientConfig() {
        return this.nettyClientConfig;
    }

    public BrokerHousekeepingService getBrokerHousekeepingService() {
        return this.brokerHousekeepingService;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    static {
        $assertionsDisabled = !ControllerManager.class.desiredAssertionStatus();
        log = InternalLoggerFactory.getLogger("RocketmqController");
    }
}
