package org.zstacks.zbus.client.broker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zstacks.zbus.client.Broker;
import org.zstacks.zbus.client.ClientHint;
import org.zstacks.zbus.client.ZbusException;
import org.zstacks.zbus.protocol.MessageMode;
import org.zstacks.zbus.protocol.MqInfo;
import org.zstacks.zbus.protocol.Proto;
import org.zstacks.zbus.protocol.TrackTable;
import org.zstacks.znet.Helper;
import org.zstacks.znet.Message;
import org.zstacks.znet.RemotingClient;
import org.zstacks.znet.nio.Dispatcher;
import org.zstacks.znet.ticket.ResultCallback;

/* loaded from: input_file:org/zstacks/zbus/client/broker/HaBroker.class */
public class HaBroker implements Broker, TrackListener {
    private static final Logger log = LoggerFactory.getLogger(HaBroker.class);
    private String trackAddressList;
    private HaBrokerConfig config;
    public TrackAgent trackAgent;
    private Dispatcher dispatcher;
    private boolean ownDispatcher;
    private final String requestIp = Helper.getLocalIp();
    private volatile TrackTable trackTable = new TrackTable();
    private Map<String, SingleBroker> brokers = new ConcurrentHashMap();

    public HaBroker(HaBrokerConfig haBrokerConfig) throws IOException {
        this.dispatcher = null;
        this.ownDispatcher = false;
        this.config = haBrokerConfig;
        this.trackAddressList = haBrokerConfig.getTrackAddrList();
        if (haBrokerConfig.getDispatcher() == null) {
            this.ownDispatcher = true;
            this.dispatcher = new Dispatcher();
            this.config.setDispatcher(this.dispatcher);
        } else {
            this.dispatcher = haBrokerConfig.getDispatcher();
            this.ownDispatcher = false;
        }
        this.dispatcher.start();
        this.trackAgent = new TrackAgent(this.trackAddressList, this.dispatcher);
        this.trackAgent.addTrackListener(this);
        this.trackAgent.waitForReady(3000L);
    }

    private SingleBroker getBrokerByAddress(String str) {
        return this.brokers.get(str);
    }

