package org.zstacks.zbus.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.protocol.MessageMode;
import org.zstacks.zbus.protocol.Proto;
import org.zstacks.znet.Message;
import org.zstacks.znet.RemotingClient;
import org.zstacks.znet.callback.MessageCallback;

/* loaded from: input_file:org/zstacks/zbus/client/Consumer.class */
public class Consumer extends MqAdmin implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    private RemotingClient client;
    private String topic;
    private int messageCallbackThreadCount;
    protected ExecutorService executorService;
    private MessageCallback callback;

    public Consumer(Broker broker, String str, MessageMode... messageModeArr) {
        super(broker, str, messageModeArr);
        this.topic = null;
        this.messageCallbackThreadCount = 4;
        this.executorService = null;
    }

    public Consumer(MqConfig mqConfig) {
        super(mqConfig);
        this.topic = null;
        this.messageCallbackThreadCount = 4;
        this.executorService = null;
        this.topic = mqConfig.getTopic();
    }

    public Message recv(int i) throws IOException, InterruptedException {
        if (this.client == null) {
            this.client = this.broker.getClient(myClientHint());
        }
        Message message = new Message();
        message.setCommand(Proto.Consume);
        message.setMq(this.mq);
        message.setToken(this.accessToken);
        if (MessageMode.isEnabled(this.mode, MessageMode.PubSub) && this.topic != null) {
            message.setTopic(this.topic);
        }
        Message message2 = null;
        try {
            message2 = this.client.invokeSync(message, i);
            if (message2 != null && message2.isStatus404()) {
                if (createMQ()) {
                    return recv(i);
                }
                throw new IllegalStateException("register error");
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            try {
                this.broker.closeClient(this.client);
                this.client = this.broker.getClient(myClientHint());
            } catch (IOException e2) {
                log.error(e.getMessage(), e);
            }
        }
        return message2;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.client != null) {
            this.broker.closeClient(this.client);
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    @Override // org.zstacks.zbus.client.MqAdmin
    protected Message invokeCreateMQ(Message message) throws IOException, InterruptedException {
        if (this.client == null) {
            this.client = this.broker.getClient(myClientHint());
        }
        return this.client.invokeSync(message, this.invokeTimeout);
    }

    public void reply(Message message) throws IOException {
        if (message.getStatus() != null) {
            message.setReplyCode(message.getStatus());
        }
        message.setCommand(Proto.Produce);
        message.setAck(false);
        this.client.getSession().write(message);
    }

    public void onMessage(MessageCallback messageCallback) throws IOException {
        this.callback = messageCallback;
        if (this.executorService == null) {
            this.executorService = new ThreadPoolExecutor(this.messageCallbackThreadCount, 256, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue());
            this.executorService.submit(new Runnable() { // from class: org.zstacks.zbus.client.Consumer.1
                @Override // java.lang.Runnable
                public void run() {
                    while (true) {
                        try {
                            try {
                                Message recv = Consumer.this.recv(10000);
                                if (recv != null) {
                                    Consumer.this.callback.onMessage(recv, Consumer.this.client.getSession());
                                }
                            } catch (IOException e) {
                            }
                        } catch (InterruptedException e2) {
                            return;
                        }
                    }
                }
            });
        }
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String str) {
        if (!MessageMode.isEnabled(this.mode, MessageMode.PubSub)) {
            throw new IllegalStateException("topic support for none-PubSub mode");
        }
        this.topic = str;
    }

    public int getMessageCallbackThreadCount() {
        return this.messageCallbackThreadCount;
    }

    public void setMessageCallbackThreadCount(int i) {
        this.messageCallbackThreadCount = i;
    }
}
