package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.util.Iterator;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

/* loaded from: input_file:org/apache/rocketmq/broker/processor/PullMessageProcessor.class */
public class PullMessageProcessor implements NettyRequestProcessor {
    private static final InternalLogger LOGGER;
    private List<ConsumeMessageHook> consumeMessageHookList;
    private PullMessageResultHandler pullMessageResultHandler;
    private final BrokerController brokerController;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.rocketmq.broker.processor.PullMessageProcessor$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/broker/processor/PullMessageProcessor$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$store$GetMessageStatus = new int[GetMessageStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.MESSAGE_WAS_REMOVING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MESSAGE_IN_QUEUE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MATCHED_MESSAGE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_FOUND_NULL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_OVERFLOW_BADLY.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_OVERFLOW_ONE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_TOO_SMALL.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    public PullMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.pullMessageResultHandler = new DefaultPullMessageResultHandler(brokerController);
    }

    private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader pullMessageRequestHeader, TopicQueueMappingContext topicQueueMappingContext) {
        try {
            if (topicQueueMappingContext.getMappingDetail() == null) {
                return null;
            }
            TopicQueueMappingDetail mappingDetail = topicQueueMappingContext.getMappingDetail();
            String topic = topicQueueMappingContext.getTopic();
            Integer globalId = topicQueueMappingContext.getGlobalId();
            if (!topicQueueMappingContext.isLeader()) {
                return RemotingCommand.buildErrorResponse(501, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname()));
            }
            Long queueOffset = pullMessageRequestHeader.getQueueOffset();
            LogicQueueMappingItem findLogicQueueMappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(topicQueueMappingContext.getMappingItemList(), queueOffset.longValue(), true);
            topicQueueMappingContext.setCurrentItem(findLogicQueueMappingItem);
            if (queueOffset.longValue() < findLogicQueueMappingItem.getLogicOffset()) {
            }
            String bname = findLogicQueueMappingItem.getBname();
            Integer valueOf = Integer.valueOf(findLogicQueueMappingItem.getQueueId());
            Long valueOf2 = Long.valueOf(findLogicQueueMappingItem.computePhysicalQueueOffset(queueOffset.longValue()));
            pullMessageRequestHeader.setQueueId(valueOf);
            pullMessageRequestHeader.setQueueOffset(valueOf2);
            if (findLogicQueueMappingItem.checkIfEndOffsetDecided() && pullMessageRequestHeader.getMaxMsgNums() != null) {
                pullMessageRequestHeader.setMaxMsgNums(Integer.valueOf((int) Math.min(findLogicQueueMappingItem.getEndOffset() - findLogicQueueMappingItem.getStartOffset(), pullMessageRequestHeader.getMaxMsgNums().intValue())));
            }
            if (mappingDetail.getBname().equals(bname)) {
                return null;
            }
            int intValue = pullMessageRequestHeader.getSysFlag().intValue();
            pullMessageRequestHeader.setLo(false);
            pullMessageRequestHeader.setBname(bname);
            pullMessageRequestHeader.setSysFlag(Integer.valueOf(PullSysFlag.clearCommitOffsetFlag(PullSysFlag.clearSuspendFlag(intValue))));
            RpcResponse rpcResponse = (RpcResponse) this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(new RpcRequest(11, pullMessageRequestHeader, (Object) null), this.brokerController.getBrokerConfig().getForwardTimeout()).get();
            if (rpcResponse.getException() != null) {
                throw rpcResponse.getException();
            }
            RemotingCommand rewriteResponseForStaticTopic = rewriteResponseForStaticTopic(pullMessageRequestHeader, (PullMessageResponseHeader) rpcResponse.getHeader(), topicQueueMappingContext, rpcResponse.getCode());
            return rewriteResponseForStaticTopic != null ? rewriteResponseForStaticTopic : RpcClientUtils.createCommandForRpcResponse(rpcResponse);
        } catch (Throwable th) {
            return RemotingCommand.buildErrorResponse(1, th.getMessage());
        }
    }

    private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader pullMessageRequestHeader, PullMessageResponseHeader pullMessageResponseHeader, TopicQueueMappingContext topicQueueMappingContext, int i) {
        try {
            if (topicQueueMappingContext.getMappingDetail() == null) {
                return null;
            }
            TopicQueueMappingDetail mappingDetail = topicQueueMappingContext.getMappingDetail();
            LogicQueueMappingItem leaderItem = topicQueueMappingContext.getLeaderItem();
            LogicQueueMappingItem currentItem = topicQueueMappingContext.getCurrentItem();
            LogicQueueMappingItem findLogicQueueMappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(topicQueueMappingContext.getMappingItemList(), 0L, true);
            if (!$assertionsDisabled && currentItem.getLogicOffset() < 0) {
                throw new AssertionError();
            }
            long longValue = pullMessageRequestHeader.getQueueOffset().longValue();
            long longValue2 = pullMessageResponseHeader.getNextBeginOffset().longValue();
            long longValue3 = pullMessageResponseHeader.getMinOffset().longValue();
            long longValue4 = pullMessageResponseHeader.getMaxOffset().longValue();
            int i2 = i;
            if (i != 0) {
                boolean z = false;
                if (leaderItem.getGen() == currentItem.getGen()) {
                    if (longValue > longValue4) {
                        if (i == 21) {
                            i2 = 21;
                            longValue2 = longValue4;
                        } else {
                            i2 = i;
                        }
                    } else if (longValue < longValue3) {
                        longValue2 = longValue3;
                        i2 = 20;
                    } else {
                        i2 = i;
                    }
                }
                if (findLogicQueueMappingItem.getGen() == currentItem.getGen()) {
                    if (longValue < longValue3) {
                        if (i == 21) {
                            i2 = 21;
                            longValue2 = longValue3;
                        } else {
                            i2 = 21;
                            longValue2 = longValue3;
                        }
                    } else if (longValue >= longValue4) {
                        LogicQueueMappingItem findNext = TopicQueueMappingUtils.findNext(topicQueueMappingContext.getMappingItemList(), currentItem, true);
                        if (findNext != null) {
                            z = true;
                            currentItem = findNext;
                            longValue2 = currentItem.getStartOffset();
                            longValue3 = currentItem.getStartOffset();
                            longValue4 = longValue3;
                            i2 = 20;
                        } else {
                            i2 = 19;
                        }
                    } else {
                        i2 = i;
                    }
                }
                if (!z && leaderItem.getGen() != currentItem.getGen() && findLogicQueueMappingItem.getGen() != currentItem.getGen()) {
                    if (longValue < longValue3) {
                        longValue2 = longValue3;
                        i2 = 20;
                    } else if (longValue >= longValue4) {
                        LogicQueueMappingItem findNext2 = TopicQueueMappingUtils.findNext(topicQueueMappingContext.getMappingItemList(), currentItem, true);
                        if (findNext2 != null) {
                            currentItem = findNext2;
                            longValue2 = currentItem.getStartOffset();
                            longValue3 = currentItem.getStartOffset();
                            longValue4 = longValue3;
                            i2 = 20;
                        } else {
                            i2 = 19;
                        }
                    } else {
                        i2 = i;
                    }
                }
            }
            if (currentItem.checkIfEndOffsetDecided() && longValue2 >= currentItem.getEndOffset()) {
                longValue2 = currentItem.getEndOffset();
            }
            pullMessageResponseHeader.setNextBeginOffset(Long.valueOf(currentItem.computeStaticQueueOffsetStrictly(longValue2)));
            pullMessageResponseHeader.setMinOffset(Long.valueOf(currentItem.computeStaticQueueOffsetStrictly(Math.max(currentItem.getStartOffset(), longValue3))));
            pullMessageResponseHeader.setMaxOffset(Long.valueOf(Math.max(currentItem.computeStaticQueueOffsetStrictly(longValue4), TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, topicQueueMappingContext.getGlobalId()))));
            pullMessageResponseHeader.setOffsetDelta(Long.valueOf(currentItem.computeOffsetDelta()));
            if (i != 0) {
                return RemotingCommand.createResponseCommandWithHeader(i2, pullMessageResponseHeader);
            }
            return null;
        } catch (Throwable th) {
            return RemotingCommand.buildErrorResponse(1, th.getMessage());
        }
    }

    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        return processRequest(channelHandlerContext.channel(), remotingCommand, true);
    }

    public boolean rejectRequest() {
        return !this.brokerController.getBrokerConfig().isSlaveReadEnable() && this.brokerController.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public RemotingCommand processRequest(Channel channel, RemotingCommand remotingCommand, boolean z) throws RemotingCommandException {
        SubscriptionData build;
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        PullMessageResponseHeader pullMessageResponseHeader = (PullMessageResponseHeader) createResponseCommand.readCustomHeader();
        PullMessageRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(PullMessageRequestHeader.class);
        createResponseCommand.setOpaque(remotingCommand.getOpaque());
        LOGGER.debug("receive PullMessage request command, {}", remotingCommand);
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
            return createResponseCommand;
        }
        if (remotingCommand.getCode() == 361 && !this.brokerController.getBrokerConfig().isLitePullMessageEnable()) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] for lite pull consumer is forbidden");
            return createResponseCommand;
        }
        SubscriptionGroupConfig findSubscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(decodeCommandCustomHeader.getConsumerGroup());
        if (null == findSubscriptionGroupConfig) {
            createResponseCommand.setCode(26);
            createResponseCommand.setRemark(String.format("subscription group [%s] does not exist, %s", decodeCommandCustomHeader.getConsumerGroup(), FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/")));
            return createResponseCommand;
        }
        if (!findSubscriptionGroupConfig.isConsumeEnable()) {
            createResponseCommand.setCode(16);
            pullMessageResponseHeader.setForbiddenType(2);
            createResponseCommand.setRemark("subscription group no permission, " + decodeCommandCustomHeader.getConsumerGroup());
            return createResponseCommand;
        }
        boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(decodeCommandCustomHeader.getSysFlag().intValue());
        boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(decodeCommandCustomHeader.getSysFlag().intValue());
        TopicConfig selectTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(decodeCommandCustomHeader.getTopic());
        if (null == selectTopicConfig) {
            LOGGER.error("the topic {} not exist, consumer: {}", decodeCommandCustomHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
            createResponseCommand.setCode(17);
            createResponseCommand.setRemark(String.format("topic[%s] not exist, apply first please! %s", decodeCommandCustomHeader.getTopic(), FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/")));
            return createResponseCommand;
        }
        if (!PermName.isReadable(selectTopicConfig.getPerm())) {
            createResponseCommand.setCode(16);
            pullMessageResponseHeader.setForbiddenType(3);
            createResponseCommand.setRemark("the topic[" + decodeCommandCustomHeader.getTopic() + "] pulling message is forbidden");
            return createResponseCommand;
        }
        TopicQueueMappingContext buildTopicQueueMappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(decodeCommandCustomHeader, false);
        RemotingCommand rewriteRequestForStaticTopic = rewriteRequestForStaticTopic(decodeCommandCustomHeader, buildTopicQueueMappingContext);
        if (rewriteRequestForStaticTopic != null) {
            return rewriteRequestForStaticTopic;
        }
        if (decodeCommandCustomHeader.getQueueId().intValue() < 0 || decodeCommandCustomHeader.getQueueId().intValue() >= selectTopicConfig.getReadQueueNums()) {
            String format = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", decodeCommandCustomHeader.getQueueId(), decodeCommandCustomHeader.getTopic(), Integer.valueOf(selectTopicConfig.getReadQueueNums()), channel.remoteAddress());
            LOGGER.warn(format);
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark(format);
            return createResponseCommand;
        }
        ConsumerFilterData consumerFilterData = null;
        if (hasSubscriptionFlag) {
            try {
                build = FilterAPI.build(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getSubscription(), decodeCommandCustomHeader.getExpressionType());
                if (!ExpressionType.isTagType(build.getExpressionType())) {
                    consumerFilterData = ConsumerFilterManager.build(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getSubscription(), decodeCommandCustomHeader.getExpressionType(), decodeCommandCustomHeader.getSubVersion().longValue());
                    if (!$assertionsDisabled && consumerFilterData == null) {
                        throw new AssertionError();
                    }
                }
            } catch (Exception e) {
                LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}", decodeCommandCustomHeader.getSubscription(), decodeCommandCustomHeader.getConsumerGroup());
                createResponseCommand.setCode(23);
                createResponseCommand.setRemark("parse the consumer's subscription failed");
                return createResponseCommand;
            }
        } else {
            ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(decodeCommandCustomHeader.getConsumerGroup());
            if (null == consumerGroupInfo) {
                LOGGER.warn("the consumer's group info not exist, group: {}", decodeCommandCustomHeader.getConsumerGroup());
                createResponseCommand.setCode(24);
                createResponseCommand.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"));
                return createResponseCommand;
            }
            if (!findSubscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                createResponseCommand.setCode(16);
                pullMessageResponseHeader.setForbiddenType(4);
                createResponseCommand.setRemark("the consumer group[" + decodeCommandCustomHeader.getConsumerGroup() + "] can not consume by broadcast way");
                return createResponseCommand;
            }
            if (this.brokerController.getSubscriptionGroupManager().getForbidden(findSubscriptionGroupConfig.getGroupName(), decodeCommandCustomHeader.getTopic(), 2)) {
                createResponseCommand.setCode(16);
                pullMessageResponseHeader.setForbiddenType(5);
                createResponseCommand.setRemark("the consumer group[" + decodeCommandCustomHeader.getConsumerGroup() + "] is forbidden for topic[" + decodeCommandCustomHeader.getTopic() + "]");
                return createResponseCommand;
            }
            build = consumerGroupInfo.findSubscriptionData(decodeCommandCustomHeader.getTopic());
            if (null == build) {
                LOGGER.warn("the consumer's subscription not exist, group: {}, topic:{}", decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic());
                createResponseCommand.setCode(24);
                createResponseCommand.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"));
                return createResponseCommand;
            }
            if (build.getSubVersion() < decodeCommandCustomHeader.getSubVersion().longValue()) {
                LOGGER.warn("The broker's subscription is not latest, group: {} {}", decodeCommandCustomHeader.getConsumerGroup(), build.getSubString());
                createResponseCommand.setCode(25);
                createResponseCommand.setRemark("the consumer's subscription not latest");
                return createResponseCommand;
            }
            if (!ExpressionType.isTagType(build.getExpressionType())) {
                consumerFilterData = this.brokerController.getConsumerFilterManager().get(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getConsumerGroup());
                if (consumerFilterData == null) {
                    createResponseCommand.setCode(27);
                    createResponseCommand.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                    return createResponseCommand;
                }
                if (consumerFilterData.getClientVersion() < decodeCommandCustomHeader.getSubVersion().longValue()) {
                    LOGGER.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}", new Object[]{decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), Long.valueOf(consumerFilterData.getClientVersion()), decodeCommandCustomHeader.getSubVersion()});
                    createResponseCommand.setCode(28);
                    createResponseCommand.setRemark("the consumer's consumer filter data not latest");
                    return createResponseCommand;
                }
            }
        }
        if (!ExpressionType.isTagType(build.getExpressionType()) && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark("The broker does not support consumer to filter message by " + build.getExpressionType());
            return createResponseCommand;
        }
        ExpressionMessageFilter expressionForRetryMessageFilter = this.brokerController.getBrokerConfig().isFilterSupportRetry() ? new ExpressionForRetryMessageFilter(build, consumerFilterData, this.brokerController.getConsumerFilterManager()) : new ExpressionMessageFilter(build, consumerFilterData, this.brokerController.getConsumerFilterManager());
        GetMessageResult message = this.brokerController.getMessageStore().getMessage(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), decodeCommandCustomHeader.getQueueOffset().longValue(), decodeCommandCustomHeader.getMaxMsgNums().intValue(), expressionForRetryMessageFilter);
        if (message != null) {
            createResponseCommand.setRemark(message.getStatus().name());
            pullMessageResponseHeader.setNextBeginOffset(Long.valueOf(message.getNextBeginOffset()));
            pullMessageResponseHeader.setMinOffset(Long.valueOf(message.getMinOffset()));
            pullMessageResponseHeader.setMaxOffset(Long.valueOf(message.getMaxOffset()));
            pullMessageResponseHeader.setTopicSysFlag(Integer.valueOf(selectTopicConfig.getTopicSysFlag()));
            pullMessageResponseHeader.setGroupSysFlag(Integer.valueOf(findSubscriptionGroupConfig.getGroupSysFlag()));
            switch (AnonymousClass2.$SwitchMap$org$apache$rocketmq$store$GetMessageStatus[message.getStatus().ordinal()]) {
                case 1:
                    createResponseCommand.setCode(0);
                    break;
                case 2:
                    createResponseCommand.setCode(20);
                    break;
                case 3:
                case 4:
                    if (0 == decodeCommandCustomHeader.getQueueOffset().longValue()) {
                        createResponseCommand.setCode(19);
                        break;
                    } else {
                        createResponseCommand.setCode(21);
                        LOGGER.info("the broker stores no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", new Object[]{decodeCommandCustomHeader.getQueueOffset(), Long.valueOf(message.getNextBeginOffset()), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId(), decodeCommandCustomHeader.getConsumerGroup()});
                        break;
                    }
                case 5:
                    createResponseCommand.setCode(20);
                    break;
                case 6:
                    createResponseCommand.setCode(19);
                    break;
                case 7:
                    createResponseCommand.setCode(21);
                    LOGGER.info("the request offset: {} over flow badly, fix to {}, broker max offset: {}, consumer: {}", new Object[]{decodeCommandCustomHeader.getQueueOffset(), Long.valueOf(message.getNextBeginOffset()), Long.valueOf(message.getMaxOffset()), channel.remoteAddress()});
                    break;
                case 8:
                    createResponseCommand.setCode(19);
                    break;
                case 9:
                    createResponseCommand.setCode(21);
                    LOGGER.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", new Object[]{decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueOffset(), Long.valueOf(message.getMinOffset()), channel.remoteAddress()});
                    break;
                default:
                    if (!$assertionsDisabled) {
                        throw new AssertionError();
                    }
                    break;
            }
            if (!this.brokerController.getBrokerConfig().isSlaveReadEnable() || this.brokerController.getBrokerConfig().isInBrokerContainer()) {
                pullMessageResponseHeader.setSuggestWhichBrokerId(0L);
            } else if (message.isSuggestPullingFromSlave()) {
                pullMessageResponseHeader.setSuggestWhichBrokerId(Long.valueOf(findSubscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()));
            } else {
                pullMessageResponseHeader.setSuggestWhichBrokerId(Long.valueOf(findSubscriptionGroupConfig.getBrokerId()));
            }
            if (this.brokerController.getBrokerConfig().getBrokerId() != 0 && !message.isSuggestPullingFromSlave() && this.brokerController.getMinBrokerIdInGroup() == 0) {
                LOGGER.debug("slave redirect pullRequest to master, topic: {}, queueId: {}, consumer group: {}, next: {}, min: {}, max: {}", new Object[]{decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId(), decodeCommandCustomHeader.getConsumerGroup(), pullMessageResponseHeader.getNextBeginOffset(), pullMessageResponseHeader.getMinOffset(), pullMessageResponseHeader.getMaxOffset()});
                pullMessageResponseHeader.setSuggestWhichBrokerId(0L);
                if (!message.getStatus().equals(GetMessageStatus.FOUND)) {
                    createResponseCommand.setCode(20);
                }
            }
            if (hasConsumeMessageHook()) {
                String str = (String) remotingCommand.getExtFields().get("Owner");
                String str2 = (String) remotingCommand.getExtFields().get("AUTH_TYPE");
                String str3 = (String) remotingCommand.getExtFields().get("OWNER_PARENT");
                String str4 = (String) remotingCommand.getExtFields().get("OWNER_SELF");
                ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
                consumeMessageContext.setConsumerGroup(decodeCommandCustomHeader.getConsumerGroup());
                consumeMessageContext.setTopic(decodeCommandCustomHeader.getTopic());
                consumeMessageContext.setQueueId(decodeCommandCustomHeader.getQueueId());
                consumeMessageContext.setAccountAuthType(str2);
                consumeMessageContext.setAccountOwnerParent(str3);
                consumeMessageContext.setAccountOwnerSelf(str4);
                consumeMessageContext.setNamespace(NamespaceUtil.getNamespaceFromResource(decodeCommandCustomHeader.getTopic()));
                switch (createResponseCommand.getCode()) {
                    case 0:
                        int msgCount4Commercial = message.getMsgCount4Commercial() * this.brokerController.getBrokerConfig().getCommercialBaseCount();
                        consumeMessageContext.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
                        consumeMessageContext.setCommercialRcvTimes(msgCount4Commercial);
                        consumeMessageContext.setCommercialRcvSize(message.getBufferTotalSize());
                        consumeMessageContext.setCommercialOwner(str);
                        consumeMessageContext.setRcvStat(BrokerStatsManager.StatsType.RCV_SUCCESS);
                        consumeMessageContext.setRcvMsgNum(message.getMessageCount());
                        consumeMessageContext.setRcvMsgSize(message.getBufferTotalSize());
                        consumeMessageContext.setCommercialRcvMsgNum(message.getMsgCount4Commercial());
                        break;
                    case 19:
                        if (!z) {
                            consumeMessageContext.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                            consumeMessageContext.setCommercialRcvTimes(1);
                            consumeMessageContext.setCommercialOwner(str);
                            consumeMessageContext.setRcvStat(BrokerStatsManager.StatsType.RCV_EPOLLS);
                            consumeMessageContext.setRcvMsgNum(0);
                            consumeMessageContext.setRcvMsgSize(0);
                            consumeMessageContext.setCommercialRcvMsgNum(0);
                            break;
                        }
                        break;
                    case 20:
                    case 21:
                        consumeMessageContext.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        consumeMessageContext.setCommercialRcvTimes(1);
                        consumeMessageContext.setCommercialOwner(str);
                        consumeMessageContext.setRcvStat(BrokerStatsManager.StatsType.RCV_EPOLLS);
                        consumeMessageContext.setRcvMsgNum(0);
                        consumeMessageContext.setRcvMsgSize(0);
                        consumeMessageContext.setCommercialRcvMsgNum(0);
                        break;
                    default:
                        if (!$assertionsDisabled) {
                            throw new AssertionError();
                        }
                        break;
                }
                try {
                    executeConsumeMessageHookBefore(consumeMessageContext);
                } catch (AbortProcessException e2) {
                    createResponseCommand.setCode(e2.getResponseCode());
                    createResponseCommand.setRemark(e2.getErrorMessage());
                    return createResponseCommand;
                }
            }
            RemotingCommand rewriteResponseForStaticTopic = rewriteResponseForStaticTopic(decodeCommandCustomHeader, pullMessageResponseHeader, buildTopicQueueMappingContext, createResponseCommand.getCode());
            if (rewriteResponseForStaticTopic != null) {
                createResponseCommand = rewriteResponseForStaticTopic;
            }
            createResponseCommand = this.pullMessageResultHandler.handle(message, remotingCommand, decodeCommandCustomHeader, channel, build, findSubscriptionGroupConfig, z, expressionForRetryMessageFilter, createResponseCommand);
        } else {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark("store getMessage return null");
        }
        if (z && hasCommitOffsetFlag) {
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), decodeCommandCustomHeader.getCommitOffset().longValue());
        }
        return createResponseCommand;
    }

    public boolean hasConsumeMessageHook() {
        return (this.consumeMessageHookList == null || this.consumeMessageHookList.isEmpty()) ? false : true;
    }

    public void executeConsumeMessageHookBefore(ConsumeMessageContext consumeMessageContext) {
        if (hasConsumeMessageHook()) {
            Iterator<ConsumeMessageHook> it = this.consumeMessageHookList.iterator();
            while (it.hasNext()) {
                try {
                    it.next().consumeMessageBefore(consumeMessageContext);
                } catch (Throwable th) {
                }
            }
        }
    }

    public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand remotingCommand) throws RemotingCommandException {
        this.brokerController.getPullMessageExecutor().submit((Runnable) new RequestTask(new Runnable() { // from class: org.apache.rocketmq.broker.processor.PullMessageProcessor.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    final RemotingCommand processRequest = PullMessageProcessor.this.processRequest(channel, remotingCommand, false);
                    if (processRequest != null) {
                        processRequest.setOpaque(remotingCommand.getOpaque());
                        processRequest.markResponseType();
                        try {
                            channel.writeAndFlush(processRequest).addListener(new ChannelFutureListener() { // from class: org.apache.rocketmq.broker.processor.PullMessageProcessor.1.1
                                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                                    if (channelFuture.isSuccess()) {
                                        return;
                                    }
                                    PullMessageProcessor.LOGGER.error("processRequestWrapper response to {} failed", channelFuture.channel().remoteAddress(), channelFuture.cause());
                                    PullMessageProcessor.LOGGER.error(remotingCommand.toString());
                                    PullMessageProcessor.LOGGER.error(processRequest.toString());
                                }
                            });
                        } catch (Throwable th) {
                            PullMessageProcessor.LOGGER.error("processRequestWrapper process request over, but response failed", th);
                            PullMessageProcessor.LOGGER.error(remotingCommand.toString());
                            PullMessageProcessor.LOGGER.error(processRequest.toString());
                        }
                    }
                } catch (RemotingCommandException e) {
                    PullMessageProcessor.LOGGER.error("excuteRequestWhenWakeup run", e);
                }
            }
        }, channel, remotingCommand));
    }

    public void registerConsumeMessageHook(List<ConsumeMessageHook> list) {
        this.consumeMessageHookList = list;
    }

    public void setPullMessageResultHandler(PullMessageResultHandler pullMessageResultHandler) {
        this.pullMessageResultHandler = pullMessageResultHandler;
    }

    static {
        $assertionsDisabled = !PullMessageProcessor.class.desiredAssertionStatus();
        LOGGER = InternalLoggerFactory.getLogger("RocketmqBroker");
    }
}
