package org.zstacks.zbus.server;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.protocol.MessageMode;
import org.zstacks.zbus.protocol.Proto;
import org.zstacks.zbus.server.mq.MessageQueue;
import org.zstacks.zbus.server.mq.ReplyQueue;
import org.zstacks.zbus.server.mq.store.MessageStore;
import org.zstacks.zbus.server.mq.store.MessageStoreFactory;
import org.zstacks.znet.Helper;
import org.zstacks.znet.Message;
import org.zstacks.znet.MessageHandler;
import org.zstacks.znet.RemotingServer;
import org.zstacks.znet.nio.Dispatcher;
import org.zstacks.znet.nio.Session;

/* loaded from: input_file:org/zstacks/zbus/server/ZbusServer.class */
public class ZbusServer extends RemotingServer {
    private static final Logger log = LoggerFactory.getLogger(ZbusServer.class);
    /**
     * Queues known to this server, keyed by the string used for lookup
     * (a message's mq/path in findMQ, or the reply-queue id in the Request handler).
     */
    private final ConcurrentMap<String, MessageQueue> mqTable;
    /** When true, incoming non-heartbeat messages are logged at info level, otherwise at debug level. */
    private boolean verbose;
    /** Store obtained from MessageStoreFactory in start(); not assigned before start() runs. */
    private MessageStore messageStore;
    /** Store type handed to MessageStoreFactory.getMessageStore in start(). */
    private String messageStoreType;
    /** Access token for admin commands; passed to the admin handler's setAccessToken. */
    private String adminToken;
    /** Handler registered for Proto.Admin. */
    private final AdminHandler adminHandler;
    /** Track reporter built over mqTable and the server address; closed in close(). */
    private final TrackReport trackReport;
    /** Executor taken from the dispatcher; handed to queues and to the admin handler. */
    private final ExecutorService executorService;
    /** Single-threaded scheduler that runs the periodic cleanSession sweep started in start(). */
    private final ScheduledExecutorService scheduledExecutor;
    /** Period, in milliseconds, between cleanSession sweeps (set to 3000 in the constructor). */
    private long mqCleanInterval;

    /**
     * Plain holder for the command-line settings that {@code main} reads and
     * applies to a new server instance. The initializers below are the values a
     * fresh instance carries; {@code main} overwrites every field from the
     * command line (note that it uses 16, not 4, as the fallback for the
     * executor count).
     */
    public static class ZbusServerConfig {
        /** Track server address list passed to startTrackReport when non-empty. */
        public String trackServerAddr;
        /** Port passed to the ZbusServer constructor. */
        public int serverPort = 15555;
        /** Token passed to setAdminToken; empty by default. */
        public String adminToken = "";
        /** Store type passed to setMessageStoreType. */
        public String storeType = MessageStoreFactory.DUMMY;
        /** Value passed to Dispatcher.selectorCount. */
        public int selectorCount = 1;
        /** Value passed to Dispatcher.executorCount. */
        public int executorCount = 4;
        /** Value passed to setVerbose. */
        public boolean verbose = true;
    }

    /**
     * Creates a server on the given port using the host string "0.0.0.0";
     * delegates to the three-argument constructor.
     *
     * @param i          port number forwarded to the three-argument constructor
     * @param dispatcher dispatcher forwarded to the three-argument constructor
     * @throws IOException if the delegated constructor fails
     */
    public ZbusServer(int i, Dispatcher dispatcher) throws IOException {
        this("0.0.0.0", i, dispatcher);
    }

    /**
     * Creates a server for the given host and port, wiring up the queue table,
     * the track reporter, the admin handler and the message handlers.
     *
     * <p>The dispatcher is started here if it is not already running, and its
     * executor service is reused for queues and the admin handler.
     *
     * @param str        host string forwarded to the superclass constructor
     * @param i          port number forwarded to the superclass constructor
     * @param dispatcher dispatcher forwarded to the superclass constructor
     * @throws IOException if the superclass constructor or dispatcher start-up fails
     */
    public ZbusServer(String str, int i, Dispatcher dispatcher) throws IOException {
        super(str, i, dispatcher);
        // Explicit type arguments instead of a raw ConcurrentHashMap (avoids an unchecked assignment).
        this.mqTable = new ConcurrentHashMap<String, MessageQueue>();
        this.verbose = true;
        this.messageStoreType = MessageStoreFactory.DUMMY;
        this.adminToken = "";
        this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        this.mqCleanInterval = 3000L;
        if (!dispatcher.isStarted()) {
            dispatcher.start();
        }
        this.executorService = dispatcher.executorService();
        this.serverName = "ZbusServer";
        this.trackReport = new TrackReport(this.mqTable, this.serverAddr);
        this.adminHandler = new AdminHandler(this.mqTable, this.executorService, this.serverAddr, this.trackReport);
        this.adminHandler.setAccessToken(this.adminToken);
        initHandlers();
    }

