package org.apache.rocketmq.broker.failover;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;

/* loaded from: input_file:org/apache/rocketmq/broker/failover/EscapeBridge.class */
public class EscapeBridge {
    protected static final InternalLogger LOG = InternalLoggerFactory.getLogger("RocketmqBroker");
    private final String innerProducerGroupName;
    private final String innerConsumerGroupName;
    private final BrokerController brokerController;
    private DefaultMQProducer innerProducer;
    private DefaultMQPullConsumer innerConsumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.rocketmq.broker.failover.EscapeBridge$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/broker/failover/EscapeBridge$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$client$producer$SendStatus = new int[SendStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$rocketmq$client$producer$SendStatus[SendStatus.SEND_OK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$producer$SendStatus[SendStatus.SLAVE_NOT_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$producer$SendStatus[SendStatus.FLUSH_DISK_TIMEOUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$producer$SendStatus[SendStatus.FLUSH_SLAVE_TIMEOUT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public EscapeBridge(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.innerProducerGroupName = "InnerProducerGroup_" + brokerController.getBrokerConfig().getBrokerName() + "_" + brokerController.getBrokerConfig().getBrokerId();
        this.innerConsumerGroupName = "InnerConsumerGroup_" + brokerController.getBrokerConfig().getBrokerName() + "_" + brokerController.getBrokerConfig().getBrokerId();
    }

    public void start() throws Exception {
        if (this.brokerController.getBrokerConfig().isEnableSlaveActingMaster() && this.brokerController.getBrokerConfig().isEnableRemoteEscape()) {
            String nameServerList = this.brokerController.getNameServerList();
            if (nameServerList == null || nameServerList.isEmpty()) {
                throw new RuntimeException("nameserver address is null or empty");
            }
            startInnerProducer(nameServerList);
            startInnerConsumer(nameServerList);
            LOG.info("start inner producer and consumer success.");
        }
    }

    public void shutdown() {
        if (this.innerProducer != null) {
            this.innerProducer.shutdown();
        }
        if (this.innerConsumer != null) {
            this.innerConsumer.shutdown();
        }
    }

    private void startInnerProducer(String str) throws MQClientException {
        try {
            this.innerProducer = new DefaultMQProducer(this.innerProducerGroupName);
            this.innerProducer.setNamesrvAddr(str);
            this.innerProducer.start();
        } catch (MQClientException e) {
            LOG.error("start inner producer failed, nameserver address: {}", str, e);
            throw e;
        }
    }

    private void startInnerConsumer(String str) throws MQClientException {
        try {
            this.innerConsumer = new DefaultMQPullConsumer(this.innerConsumerGroupName);
            this.innerConsumer.setNamesrvAddr(str);
            this.innerConsumer.start();
        } catch (MQClientException e) {
            LOG.error("start inner consumer failed, nameserver address: {}", str, e);
            throw e;
        }
    }

    public PutMessageResult putMessage(MessageExtBrokerInner messageExtBrokerInner) {
        BrokerController peekMasterBroker = this.brokerController.peekMasterBroker();
        if (peekMasterBroker != null) {
            return peekMasterBroker.getMessageStore().putMessage(messageExtBrokerInner);
        }
        if (!this.brokerController.getBrokerConfig().isEnableSlaveActingMaster() || !this.brokerController.getBrokerConfig().isEnableRemoteEscape() || this.innerProducer == null) {
            LOG.warn("Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.", Boolean.valueOf(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()), Boolean.valueOf(this.brokerController.getBrokerConfig().isEnableRemoteEscape()));
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, (AppendMessageResult) null);
        }
        try {
            messageExtBrokerInner.setWaitStoreMsgOK(false);
            return transformSendResult2PutResult(this.innerProducer.send(messageExtBrokerInner));
        } catch (Exception e) {
            LOG.error("sendMessageInFailover to remote failed", e);
            return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
        }
    }

    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner messageExtBrokerInner) {
        BrokerController peekMasterBroker = this.brokerController.peekMasterBroker();
        final CompletableFuture<PutMessageResult> completableFuture = new CompletableFuture<>();
        if (peekMasterBroker != null) {
            return peekMasterBroker.getMessageStore().asyncPutMessage(messageExtBrokerInner);
        }
        if (!this.brokerController.getBrokerConfig().isEnableSlaveActingMaster() || !this.brokerController.getBrokerConfig().isEnableRemoteEscape() || this.innerProducer == null) {
            LOG.warn("Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.", Boolean.valueOf(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()), Boolean.valueOf(this.brokerController.getBrokerConfig().isEnableRemoteEscape()));
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, (AppendMessageResult) null));
        }
        try {
            messageExtBrokerInner.setWaitStoreMsgOK(false);
            this.innerProducer.send(messageExtBrokerInner, new SendCallback() { // from class: org.apache.rocketmq.broker.failover.EscapeBridge.1
                public void onSuccess(SendResult sendResult) {
                    completableFuture.complete(EscapeBridge.this.transformSendResult2PutResult(sendResult));
                }

                public void onException(Throwable th) {
                    completableFuture.complete(new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true));
                }
            });
            return completableFuture;
        } catch (Exception e) {
            LOG.error("sendMessageInFailover to remote failed", e);
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true));
        }
    }

    public PutMessageResult putMessageToSpecificQueue(MessageExtBrokerInner messageExtBrokerInner) {
        BrokerController peekMasterBroker = this.brokerController.peekMasterBroker();
        if (peekMasterBroker != null) {
            return peekMasterBroker.getMessageStore().putMessage(messageExtBrokerInner);
        }
        if (!this.brokerController.getBrokerConfig().isEnableSlaveActingMaster() || !this.brokerController.getBrokerConfig().isEnableRemoteEscape() || this.innerProducer == null) {
            LOG.warn("Put message to specific queue failed, enableSlaveActingMaster={}, enableRemoteEscape={}.", Boolean.valueOf(this.brokerController.getBrokerConfig().isEnableSlaveActingMaster()), Boolean.valueOf(this.brokerController.getBrokerConfig().isEnableRemoteEscape()));
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, (AppendMessageResult) null);
        }
        try {
            messageExtBrokerInner.setWaitStoreMsgOK(false);
            return transformSendResult2PutResult(this.innerProducer.send(messageExtBrokerInner, new MessageQueueSelector() { // from class: org.apache.rocketmq.broker.failover.EscapeBridge.2
                public MessageQueue select(List<MessageQueue> list, Message message, Object obj) {
                    return list.get(Math.abs(((String) obj).hashCode()) % list.size());
                }
            }, messageExtBrokerInner.getTopic() + messageExtBrokerInner.getStoreHost()));
        } catch (Exception e) {
            LOG.error("sendMessageInFailover to remote failed", e);
            return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PutMessageResult transformSendResult2PutResult(SendResult sendResult) {
        if (sendResult == null) {
            return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
        }
        switch (AnonymousClass3.$SwitchMap$org$apache$rocketmq$client$producer$SendStatus[sendResult.getSendStatus().ordinal()]) {
            case 1:
                return new PutMessageResult(PutMessageStatus.PUT_OK, (AppendMessageResult) null, true);
            case 2:
                return new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, (AppendMessageResult) null, true);
            case 3:
                return new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, (AppendMessageResult) null, true);
            case 4:
                return new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, (AppendMessageResult) null, true);
            default:
                return new PutMessageResult(PutMessageStatus.PUT_TO_REMOTE_BROKER_FAIL, (AppendMessageResult) null, true);
        }
    }

    public MessageExt getMessage(String str, long j, int i, String str2) {
        MessageStore messageStoreByBrokerName = this.brokerController.getMessageStoreByBrokerName(str2);
        if (messageStoreByBrokerName == null) {
            if (this.innerConsumer != null) {
                return getMessageFromRemote(str, j, i, str2);
            }
            return null;
        }
        GetMessageResult message = messageStoreByBrokerName.getMessage(this.innerConsumerGroupName, str, i, j, 1, (MessageFilter) null);
        List<MessageExt> decodeMsgList = decodeMsgList(message);
        if (decodeMsgList != null && !decodeMsgList.isEmpty()) {
            return decodeMsgList.get(0);
        }
        LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, result is {}", new Object[]{str, Long.valueOf(j), Integer.valueOf(i), message});
        return null;
    }

    protected List<MessageExt> decodeMsgList(GetMessageResult getMessageResult) {
        ArrayList arrayList = new ArrayList();
        try {
            List messageBufferList = getMessageResult.getMessageBufferList();
            if (messageBufferList != null) {
                for (int i = 0; i < messageBufferList.size(); i++) {
                    ByteBuffer byteBuffer = (ByteBuffer) messageBufferList.get(i);
                    if (byteBuffer == null) {
                        LOG.error("bb is null {}", getMessageResult);
                    } else {
                        MessageExt decode = MessageDecoder.decode(byteBuffer);
                        if (decode == null) {
                            LOG.error("decode msgExt is null {}", getMessageResult);
                        } else {
                            decode.setQueueOffset(((Long) getMessageResult.getMessageQueueOffset().get(i)).longValue());
                            arrayList.add(decode);
                        }
                    }
                }
            }
            return arrayList;
        } finally {
            getMessageResult.release();
        }
    }

    protected MessageExt getMessageFromRemote(String str, long j, int i, String str2) {
        try {
            PullResult pull = this.innerConsumer.pull(new MessageQueue(str, str2, i), "*", j, 1);
            if (pull.getPullStatus().equals(PullStatus.FOUND)) {
                return (MessageExt) pull.getMsgFoundList().get(0);
            }
            return null;
        } catch (Exception e) {
            LOG.error("Get message from remote failed.", e);
            return null;
        }
    }
}
