package org.zstacks.zbus.server.mq;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.protocol.ConsumerInfo;
import org.zstacks.zbus.server.ServerHelper;
import org.zstacks.znet.Message;
import org.zstacks.znet.nio.Session;

/* loaded from: input_file:org/zstacks/zbus/server/mq/PubsubQueue.class */
/**
 * Publish/subscribe variant of {@link MessageQueue}.
 *
 * <p>Published messages are buffered in {@link #msgQ}. On dispatch, each buffered
 * message is offered (as a copy made by {@code Message.copyWithoutBody}) to the
 * private queue of every active {@link PullSession} whose topics match, and each
 * session that has an outstanding pull request then receives at most one message
 * from its private queue.
 */
public class PubsubQueue extends MessageQueue {
    private static final long serialVersionUID = -593851217778104787L;
    private static final Logger log = LoggerFactory.getLogger(PubsubQueue.class);

    /** Published messages that have not yet been fanned out to subscriber sessions. */
    protected final BlockingQueue<Message> msgQ;

    /**
     * Subscriber pull sessions keyed by session id.
     * NOTE(review): this field is transient and only assigned in the constructor, so it
     * would be null on a deserialized instance unless it is restored elsewhere - confirm.
     */
    transient ConcurrentMap<String, PullSession> sessMap;

    /**
     * Creates an empty pub/sub queue. All arguments are forwarded unchanged to the
     * {@link MessageQueue} constructor.
     */
    public PubsubQueue(String str, String str2, ExecutorService executorService, int i) {
        super(str, str2, executorService, i);
        this.msgQ = new LinkedBlockingQueue<Message>();
        this.sessMap = new ConcurrentHashMap<String, PullSession>();
    }

    /**
     * Accepts a published message: replies 200 to the producer first if the message
     * requests an ack, then buffers the message and triggers a dispatch.
     *
     * @param message the published message
     * @param session the producer's session, used only for the optional ack reply
     * @throws IOException if sending the ack or dispatching fails
     */
    @Override // org.zstacks.zbus.server.mq.MessageQueue
    public void produce(Message message, Session session) throws IOException {
        if (message.isAck()) {
            ServerHelper.reply200(message.getMsgId(), session);
        }
        this.msgQ.offer(message);
        dispatch();
    }

    /**
     * Registers a pull request for the given session and triggers a dispatch.
     *
     * <p>If the session is already known, its pending pull message is replaced;
     * otherwise a new {@link PullSession} is registered for it.
     *
     * @param message the pull request
     * @param session the consumer's session
     * @throws IOException if dispatching fails
     */
    @Override // org.zstacks.zbus.server.mq.MessageQueue
    public void consume(Message message, Session session) throws IOException {
        String sessionId = session.id();
        PullSession existing = this.sessMap.get(sessionId);
        if (existing == null) {
            // putIfAbsent returns the already-registered session when another thread won
            // the race; without using that result this pull request would be dropped.
            existing = this.sessMap.putIfAbsent(sessionId, new PullSession(session, message));
        }
        if (existing != null) {
            existing.setPullMsg(message);
        }
        dispatch();
    }

    /** Removes every pull session whose underlying session is no longer active. */
    @Override // org.zstacks.zbus.server.mq.MessageQueue
    public void cleanSession() {
        Iterator<Map.Entry<String, PullSession>> it = this.sessMap.entrySet().iterator();
        while (it.hasNext()) {
            if (!it.next().getValue().getSession().isActive()) {
                it.remove();
            }
        }
    }

    /**
     * Drains the published-message buffer into the matching subscribers' queues, then
     * answers outstanding pull requests.
     *
     * @throws IOException declared by the overridden method; write failures towards
     *         individual subscribers are logged and do not abort the pass
     */
    @Override // org.zstacks.zbus.server.mq.MessageQueue
    void doDispatch() throws IOException {
        fanOutToSubscribers();
        deliverPendingToPullers();
    }

    /**
     * Moves every buffered published message into the private queue of each active
     * session whose topics match, dropping inactive sessions along the way.
     */
    private void fanOutToSubscribers() throws IOException {
        Message published;
        while ((published = this.msgQ.poll()) != null) {
            String topic = published.getTopic();
            Iterator<Map.Entry<String, PullSession>> it = this.sessMap.entrySet().iterator();
            while (it.hasNext()) {
                PullSession subscriber = it.next().getValue();
                if (subscriber == null || !subscriber.getSession().isActive()) {
                    it.remove();
                    continue;
                }
                if (subscriber.isTopicMatched(topic)) {
                    // Each subscriber gets its own copy so per-delivery fields can differ.
                    Message copy = Message.copyWithoutBody(published);
                    prepareMessageStatus(copy);
                    subscriber.getMsgQ().offer(copy);
                }
            }
        }
    }

    /**
     * For each active session with an outstanding pull request, sends at most one
     * queued message as the reply to that request. Inactive sessions are removed.
     */
    private void deliverPendingToPullers() {
        Iterator<Map.Entry<String, PullSession>> it = this.sessMap.entrySet().iterator();
        while (it.hasNext()) {
            PullSession subscriber = it.next().getValue();
            if (subscriber == null || !subscriber.getSession().isActive()) {
                it.remove();
                continue;
            }
            // Acquire outside the try so that unlock() only runs when lock() succeeded.
            subscriber.pullMsgLock.lock();
            try {
                Message pullMsg = subscriber.getPullMsg();
                if (pullMsg == null) {
                    continue; // nothing is waiting on this session
                }
                Message pending = subscriber.getMsgQ().poll();
                if (pending == null) {
                    continue; // keep the pull request until a message arrives
                }
                // The pull request is consumed by this reply.
                subscriber.setPullMsg(null);
                String replyId = pullMsg.getMsgId();
                pending.setStatus("200");
                pending.setMsgIdRaw(replyId);
                pending.setMsgId(replyId);
                subscriber.getSession().write(pending);
            } catch (IOException e) {
                // A failed write to one subscriber must not stop delivery to the others.
                log.error(e.getMessage(), e);
            } finally {
                subscriber.pullMsgLock.unlock();
            }
        }
    }

    /**
     * Builds a snapshot describing every registered pull session.
     *
     * @return one {@link ConsumerInfo} per session, carrying its status, remote address
     *         and, when present, a copy of its topics
     */
    @Override // org.zstacks.zbus.server.mq.MessageQueue
    public List<ConsumerInfo> getConsumerInfoList() {
        List<ConsumerInfo> infos = new ArrayList<ConsumerInfo>();
        for (PullSession subscriber : this.sessMap.values()) {
            Session session = subscriber.getSession();
            ConsumerInfo info = new ConsumerInfo();
            info.setStatus(session.getStatus().toString());
            info.setRemoteAddr(session.getRemoteAddress());
            if (subscriber.getTopics() != null) {
                // Raw copy kept as in the original: the element type of getTopics()
                // is declared outside this file.
                info.setTopics(new ArrayList(subscriber.getTopics()));
            }
            infos.add(info);
        }
        return infos;
    }

    /**
     * @return the number of published messages not yet fanned out to subscribers
     */
    @Override // org.zstacks.zbus.server.mq.MessageQueue
    public int getMessageQueueSize() {
        return this.msgQ.size();
    }
}
