package org.zstacks.zbus.server.mq;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.protocol.ConsumerInfo;
import org.zstacks.zbus.server.ServerHelper;
import org.zstacks.znet.Message;
import org.zstacks.znet.nio.Session;

/* loaded from: input_file:org/zstacks/zbus/server/mq/RequestQueue.class */
/**
 * A {@link MessageQueue} that pairs queued messages with consumer sessions that have pulled.
 *
 * <p>Produced messages wait in {@link #msgQ}; consumers that issued a pull wait in
 * {@link #sessQ}. {@link #doDispatch()} hands one message to one session at a time for as long
 * as both queues are non-empty. Every enqueue/dequeue is mirrored asynchronously to
 * {@code messageStore} (when one is set) through the inherited {@code executor}.
 */
public class RequestQueue extends MessageQueue {
    private static final long serialVersionUID = -7640938066598234399L;
    private static final Logger log = LoggerFactory.getLogger(RequestQueue.class);

    /** Messages waiting to be delivered to a consumer. */
    protected final BlockingQueue<Message> msgQ;

    /**
     * Consumer sessions waiting for a message.
     *
     * <p>NOTE(review): the field is {@code transient} and only assigned in the constructor, so
     * it would be {@code null} on an instance restored through Java serialization - confirm
     * whether this class is ever deserialized that way.
     */
    transient BlockingQueue<PullSession> sessQ;

    /**
     * Creates an empty queue. All four arguments are forwarded unchanged to the
     * {@link MessageQueue} constructor; their meaning is defined there.
     */
    public RequestQueue(String str, String str2, ExecutorService executorService, int i) {
        super(str, str2, executorService, i);
        this.msgQ = new LinkedBlockingQueue<Message>();
        this.sessQ = new LinkedBlockingQueue<PullSession>();
    }

    /**
     * Appends {@code message} to the in-memory queue and schedules it to be saved to the
     * message store (if any) on the executor. Store failures are logged, not propagated.
     */
    void enqueue(final Message message) {
        msgQ.offer(message);
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    if (messageStore != null) {
                        messageStore.saveMessage(message);
                    }
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }
        });
    }

    /**
     * Removes the head of the in-memory queue and, when a message was present, schedules its
     * removal from the message store (if any) on the executor.
     *
     * @return the removed message, or {@code null} if the queue was empty
     */
    Message dequeue() {
        final Message head = msgQ.poll();
        if (head == null) {
            return null;
        }
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    if (messageStore != null) {
                        messageStore.removeMessage(head);
                    }
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }
        });
        return head;
    }

    /**
     * Accepts a produced message: replies 200 to the producer first when the message asks for
     * an ack, then enqueues the message and triggers a dispatch.
     */
    @Override
    public void produce(Message message, Session session) throws IOException {
        String msgId = message.getMsgId();
        if (message.isAck()) {
            ServerHelper.reply200(msgId, session);
        }
        enqueue(message);
        dispatch();
    }

    /**
     * Registers a pull from {@code session}. If the session is already waiting in
     * {@link #sessQ} only its pull message is replaced; otherwise a new {@link PullSession}
     * is queued. A dispatch is triggered in both cases.
     */
    @Override
    public void consume(Message message, Session session) throws IOException {
        PullSession waiting = null;
        for (PullSession candidate : sessQ) {
            // Identity comparison: the very same connection object must match.
            if (candidate.getSession() == session) {
                waiting = candidate;
                break;
            }
        }
        if (waiting != null) {
            waiting.setPullMsg(message);
        } else {
            sessQ.offer(new PullSession(session, message));
        }
        dispatch();
    }

    /** Removes every waiting session whose underlying connection is no longer active. */
    @Override
    public void cleanSession() {
        Iterator<PullSession> it = sessQ.iterator();
        while (it.hasNext()) {
            if (!it.next().getSession().isActive()) {
                it.remove();
            }
        }
    }

    /**
     * Delivers queued messages to waiting sessions while both queues are non-empty.
     *
     * <p>A session whose window is 0 or whose connection is inactive is dropped from the
     * queue. After a delivery a positive window is decremented, and the session is queued
     * again when its window is -1 or still positive (-1 presumably means "unlimited" -
     * confirm against {@code PullSession}).
     */
    @Override
    void doDispatch() throws IOException {
        while (msgQ.peek() != null && sessQ.peek() != null) {
            PullSession pull = sessQ.poll();
            if (pull == null) {
                // Queue emptied between peek() and poll(); the loop condition re-checks.
                continue;
            }
            if (pull.window.get() == 0 || !pull.getSession().isActive()) {
                // Exhausted window or dead connection: drop the session.
                continue;
            }

            Message msg = dequeue();
            if (msg == null) {
                // No message after all: put the still-valid session back instead of
                // silently losing the consumer.
                sessQ.offer(pull);
                continue;
            }

            try {
                Message pullMsg = pull.getPullMsg();
                Message outgoing = Message.copyWithoutBody(msg);
                prepareMessageStatus(outgoing);
                // Keep the original message id, but answer under the id of the pull request.
                outgoing.setMsgIdRaw(msg.getMsgId());
                outgoing.setMsgId(pullMsg.getMsgId());
                pull.getSession().write(outgoing);
                if (pull.window.get() > 0) {
                    pull.window.decrementAndGet();
                }
            } catch (IOException e) {
                log.error(e.getMessage(), e);
                // Delivery failed: put the message back (at the tail) so it is not lost.
                // NOTE(review): the session is still re-queued below, so a session that
                // stays "active" while every write fails would be retried repeatedly.
                enqueue(msg);
            }

            int remaining = pull.window.get();
            if (remaining == -1 || remaining > 0) {
                sessQ.offer(pull);
            }
        }
    }

    /**
     * Replaces the content of the in-memory message queue with {@code list}.
     *
     * @param list messages to load; {@code null} is treated as an empty list
     */
    public void loadMessageList(List<Message> list) {
        msgQ.clear();
        if (list != null) {
            msgQ.addAll(list);
        }
    }

    /**
     * Builds a snapshot describing every session currently waiting in {@link #sessQ}.
     *
     * @return a new list with one {@link ConsumerInfo} (status and remote address) per session
     */
    @Override
    public List<ConsumerInfo> getConsumerInfoList() {
        List<ConsumerInfo> infos = new ArrayList<ConsumerInfo>();
        for (PullSession pull : sessQ) {
            Session session = pull.getSession();
            ConsumerInfo info = new ConsumerInfo();
            info.setStatus(session.getStatus().toString());
            info.setRemoteAddr(session.getRemoteAddress());
            infos.add(info);
        }
        return infos;
    }

    /** @return the number of messages currently waiting in the in-memory queue */
    @Override
    public int getMessageQueueSize() {
        return msgQ.size();
    }
}