    @Override // org.zstacks.zbus.client.broker.TrackListener
    public void onTrackTableUpdated(TrackTable trackTable) {
        this.trackTable = trackTable;
        for (String str : trackTable.brokerAddresses()) {
            if (this.brokers.get(str) == null) {
                HaBrokerConfig mo5clone = this.config.mo5clone();
                mo5clone.setBrokerAddress(str);
                try {
                    this.brokers.put(str, new SingleBroker(mo5clone));
                } catch (IOException e) {
                    log.error(e.getMessage(), e);
                }
            }
        }
        Iterator<Map.Entry<String, SingleBroker>> it = this.brokers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SingleBroker> next = it.next();
            String key = next.getKey();
            SingleBroker value = next.getValue();
            if (!this.trackTable.brokerAddresses().contains(key)) {
                try {
                    value.close();
                } catch (IOException e2) {
                    log.error(e2.getMessage(), e2);
                }
                it.remove();
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Iterator<SingleBroker> it = this.brokers.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        if (this.ownDispatcher && this.dispatcher != null) {
            try {
                this.dispatcher.close();
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
        try {
            this.trackAgent.close();
        } catch (IOException e2) {
            log.error(e2.getMessage(), e2);
        }
    }

    private boolean isAdmin(Message message) {
        return Proto.Admin.equals(message.getCommand());
    }

    private void invokeAsyncByBroker(String str, Message message, ResultCallback resultCallback) throws IOException {
        SingleBroker brokerByAddress = getBrokerByAddress(str);
        if (brokerByAddress != null) {
            brokerByAddress.invokeAsync(message, resultCallback);
        } else {
            String str2 = str + " zbus broker missing";
            log.error(str2);
            throw new ZbusException(str2);
        }
    }

    private Message invokeSyncByBroker(String str, Message message, int i) throws IOException {
        SingleBroker brokerByAddress = getBrokerByAddress(str);
        if (brokerByAddress != null) {
            return brokerByAddress.invokeSync(message, i);
        }
        String str2 = str + " zbus broker missing";
        log.error(str2);
        throw new ZbusException(str2);
    }

    private String getBrokerAddressForAdmin(String str) {
        String next;
        List<MqInfo> list = null;
        if (str != null) {
            list = this.trackTable.getMqInfo(str);
        }
        if (list != null && list.size() != 0) {
            next = list.get(0).getBroker();
        } else {
            if (this.trackTable.brokerAddresses().size() == 0) {
                throw new ZbusException("ZbusServer not exists");
            }
            next = this.trackTable.brokerAddresses().iterator().next();
        }
        return next;
    }

    @Override // org.zstacks.zbus.client.Broker
    public void invokeAsync(Message message, ResultCallback resultCallback) throws IOException {
        if (isAdmin(message)) {
            invokeAsyncByBroker(getBrokerAddressForAdmin(message.getMq()), message, resultCallback);
            return;
        }
        String broker = message.getBroker();
        if (broker != null) {
            invokeAsyncByBroker(broker, message, resultCallback);
            return;
        }
        List<MqInfo> mqInfo = this.trackTable.getMqInfo(message.getMq());
        if (mqInfo == null || mqInfo.size() == 0) {
            throw new ZbusException("no broker available");
        }
        if (MessageMode.isEnabled(mqInfo.get(0).getMode(), MessageMode.PubSub)) {
            Iterator<MqInfo> it = mqInfo.iterator();
            while (it.hasNext()) {
                invokeAsyncByBroker(it.next().getBroker(), message, resultCallback);
            }
        }
        invokeAsyncByBroker(mqInfo.get(0).getBroker(), message, resultCallback);
    }

    @Override // org.zstacks.zbus.client.Broker
    public Message invokeSync(Message message, int i) throws IOException {
        if (isAdmin(message)) {
            return invokeSyncByBroker(getBrokerAddressForAdmin(message.getMq()), message, i);
        }
        String broker = message.getBroker();
        if (broker != null) {
            return invokeSyncByBroker(broker, message, i);
        }
        List<MqInfo> mqInfo = this.trackTable.getMqInfo(message.getMq());
        if (mqInfo == null || mqInfo.size() == 0) {
            throw new ZbusException("no broker available");
        }
        if (!MessageMode.isEnabled(mqInfo.get(0).getMode(), MessageMode.PubSub)) {
            return invokeSyncByBroker(mqInfo.get(0).getBroker(), message, i);
        }
        Message message2 = null;
        Iterator<MqInfo> it = mqInfo.iterator();
        while (it.hasNext()) {
            message2 = invokeSyncByBroker(it.next().getBroker(), message, i);
        }
        return message2;
    }

    private RemotingClient getClientByBroker(String str) throws IOException {
        SingleBroker brokerByAddress = getBrokerByAddress(str);
        if (brokerByAddress == null) {
            String str2 = str + " zbus broker missing";
            log.error(str2);
            throw new IOException(str2);
        }
        ClientHint clientHint = new ClientHint();
        clientHint.setBroker(str);
        return brokerByAddress.getClient(clientHint);
    }

    @Override // org.zstacks.zbus.client.Broker
    public RemotingClient getClient(ClientHint clientHint) throws IOException {
        List<MqInfo> mqInfo;
        String broker = clientHint.getBroker();
        if (broker != null) {
            return getClientByBroker(broker);
        }
        if (clientHint.getMq() != null && (mqInfo = this.trackTable.getMqInfo(clientHint.getMq())) != null && mqInfo.size() > 0) {
            MqInfo mqInfo2 = mqInfo.get(mqInfo.size() - 1);
            if (mqInfo2.getUnconsumedMsgCount() > 0) {
                return getClientByBroker(mqInfo2.getBroker());
            }
        }
        ArrayList arrayList = new ArrayList(this.trackTable.brokerAddresses());
        if (arrayList.size() == 0) {
            throw new IOException("no broker available");
        }
        String requestIp = clientHint.getRequestIp();
        if (requestIp == null) {
            requestIp = this.requestIp;
        }
        return getClientByBroker((String) arrayList.get(Math.abs(requestIp.hashCode()) % arrayList.size()));
    }

    @Override // org.zstacks.zbus.client.Broker
    public void closeClient(RemotingClient remotingClient) throws IOException {
        SingleBroker brokerByAddress = getBrokerByAddress(remotingClient.getBrokerAddress());
        if (brokerByAddress != null) {
            brokerByAddress.closeClient(remotingClient);
        } else {
            log.warn("unable to find client's broker");
            remotingClient.close();
        }
    }
}
