package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/ReplyMessageProcessor.class */
public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
    private static final InternalLogger log;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.rocketmq.broker.processor.ReplyMessageProcessor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/broker/processor/ReplyMessageProcessor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$store$PutMessageStatus = new int[PutMessageStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.PUT_OK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.FLUSH_DISK_TIMEOUT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.FLUSH_SLAVE_TIMEOUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.SLAVE_NOT_AVAILABLE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.CREATE_MAPPED_FILE_FAILED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.MESSAGE_ILLEGAL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.PROPERTIES_SIZE_EXCEEDED.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.SERVICE_NOT_AVAILABLE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.OS_PAGE_CACHE_BUSY.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$PutMessageStatus[PutMessageStatus.UNKNOWN_ERROR.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/broker/processor/ReplyMessageProcessor$PushReplyResult.class */
    public class PushReplyResult {
        boolean pushOk;
        String remark = "";

        public PushReplyResult(boolean z) {
            this.pushOk = z;
        }

        public boolean isPushOk() {
            return this.pushOk;
        }

        public void setPushOk(boolean z) {
            this.pushOk = z;
        }

        public String getRemark() {
            return this.remark;
        }

        public void setRemark(String str) {
            this.remark = str;
        }
    }

    public ReplyMessageProcessor(BrokerController brokerController) {
        super(brokerController);
    }

    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        SendMessageRequestHeader parseRequestHeader = parseRequestHeader(remotingCommand);
        if (parseRequestHeader == null) {
            return null;
        }
        SendMessageContext buildMsgContext = buildMsgContext(channelHandlerContext, parseRequestHeader);
        executeSendMessageHookBefore(channelHandlerContext, remotingCommand, buildMsgContext);
        RemotingCommand processReplyMessageRequest = processReplyMessageRequest(channelHandlerContext, remotingCommand, buildMsgContext, parseRequestHeader);
        executeSendMessageHookAfter(processReplyMessageRequest, buildMsgContext);
        return processReplyMessageRequest;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor
    public SendMessageRequestHeader parseRequestHeader(RemotingCommand remotingCommand) throws RemotingCommandException {
        SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = null;
        SendMessageRequestHeader sendMessageRequestHeader = null;
        switch (remotingCommand.getCode()) {
            case 325:
                sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) remotingCommand.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
            case 324:
                if (null != sendMessageRequestHeaderV2) {
                    sendMessageRequestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(sendMessageRequestHeaderV2);
                    break;
                } else {
                    sendMessageRequestHeader = (SendMessageRequestHeader) remotingCommand.decodeCommandCustomHeader(SendMessageRequestHeader.class);
                    break;
                }
        }
        return sendMessageRequestHeader;
    }

    private RemotingCommand processReplyMessageRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, SendMessageContext sendMessageContext, SendMessageRequestHeader sendMessageRequestHeader) {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) createResponseCommand.readCustomHeader();
        createResponseCommand.setOpaque(remotingCommand.getOpaque());
        createResponseCommand.addExtField("MSG_REGION", this.brokerController.getBrokerConfig().getRegionId());
        createResponseCommand.addExtField("TRACE_ON", String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
        log.debug("receive SendReplyMessage request command, {}", remotingCommand);
        long startAcceptSendRequestTimeStamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startAcceptSendRequestTimeStamp) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startAcceptSendRequestTimeStamp)));
            return createResponseCommand;
        }
        createResponseCommand.setCode(-1);
        super.msgCheck(channelHandlerContext, sendMessageRequestHeader, remotingCommand, createResponseCommand);
        if (createResponseCommand.getCode() != -1) {
            return createResponseCommand;
        }
        byte[] body = remotingCommand.getBody();
        int intValue = sendMessageRequestHeader.getQueueId().intValue();
        TopicConfig selectTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(sendMessageRequestHeader.getTopic());
        if (intValue < 0) {
            intValue = ThreadLocalRandom.current().nextInt(99999999) % selectTopicConfig.getWriteQueueNums();
        }
        MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
        messageExtBrokerInner.setTopic(sendMessageRequestHeader.getTopic());
        messageExtBrokerInner.setQueueId(intValue);
        messageExtBrokerInner.setBody(body);
        messageExtBrokerInner.setFlag(sendMessageRequestHeader.getFlag().intValue());
        MessageAccessor.setProperties(messageExtBrokerInner, MessageDecoder.string2messageProperties(sendMessageRequestHeader.getProperties()));
        messageExtBrokerInner.setPropertiesString(sendMessageRequestHeader.getProperties());
        messageExtBrokerInner.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp().longValue());
        messageExtBrokerInner.setBornHost(channelHandlerContext.channel().remoteAddress());
        messageExtBrokerInner.setStoreHost(getStoreHost());
        messageExtBrokerInner.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes() == null ? 0 : sendMessageRequestHeader.getReconsumeTimes().intValue());
        handlePushReplyResult(pushReplyMessage(channelHandlerContext, sendMessageRequestHeader, messageExtBrokerInner), createResponseCommand, sendMessageResponseHeader, intValue);
        if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
            handlePutMessageResult(this.brokerController.getMessageStore().putMessage(messageExtBrokerInner), remotingCommand, messageExtBrokerInner, sendMessageResponseHeader, sendMessageContext, intValue);
        }
        return createResponseCommand;
    }

    private PushReplyResult pushReplyMessage(ChannelHandlerContext channelHandlerContext, SendMessageRequestHeader sendMessageRequestHeader, Message message) {
        ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader();
        InetSocketAddress inetSocketAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
        replyMessageRequestHeader.setBornHost(inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort());
        InetSocketAddress inetSocketAddress2 = (InetSocketAddress) getStoreHost();
        replyMessageRequestHeader.setStoreHost(inetSocketAddress2.getAddress().getHostAddress() + ":" + inetSocketAddress2.getPort());
        replyMessageRequestHeader.setStoreTimestamp(System.currentTimeMillis());
        replyMessageRequestHeader.setProducerGroup(sendMessageRequestHeader.getProducerGroup());
        replyMessageRequestHeader.setTopic(sendMessageRequestHeader.getTopic());
        replyMessageRequestHeader.setDefaultTopic(sendMessageRequestHeader.getDefaultTopic());
        replyMessageRequestHeader.setDefaultTopicQueueNums(sendMessageRequestHeader.getDefaultTopicQueueNums());
        replyMessageRequestHeader.setQueueId(sendMessageRequestHeader.getQueueId());
        replyMessageRequestHeader.setSysFlag(sendMessageRequestHeader.getSysFlag());
        replyMessageRequestHeader.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp());
        replyMessageRequestHeader.setFlag(sendMessageRequestHeader.getFlag());
        replyMessageRequestHeader.setProperties(sendMessageRequestHeader.getProperties());
        replyMessageRequestHeader.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes());
        replyMessageRequestHeader.setUnitMode(sendMessageRequestHeader.isUnitMode());
        RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(326, replyMessageRequestHeader);
        createRequestCommand.setBody(message.getBody());
        String str = (String) message.getProperties().get("REPLY_TO_CLIENT");
        PushReplyResult pushReplyResult = new PushReplyResult(false);
        if (str != null) {
            Channel findChannel = this.brokerController.getProducerManager().findChannel(str);
            if (findChannel != null) {
                message.getProperties().put("PUSH_REPLY_TIME", String.valueOf(System.currentTimeMillis()));
                replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));
                try {
                    RemotingCommand callClient = this.brokerController.getBroker2Client().callClient(findChannel, createRequestCommand);
                    if (!$assertionsDisabled && callClient == null) {
                        throw new AssertionError();
                    }
                    switch (callClient.getCode()) {
                        case 0:
                            pushReplyResult.setPushOk(true);
                            break;
                        default:
                            pushReplyResult.setPushOk(false);
                            pushReplyResult.setRemark("push reply message to " + str + "fail.");
                            log.warn("push reply message to <{}> return fail, response remark: {}", str, callClient.getRemark());
                            break;
                    }
                } catch (RemotingException | InterruptedException e) {
                    pushReplyResult.setPushOk(false);
                    pushReplyResult.setRemark("push reply message to " + str + "fail.");
                    log.warn("push reply message to <{}> fail. {}", new Object[]{str, findChannel, e});
                }
            } else {
                pushReplyResult.setPushOk(false);
                pushReplyResult.setRemark("push reply message fail, channel of <" + str + "> not found.");
                log.warn(pushReplyResult.getRemark());
            }
        } else {
            log.warn("REPLY_TO_CLIENT is null, can not reply message");
            pushReplyResult.setPushOk(false);
            pushReplyResult.setRemark("reply message properties[REPLY_TO_CLIENT] is null");
        }
        return pushReplyResult;
    }

    private void handlePushReplyResult(PushReplyResult pushReplyResult, RemotingCommand remotingCommand, SendMessageResponseHeader sendMessageResponseHeader, int i) {
        if (!pushReplyResult.isPushOk()) {
            remotingCommand.setCode(1);
            remotingCommand.setRemark(pushReplyResult.getRemark());
            return;
        }
        remotingCommand.setCode(0);
        remotingCommand.setRemark((String) null);
        sendMessageResponseHeader.setMsgId("0");
        sendMessageResponseHeader.setQueueId(Integer.valueOf(i));
        sendMessageResponseHeader.setQueueOffset(0L);
    }

    private void handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand remotingCommand, MessageExt messageExt, SendMessageResponseHeader sendMessageResponseHeader, SendMessageContext sendMessageContext, int i) {
        if (putMessageResult == null) {
            log.warn("process reply message, store putMessage return null");
            return;
        }
        boolean z = false;
        switch (AnonymousClass1.$SwitchMap$org$apache$rocketmq$store$PutMessageStatus[putMessageResult.getPutMessageStatus().ordinal()]) {
            case 1:
            case 2:
            case 3:
            case 4:
                z = true;
                break;
            case 5:
                log.warn("create mapped file failed, server is busy or broken.");
                break;
            case 6:
                log.warn("the message is illegal, maybe msg body or properties length not matched. msg body length limit {}B.", Integer.valueOf(this.brokerController.getMessageStoreConfig().getMaxMessageSize()));
                break;
            case 7:
                log.warn("the message is illegal, maybe msg properties length limit 32KB.");
                break;
            case 8:
                log.warn("service not available now. It may be caused by one of the following reasons: the broker's disk is full, messages are put to the slave, message store has been shut down, etc.");
                break;
            case 9:
                log.warn("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                break;
            case 10:
                log.warn("UNKNOWN_ERROR");
                break;
            default:
                log.warn("UNKNOWN_ERROR DEFAULT");
                break;
        }
        String str = (String) remotingCommand.getExtFields().get("Owner");
        int commercialSizePerMsg = this.brokerController.getBrokerConfig().getCommercialSizePerMsg();
        if (!z) {
            if (hasSendMessageHook()) {
                int length = remotingCommand.getBody().length;
                int ceil = (int) Math.ceil(length / commercialSizePerMsg);
                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(ceil);
                sendMessageContext.setCommercialSendSize(length);
                sendMessageContext.setCommercialOwner(str);
                return;
            }
            return;
        }
        this.brokerController.getBrokerStatsManager().incTopicPutNums(messageExt.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        this.brokerController.getBrokerStatsManager().incTopicPutSize(messageExt.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
        sendMessageResponseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        sendMessageResponseHeader.setQueueId(Integer.valueOf(i));
        sendMessageResponseHeader.setQueueOffset(Long.valueOf(putMessageResult.getAppendMessageResult().getLogicsOffset()));
        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(sendMessageResponseHeader.getMsgId());
            sendMessageContext.setQueueId(sendMessageResponseHeader.getQueueId());
            sendMessageContext.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
            int commercialBaseCount = this.brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteBytes = putMessageResult.getAppendMessageResult().getWroteBytes();
            int ceil2 = ((int) Math.ceil(wroteBytes / commercialSizePerMsg)) * commercialBaseCount;
            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(ceil2);
            sendMessageContext.setCommercialSendSize(wroteBytes);
            sendMessageContext.setCommercialOwner(str);
        }
    }

    static {
        $assertionsDisabled = !ReplyMessageProcessor.class.desiredAssertionStatus();
        log = InternalLoggerFactory.getLogger("RocketmqBroker");
    }
}