    /**
     * Resolves the queue a message is addressed to and checks its access token.
     *
     * <p>The queue name is the message's mq value, falling back to its path.
     * When the queue is missing, or the token does not match, {@code null} is
     * returned; in those cases a 404 or 403 reply is sent only if the message
     * asked for an acknowledgement.
     *
     * @param message incoming message naming the target queue
     * @param session session used to send the 404/403 reply
     * @return the matching queue, or {@code null} if it is absent or access is denied
     * @throws IOException if sending the error reply fails
     */
    public MessageQueue findMQ(Message message, Session session) throws IOException {
        String queueName = message.getMq();
        if (queueName == null) {
            queueName = message.getPath();
        }
        // ConcurrentHashMap rejects null keys with a NullPointerException, so a
        // message that names no queue at all is handled as "queue not found".
        MessageQueue queue = (queueName == null) ? null : this.mqTable.get(queueName);
        boolean ackRequested = message.isAck();
        if (queue == null) {
            if (ackRequested) {
                ServerHelper.reply404(message, session);
            }
            return null;
        }
        // NOTE(review): assumes getAccessToken() never returns null -- confirm against MessageQueue.
        if ("".equals(queue.getAccessToken()) || queue.getAccessToken().equals(message.getToken())) {
            return queue;
        }
        if (ackRequested) {
            ServerHelper.reply403(message, session);
        }
        return null;
    }

    /**
     * Registers the global handler plus the Produce, Consume, Request and Admin
     * command handlers.
     */
    private void initHandlers() {
        // Global handler: fills in missing reply-queue / message ids, stamps the
        // remote address and broker address, and logs non-heartbeat messages.
        // Presumably invoked for every message before its command handler -- confirm in RemotingServer.
        registerGlobalHandler(new MessageHandler() {
            public void handleMessage(Message message, Session session) throws IOException {
                String mqReply = message.getMqReply();
                if (mqReply == null || mqReply.equals("")) {
                    message.setMqReply(session.id());
                }
                if (message.getMsgId() == null) {
                    message.setMsgId(UUID.randomUUID().toString());
                }
                message.setHead("remote-addr", session.getRemoteAddress());
                message.setHead("broker", ZbusServer.this.serverAddr);
                if (Proto.Heartbeat.equals(message.getCommand())) {
                    return;
                }
                if (ZbusServer.this.verbose) {
                    ZbusServer.log.info("{}", message);
                } else {
                    ZbusServer.log.debug("{}", message);
                }
            }
        });
        // Produce: hand the message to the addressed queue.
        registerHandler(Proto.Produce, new MessageHandler() {
            public void handleMessage(Message message, Session session) throws IOException {
                MessageQueue target = ZbusServer.this.findMQ(message, session);
                if (target == null) {
                    return;
                }
                target.produce(message, session);
            }
        });
        // Consume: register the session as a consumer of the addressed queue.
        registerHandler(Proto.Consume, new MessageHandler() {
            public void handleMessage(Message message, Session session) throws IOException {
                MessageQueue target = ZbusServer.this.findMQ(message, session);
                if (target == null) {
                    return;
                }
                target.consume(message, session);
            }
        });
        // Request: produce to the addressed queue and consume from the reply
        // queue named by the message's mqReply value.
        registerHandler(Proto.Request, new MessageHandler() {
            public void handleMessage(Message message, Session session) throws IOException {
                MessageQueue target = ZbusServer.this.findMQ(message, session);
                if (target == null) {
                    return;
                }
                // NOTE(review): relies on mqReply being non-null here (the global handler
                // above sets it when missing) -- confirm handler ordering.
                String mqReply = message.getMqReply();
                MessageQueue replyQueue = ZbusServer.this.mqTable.get(mqReply);
                if (replyQueue == null) {
                    MessageQueue created = new ReplyQueue(ZbusServer.this.serverAddr, mqReply, ZbusServer.this.executorService, MessageMode.intValue(MessageMode.MQ, MessageMode.Temp));
                    created.setCreator(session.getRemoteAddress());
                    // Another thread may have registered the same reply queue in the
                    // meantime; always use the instance that actually lives in the
                    // table, otherwise the consume below would wait on an orphan queue.
                    MessageQueue existing = ZbusServer.this.mqTable.putIfAbsent(mqReply, created);
                    replyQueue = (existing != null) ? existing : created;
                }
                message.setAck(false);
                Message consumeRequest = Message.copyWithoutBody(message);
                target.produce(message, session);
                replyQueue.consume(consumeRequest, session);
            }
        });
        registerHandler(Proto.Admin, this.adminHandler);
    }

