package org.zstacks.zbus.server.mq.store;

import com.alibaba.fastjson.JSON;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.protocol.MessageMode;
import org.zstacks.zbus.protocol.MqInfo;
import org.zstacks.zbus.server.mq.MessageQueue;
import org.zstacks.zbus.server.mq.RequestQueue;
import org.zstacks.znet.Message;
import org.zstacks.znet.MessageAdaptor;
import org.zstacks.znet.nio.IoBuffer;

/* loaded from: input_file:org/zstacks/zbus/server/mq/store/MessageStoreSql.class */
public class MessageStoreSql implements MessageStore {
    private static final Logger log = LoggerFactory.getLogger(MessageStoreSql.class);
    private static final MessageAdaptor codec = new MessageAdaptor();
    private static final String CONFIG_FILE = "sql.properties";
    private Connection connection;
    private final String brokerKey;
    private final Properties props = new Properties();
    private String driver = "org.hsqldb.jdbcDriver";
    private String url = "jdbc:hsqldb:db/zbus";
    private String user = "sa";
    private String password = "";
    private String sqlMsgs = "CREATE TABLE IF NOT EXISTS msgs(id VARCHAR(128), msg_str VARCHAR(10240000), PRIMARY KEY(id) )";
    private String sqlMqMsgs = "CREATE TABLE IF NOT EXISTS mq_msgs(mq_id VARCHAR(128), msg_id VARCHAR(128) )";
    private String sqlMqs = "CREATE TABLE IF NOT EXISTS mqs(id VARCHAR(512), mq_info VARCHAR(10240000), PRIMARY KEY(id) )";

    public MessageStoreSql(String str) throws Exception {
        this.brokerKey = str;
        InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(CONFIG_FILE);
        try {
            if (resourceAsStream != null) {
                this.props.load(resourceAsStream);
            } else {
                log.warn("missing properties: sql.properties");
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    private Connection getConnection() {
        try {
            this.url = this.props.getProperty("url", this.url).trim();
            this.user = this.props.getProperty("sa", this.user).trim();
            this.password = this.props.getProperty("password", this.password).trim();
            return DriverManager.getConnection(this.url, this.user, this.password);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return null;
        }
    }

    @Override // org.zstacks.zbus.server.mq.store.MessageStore
    public void start() throws Exception {
        this.driver = this.props.getProperty("driver", this.driver).trim();
        Class.forName(this.driver);
        this.connection = getConnection();
        initDbTable();
    }

    @Override // org.zstacks.zbus.server.mq.store.MessageStore
    public void shutdown() throws Exception {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.createStatement().execute("SHUTDOWN");
            this.connection.close();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private String msgKey(Message message) {
        return message.getMsgId();
    }

    private String mqKey(String str) {
        return String.format("%s-%s", this.brokerKey, str);
    }

    @Override // org.zstacks.zbus.server.mq.store.MessageStore
    public void saveMessage(Message message) {
        try {
            String msgKey = msgKey(message);
            String mqKey = mqKey(message.getMq());
            update("INSERT INTO msgs(id, msg_str) VALUES(?,?)", msgKey, message.toString());
            update("INSERT INTO mq_msgs(mq_id, msg_id) VALUES(?,?)", mqKey, msgKey);
            if (log.isDebugEnabled()) {
                log.debug("save " + msgKey);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    @Override // org.zstacks.zbus.server.mq.store.MessageStore
    public void removeMessage(Message message) {
        try {
            String msgKey = msgKey(message);
            update("DELETE FROM msgs WHERE id=?", msgKey);
            update("DELETE FROM mq_msgs WHERE msg_id=?", msgKey);
            if (log.isDebugEnabled()) {
                log.debug("delete " + msgKey);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    @Override // org.zstacks.zbus.server.mq.store.MessageStore
    public void onMessageQueueCreated(MessageQueue messageQueue) {
        try {
            update("INSERT INTO mqs(id, mq_info) VALUES(?,?)", mqKey(messageQueue.getName()), JSON.toJSONString(messageQueue.getMqInfo()));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    @Override // org.zstacks.zbus.server.mq.store.MessageStore
    public void onMessageQueueRemoved(MessageQueue messageQueue) {
        try {
            update("DELETE FROM mqs WHERE id=?", mqKey(messageQueue.getName()));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    @Override // org.zstacks.zbus.server.mq.store.MessageStore
    public ConcurrentMap<String, MessageQueue> loadMqTable() throws SQLException {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ResultSet query = query("SELECT * FROM mqs");
        while (query.next()) {
            String string = query.getString("id");
            String substring = string.substring(string.indexOf(45) + 1);
            MqInfo mqInfo = (MqInfo) JSON.parseObject(query.getString("mq_info"), MqInfo.class);
            int mode = mqInfo.getMode();
            if (MessageMode.isEnabled(mode, MessageMode.MQ)) {
                RequestQueue requestQueue = new RequestQueue(mqInfo.getBroker(), substring, null, mode);
                requestQueue.setCreator(mqInfo.getCreator());
                requestQueue.setMessageStore(this);
                ResultSet query2 = query(String.format("SELECT msg_str FROM mq_msgs, msgs WHERE mq_msgs.msg_id = msgs.id AND mq_msgs.mq_id='%s'", string));
                ArrayList arrayList = new ArrayList();
                while (query2.next()) {
                    Message message = (Message) codec.decode(IoBuffer.wrap(query2.getString("msg_str")));
                    if (message != null) {
                        arrayList.add(message);
                    } else {
                        log.error("message decode error");
                    }
                }
                query2.close();
                requestQueue.loadMessageList(arrayList);
                concurrentHashMap.put(substring, requestQueue);
            } else {
                log.warn("message queue mode not support");
            }
        }
        query.close();
        return concurrentHashMap;
    }

    private void initDbTable() throws SQLException {
        this.sqlMsgs = this.props.getProperty("sql_msgs", this.sqlMsgs).trim();
        this.sqlMqMsgs = this.props.getProperty("sql_mq_msgs", this.sqlMqMsgs).trim();
        this.sqlMqs = this.props.getProperty("sql_mqs", this.sqlMqs).trim();
        update(this.sqlMsgs, new Object[0]);
        update(this.sqlMqMsgs, new Object[0]);
        update(this.sqlMqs, new Object[0]);
    }

    private synchronized ResultSet query(String str) throws SQLException {
        if (this.connection == null) {
            return null;
        }
        return this.connection.createStatement().executeQuery(str);
    }

    private synchronized void update(String str, Object... objArr) throws SQLException {
        if (this.connection == null) {
            return;
        }
        PreparedStatement prepareStatement = this.connection.prepareStatement(str);
        for (int i = 0; i < objArr.length; i++) {
            prepareStatement.setObject(i + 1, objArr[i]);
        }
        if (prepareStatement.executeUpdate() == -1) {
            log.error("db error : " + str);
        }
        prepareStatement.close();
    }
}