    /**
     * Sets the admin access token and forwards it to the admin handler.
     *
     * <p>The handler receives its token through {@code setAccessToken}; storing
     * the value only in this class's field would leave the handler with the
     * token it was given at construction time.
     *
     * @param str new admin token; {@code null} is treated as the empty string,
     *            which is also the constructor default
     */
    public void setAdminToken(String str) {
        String token = (str == null) ? "" : str;
        this.adminToken = token;
        this.adminHandler.setAccessToken(token);
    }

    /**
     * Starts the server: starts the superclass, obtains the message store,
     * reloads the queue table from it, and schedules the periodic session
     * cleanup.
     *
     * <p>A failure while loading the stored queues is logged and otherwise
     * ignored, leaving the queue table with whatever was loaded before the
     * failure (it is cleared first).
     *
     * @throws IOException if the superclass start fails
     */
    public void start() throws IOException {
        super.start();
        this.messageStore = MessageStoreFactory.getMessageStore(this.serverAddr, this.messageStoreType);
        this.adminHandler.setMessageStore(this.messageStore);
        log.info("message store loading ....");
        this.mqTable.clear();
        try {
            ConcurrentMap<String, MessageQueue> storedQueues = this.messageStore.loadMqTable();
            for (MessageQueue queue : storedQueues.values()) {
                queue.setExecutor(this.executorService);
            }
            this.mqTable.putAll(storedQueues);
            log.info("message store loaded");
        } catch (Exception e) {
            // Best effort: the server still starts, but the failure is reported at error level.
            log.error("message store loading error: {}", e.getMessage(), e);
        }
        this.scheduledExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (MessageQueue queue : ZbusServer.this.mqTable.values()) {
                    try {
                        queue.cleanSession();
                    } catch (RuntimeException e) {
                        // An exception escaping run() would suppress every later
                        // execution of this fixed-rate task, so contain it per queue.
                        ZbusServer.log.error("session cleanup error: {}", e.getMessage(), e);
                    }
                }
            }
        }, 1000L, this.mqCleanInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the cleanup scheduler, closes the track reporter and then closes
     * the superclass.
     *
     * @throws IOException if closing the superclass fails
     */
    public void close() throws IOException {
        this.scheduledExecutor.shutdown();
        try {
            this.trackReport.close();
        } finally {
            // Always release the superclass resources, even if the track reporter fails to close.
            super.close();
        }
    }

    /**
     * Starts track reporting by delegating to the track reporter with this
     * server's dispatcher. An IOException from the reporter is logged at error
     * level and not propagated.
     *
     * @param str track server address string, passed through unchanged
     */
    public void startTrackReport(String str) {
        try {
            this.trackReport.startTrackReport(str, this.dispatcher);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Sets the message store type. The value is read when start() asks
     * MessageStoreFactory for the store, so it must be set before start().
     *
     * @param str store type identifier
     */
    public void setMessageStoreType(String str) {
        this.messageStoreType = str;
    }

    /**
     * Handles an exception raised for a session. I/O exceptions are not
     * forwarded to the superclass handler; everything else is. In both cases
     * the temporary queues created by the session's remote address are removed
     * afterwards.
     *
     * @param th      the exception that occurred
     * @param session the session it occurred on
     * @throws IOException if the superclass handler throws it
     */
    public void onException(Throwable th, Session session) throws IOException {
        if (th instanceof IOException) {
            cleanMQ(session);
            return;
        }
        super.onException(th, session);
        cleanMQ(session);
    }

    /**
     * Called when a session is destroyed; removes the temporary queues whose
     * creator matches the session's remote address (see cleanMQ).
     *
     * @param session the destroyed session
     * @throws IOException declared by the signature; the visible body does not throw it
     */
    public void onSessionDestroyed(Session session) throws IOException {
        cleanMQ(session);
    }

    /**
     * Derives the handler key for a message: its command, or its path when the
     * command is {@code null}. If the result is {@code null} or only
     * whitespace, {@code Proto.Admin} is returned instead.
     *
     * @param message the incoming message
     * @return the handler key, never {@code null} or blank
     */
    public String findHandlerKey(Message message) {
        String key = message.getCommand();
        if (key == null) {
            key = message.getPath();
        }
        boolean blank = key == null || key.trim().length() == 0;
        return blank ? Proto.Admin : key;
    }

    /**
     * Removes every queue that has the Temp mode flag enabled and whose creator
     * equals the session's remote address.
     *
     * @param session session whose temporary queues should be dropped
     */
    private void cleanMQ(Session session) {
        if (this.mqTable == null) {
            return;
        }
        String remoteAddress = session.getRemoteAddress();
        if (remoteAddress == null) {
            // No address to match against, so no queue can belong to this session.
            return;
        }
        Iterator<Map.Entry<String, MessageQueue>> it = this.mqTable.entrySet().iterator();
        while (it.hasNext()) {
            MessageQueue queue = it.next().getValue();
            // Compare with the known-non-null address as receiver so a Temp queue
            // without a creator does not raise a NullPointerException during cleanup.
            if (MessageMode.isEnabled(queue.getMode(), MessageMode.Temp) && remoteAddress.equals(queue.getCreator())) {
                it.remove();
            }
        }
    }

    /**
     * Returns the server address held in the {@code serverAddr} field
     * (not declared in this class; presumably inherited from RemotingServer).
     *
     * @return the server address string
     */
    public String getServerAddress() {
        return this.serverAddr;
    }

    /**
     * Chooses the log level for incoming non-heartbeat messages: info when
     * {@code true}, debug when {@code false}.
     *
     * @param z whether to log messages at info level
     */
    public void setVerbose(boolean z) {
        this.verbose = z;
    }

    /**
     * Command-line entry point. Reads the options {@code -p}, {@code -admin},
     * {@code -track}, {@code -store}, {@code -selector}, {@code -executor} and
     * {@code -verbose}, builds a server from them, starts track reporting when
     * a track address is configured, and finally starts the server.
     *
     * @param strArr command-line arguments
     * @throws Exception if building or starting the server fails
     */
    public static void main(String[] strArr) throws Exception {
        ZbusServerConfig config = new ZbusServerConfig();
        config.serverPort = Helper.option(strArr, "-p", 15555);
        config.adminToken = Helper.option(strArr, "-admin", "");
        config.trackServerAddr = Helper.option(strArr, "-track", "127.0.0.1:16666;127.0.0.1:16667");
        config.storeType = Helper.option(strArr, "-store", MessageStoreFactory.DUMMY);
        config.selectorCount = Helper.option(strArr, "-selector", 1);
        config.executorCount = Helper.option(strArr, "-executor", 16);
        config.verbose = Helper.option(strArr, "-verbose", true);

        Dispatcher dispatcher = new Dispatcher()
                .selectorCount(config.selectorCount)
                .executorCount(config.executorCount);
        ZbusServer server = new ZbusServer(config.serverPort, dispatcher);
        server.setAdminToken(config.adminToken);
        server.setMessageStoreType(config.storeType);
        server.setVerbose(config.verbose);

        String trackAddr = config.trackServerAddr;
        boolean trackingConfigured = trackAddr != null && !trackAddr.equals("");
        if (trackingConfigured) {
            server.startTrackReport(trackAddr);
        }
        server.start();
    }
}
