/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.client.impl.consumer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.consumer.MessageQueueLock;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceLitePullImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class DefaultLitePullConsumerImpl
implements MQConsumerInner {
    // Logger obtained from ClientLogger; used for all diagnostics in this class.
    private final InternalLogger log = ClientLogger.getLog();
    // Wall-clock time (ms) captured when this instance was constructed.
    private final long consumerStartTimestamp = System.currentTimeMillis();
    // RPC hook handed to MQClientManager when the client instance is created.
    private final RPCHook rpcHook;
    // Filter hooks registered on the PullAPIWrapper in initPullAPIWrapper().
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList();
    // Lifecycle state; volatile because it is also read by unsynchronized methods (e.g. checkServiceState()).
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    protected MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private OffsetStore offsetStore;
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
    private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
    private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
    // Starts as NONE; fixed to SUBSCRIBE or ASSIGN by the first call to setSubscriptionType().
    private SubscriptionType subscriptionType = SubscriptionType.NONE;
    // Overwritten in the constructor with the value configured on the DefaultLitePullConsumer.
    private long pullTimeDelayMillsWhenException = 1000L;
    // NOTE(review): the three delay constants below are not referenced in the visible part of the file;
    // presumably they are used by the pull task defined further down - confirm.
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50L;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000L;
    private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3000L;
    private DefaultLitePullConsumer defaultLitePullConsumer;
    // One pull task per message queue currently being pulled.
    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
    // Hand-off queue: filled by submitConsumeRequest(), drained by poll().
    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
    // Executor on which pull tasks are scheduled.
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
    // Single-threaded executor running the periodic topic/queue comparison (see startScheduleTask()).
    private final ScheduledExecutorService scheduledExecutorService;
    private Map<String, TopicMessageQueueChangeListener> topicMessageQueueChangeListenerMap = new HashMap<String, TopicMessageQueueChangeListener>();
    // Message queues per listened topic, populated in operateAfterRunning().
    private Map<String, Set<MessageQueue>> messageQueuesForTopic = new HashMap<String, Set<MessageQueue>>();
    private long consumeRequestFlowControlTimes = 0L;
    private long queueFlowControlTimes = 0L;
    private long queueMaxSpanFlowControlTimes = 0L;
    // Next wall-clock time (ms) at which maybeAutoCommit() commits; -1 makes the first call commit immediately.
    private long nextAutoCommitDeadline = -1L;
    // Supplies the per-queue lock object used by seek().
    private final MessageQueueLock messageQueueLock = new MessageQueueLock();
    // Hooks invoked around message hand-off in poll().
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList();
    // When true, updateTopicSubscribeInfoWhenSubscriptionChanged() returns without doing anything.
    private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;

    /**
     * Builds the implementation behind a {@link DefaultLitePullConsumer}.
     * Creates the pull-task executor (sized by the consumer's pull thread count)
     * and the single-threaded executor used for the queue-change monitor, and
     * copies the configured exception back-off delay.
     *
     * @param defaultLitePullConsumer the facade whose configuration is read
     * @param rpcHook hook stored for later use when the client instance is created
     */
    public DefaultLitePullConsumerImpl(DefaultLitePullConsumer defaultLitePullConsumer, RPCHook rpcHook) {
        this.defaultLitePullConsumer = defaultLitePullConsumer;
        this.rpcHook = rpcHook;

        int pullThreads = defaultLitePullConsumer.getPullThreadNums();
        ThreadFactory pullThreadFactory = new ThreadFactoryImpl("PullMsgThread-" + defaultLitePullConsumer.getConsumerGroup());
        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(pullThreads, pullThreadFactory);

        ThreadFactory monitorThreadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MonitorMessageQueueChangeThread");
            }
        };
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(monitorThreadFactory);

        this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
    }

    /**
     * Appends a consume-message hook to the hook list and logs its name.
     *
     * @param hook the hook to register
     */
    public void registerConsumeMessageHook(ConsumeMessageHook hook) {
        consumeMessageHookList.add(hook);
        String hookName = hook.hookName();
        log.info("register consumeMessageHook Hook, {}", hookName);
    }

    /**
     * Runs {@code consumeMessageBefore} on every registered hook.
     * A failure in one hook is logged and does not stop the remaining hooks.
     *
     * @param context the context passed to each hook
     */
    public void executeHookBefore(ConsumeMessageContext context) {
        if (consumeMessageHookList.isEmpty()) {
            return;
        }
        for (ConsumeMessageHook registered : consumeMessageHookList) {
            try {
                registered.consumeMessageBefore(context);
            } catch (Throwable failure) {
                log.error("consumeMessageHook {} executeHookBefore exception", registered.hookName(), failure);
            }
        }
    }

    /**
     * Runs {@code consumeMessageAfter} on every registered hook.
     * A failure in one hook is logged and does not stop the remaining hooks.
     *
     * @param context the context passed to each hook
     */
    public void executeHookAfter(ConsumeMessageContext context) {
        if (consumeMessageHookList.isEmpty()) {
            return;
        }
        for (ConsumeMessageHook registered : consumeMessageHookList) {
            try {
                registered.consumeMessageAfter(context);
            } catch (Throwable failure) {
                log.error("consumeMessageHook {} executeHookAfter exception", registered.hookName(), failure);
            }
        }
    }

    /**
     * Guards operations that require a started consumer.
     *
     * @throws IllegalStateException if the service state is anything other than RUNNING
     */
    private void checkServiceState() {
        boolean running = ServiceState.RUNNING == serviceState;
        if (!running) {
            throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
        }
    }

    /**
     * Forwards a new name-server address list to the client API implementation.
     *
     * @param newAddresses the address list, passed through unchanged
     */
    public void updateNameServerAddr(String newAddresses) {
        mQClientFactory
            .getMQClientAPIImpl()
            .updateNameServerAddressList(newAddresses);
    }

    /**
     * Records the subscription mode on first use and rejects a later switch
     * to a different mode.
     *
     * @param type the mode requested by the caller
     * @throws IllegalStateException if a different mode was already chosen
     */
    private synchronized void setSubscriptionType(SubscriptionType type) {
        if (SubscriptionType.NONE == subscriptionType) {
            // First caller decides the mode.
            subscriptionType = type;
            return;
        }
        if (type != subscriptionType) {
            throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE);
        }
    }

    /**
     * Delegates to the {@code AssignedMessageQueue} field to replace the
     * queues recorded for one topic.
     *
     * @param topic the topic whose assignment changes
     * @param assignedMessageQueue the new set of queues (shadows the field of the same name)
     */
    private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
        AssignedMessageQueue registry = this.assignedMessageQueue;
        registry.updateAssignedMessageQueue(topic, assignedMessageQueue);
    }

    /**
     * Reconciles the pull tasks of one topic with a new set of queues:
     * tasks for queues of {@code topic} that are no longer in {@code mqNewSet}
     * are cancelled and removed, then tasks are started for the new set.
     *
     * @param topic the topic whose tasks are reconciled
     * @param mqNewSet the queues that should have a pull task afterwards
     */
    private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
        // Typed iterator: the raw Iterator/Map.Entry left by the decompiler does not type-check.
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
            MessageQueue queue = next.getKey();
            if (!queue.getTopic().equals(topic) || mqNewSet.contains(queue)) {
                continue;
            }
            next.getValue().setCancelled(true);
            it.remove();
        }
        this.startPullTask(mqNewSet);
    }

    /**
     * Stops a running consumer: persists offsets, unregisters the consumer
     * group, shuts down both executors and the client factory, then marks the
     * state SHUTDOWN_ALREADY. Does nothing in any state other than RUNNING.
     */
    public synchronized void shutdown() {
        if (this.serviceState != ServiceState.RUNNING) {
            return;
        }
        String group = this.defaultLitePullConsumer.getConsumerGroup();
        persistConsumerOffset();
        this.mQClientFactory.unregisterConsumer(group);
        this.scheduledThreadPoolExecutor.shutdown();
        this.scheduledExecutorService.shutdown();
        this.mQClientFactory.shutdown();
        this.serviceState = ServiceState.SHUTDOWN_ALREADY;
        this.log.info("the consumer [{}] shutdown OK", this.defaultLitePullConsumer.getConsumerGroup());
    }

    /**
     * @return {@code true} when the service state is RUNNING
     */
    public synchronized boolean isRunning() {
        return ServiceState.RUNNING == serviceState;
    }

    /**
     * Starts the consumer. Only allowed from the CREATE_JUST state; the state
     * is set to START_FAILED up front and only becomes RUNNING once every
     * initialization step has completed without throwing.
     *
     * @throws MQClientException if configuration is invalid, initialization
     *         fails, or the consumer is in RUNNING, START_FAILED or SHUTDOWN_ALREADY
     */
    public synchronized void start() throws MQClientException {
        ServiceState current = this.serviceState;
        if (current == ServiceState.CREATE_JUST) {
            // Pessimistically mark as failed; overwritten with RUNNING on success.
            this.serviceState = ServiceState.START_FAILED;
            checkConfig();
            if (MessageModel.CLUSTERING == this.defaultLitePullConsumer.getMessageModel()) {
                this.defaultLitePullConsumer.changeInstanceNameToPID();
            }
            initMQClientFactory();
            initRebalanceImpl();
            initPullAPIWrapper();
            initOffsetStore();
            this.mQClientFactory.start();
            startScheduleTask();
            this.serviceState = ServiceState.RUNNING;
            this.log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
            operateAfterRunning();
            return;
        }
        if (current == ServiceState.RUNNING
            || current == ServiceState.START_FAILED
            || current == ServiceState.SHUTDOWN_ALREADY) {
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
    }

    /**
     * Obtains the shared client instance and registers this consumer under its
     * group name. If registration is refused, the state is reset to
     * CREATE_JUST before the exception is thrown.
     *
     * @throws MQClientException if the consumer group is already registered
     */
    private void initMQClientFactory() throws MQClientException {
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
        boolean registered = this.mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
        if (registered) {
            return;
        }
        this.serviceState = ServiceState.CREATE_JUST;
        throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
    }

    /**
     * Copies the group, message model and allocation strategy from the
     * consumer facade onto the rebalance implementation and gives it the
     * client factory.
     */
    private void initRebalanceImpl() {
        RebalanceImpl rebalance = this.rebalanceImpl;
        DefaultLitePullConsumer consumer = this.defaultLitePullConsumer;
        rebalance.setConsumerGroup(consumer.getConsumerGroup());
        rebalance.setMessageModel(consumer.getMessageModel());
        rebalance.setAllocateMessageQueueStrategy(consumer.getAllocateMessageQueueStrategy());
        rebalance.setmQClientFactory(this.mQClientFactory);
    }

    /**
     * Creates the {@link PullAPIWrapper} for this consumer group and registers
     * the filter-message hooks on it.
     */
    private void initPullAPIWrapper() {
        String group = this.defaultLitePullConsumer.getConsumerGroup();
        PullAPIWrapper wrapper = new PullAPIWrapper(this.mQClientFactory, group, this.isUnitMode());
        this.pullAPIWrapper = wrapper;
        wrapper.registerFilterMessageHook(this.filterMessageHookList);
    }

    /**
     * Selects the offset store and loads it. A store already set on the
     * consumer facade wins; otherwise a local-file store is created for
     * BROADCASTING and a remote-broker store for CLUSTERING, and the result is
     * written back to the facade.
     *
     * @throws MQClientException if loading the store fails
     */
    private void initOffsetStore() throws MQClientException {
        if (this.defaultLitePullConsumer.getOffsetStore() == null) {
            String group;
            switch (this.defaultLitePullConsumer.getMessageModel()) {
                case BROADCASTING: {
                    group = this.defaultLitePullConsumer.getConsumerGroup();
                    this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, group);
                    break;
                }
                case CLUSTERING: {
                    group = this.defaultLitePullConsumer.getConsumerGroup();
                    this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, group);
                    break;
                }
            }
            this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
        } else {
            this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
        }
        this.offsetStore.load();
    }

    /**
     * Schedules the periodic topic/queue comparison on the monitor executor.
     * The first run is after 10 seconds; the period is the configured topic
     * metadata check interval. Exceptions from a run are logged, not rethrown.
     */
    private void startScheduleTask() {
        Runnable compareTask = new Runnable() {
            @Override
            public void run() {
                try {
                    DefaultLitePullConsumerImpl.this.fetchTopicMessageQueuesAndCompare();
                } catch (Exception e) {
                    DefaultLitePullConsumerImpl.this.log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
                }
            }
        };
        long periodMillis = this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis();
        this.scheduledExecutorService.scheduleAtFixedRate(compareTask, 10000L, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Work done right after the state becomes RUNNING: applies a subscription
     * or assignment that was made before start, records the current queues of
     * every topic that has a change listener, and checks the client in the broker.
     *
     * @throws MQClientException if fetching queues or the broker check fails
     */
    private void operateAfterRunning() throws MQClientException {
        if (SubscriptionType.SUBSCRIBE == this.subscriptionType) {
            updateTopicSubscribeInfoWhenSubscriptionChanged();
        }
        if (SubscriptionType.ASSIGN == this.subscriptionType) {
            updateAssignPullTask(this.assignedMessageQueue.messageQueues());
        }
        for (String listenedTopic : this.topicMessageQueueChangeListenerMap.keySet()) {
            this.messageQueuesForTopic.put(listenedTopic, fetchMessageQueues(listenedTopic));
        }
        this.mQClientFactory.checkClientInBroker();
    }

    /**
     * Validates the consumer configuration before start: group name, message
     * model, allocation strategy, and that the suspend timeout is not smaller
     * than the broker suspend time.
     *
     * @throws MQClientException on the first violated rule
     */
    private void checkConfig() throws MQClientException {
        DefaultLitePullConsumer consumer = this.defaultLitePullConsumer;

        Validators.checkGroup(consumer.getConsumerGroup());
        String group = consumer.getConsumerGroup();
        if (group.equals("DEFAULT_CONSUMER")) {
            throw new MQClientException("consumerGroup can not equal DEFAULT_CONSUMER, please specify another one." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }

        if (consumer.getMessageModel() == null) {
            throw new MQClientException("messageModel is null" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }

        if (consumer.getAllocateMessageQueueStrategy() == null) {
            throw new MQClientException("allocateMessageQueueStrategy is null" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }

        boolean suspendTimeoutTooShort = consumer.getConsumerTimeoutMillisWhenSuspend() < consumer.getBrokerSuspendMaxTimeMillis();
        if (suspendTimeoutTooShort) {
            throw new MQClientException("Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
    }

    /**
     * @return the pull API wrapper; {@code null} until {@code initPullAPIWrapper()} has run
     */
    public PullAPIWrapper getPullAPIWrapper() {
        return pullAPIWrapper;
    }

    /**
     * Creates and immediately schedules a pull task for every queue in
     * {@code mqSet} that does not already have one in the task table.
     *
     * @param mqSet queues that should be pulled
     */
    private void startPullTask(Collection<MessageQueue> mqSet) {
        for (MessageQueue queue : mqSet) {
            if (!taskTable.containsKey(queue)) {
                PullTaskImpl task = new PullTaskImpl(queue);
                taskTable.put(queue, task);
                scheduledThreadPoolExecutor.schedule(task, 0L, TimeUnit.MILLISECONDS);
            }
        }
    }

    /**
     * Reconciles all pull tasks with a new assignment: every task whose queue
     * is not in {@code mqNewSet} is cancelled and removed, then tasks are
     * started for the new set.
     *
     * @param mqNewSet the queues that should have a pull task afterwards
     */
    private void updateAssignPullTask(Collection<MessageQueue> mqNewSet) {
        // Typed iterator: the raw Iterator/Map.Entry left by the decompiler does not type-check.
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
            if (mqNewSet.contains(next.getKey())) {
                continue;
            }
            next.getValue().setCancelled(true);
            it.remove();
        }
        this.startPullTask(mqNewSet);
    }

    /**
     * Refreshes route information from the name server for every subscribed
     * topic. Skipped entirely when the static opt-out flag is set.
     */
    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
        if (doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged) {
            return;
        }
        ConcurrentMap<String, SubscriptionData> subscriptions = rebalanceImpl.getSubscriptionInner();
        if (subscriptions == null) {
            return;
        }
        for (String subscribedTopic : subscriptions.keySet()) {
            mQClientFactory.updateTopicRouteInfoFromNameServer(subscribedTopic);
        }
    }

    /**
     * Subscribes to a topic with a tag expression. Switches the consumer into
     * SUBSCRIBE mode, records the subscription, installs a fresh message queue
     * listener and, if already running, sends a heartbeat and refreshes routes.
     *
     * @param topic the topic; must not be null or empty
     * @param subExpression the subscription expression
     * @throws MQClientException wrapping any exception raised while subscribing,
     *         including the argument and mode-conflict checks
     */
    public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            boolean topicMissing = topic == null || topic.isEmpty();
            if (topicMissing) {
                throw new IllegalArgumentException("Topic can not be null or empty.");
            }
            setSubscriptionType(SubscriptionType.SUBSCRIBE);
            SubscriptionData data = FilterAPI.buildSubscriptionData(topic, subExpression);
            rebalanceImpl.getSubscriptionInner().put(topic, data);
            defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
            assignedMessageQueue.setRebalanceImpl(rebalanceImpl);
            if (ServiceState.RUNNING == serviceState) {
                mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                updateTopicSubscribeInfoWhenSubscriptionChanged();
            }
        } catch (Exception cause) {
            throw new MQClientException("subscribe exception", cause);
        }
    }

    /**
     * Subscribes to a topic with a message selector. A {@code null} selector
     * falls back to the tag-expression overload with {@code "*"}. Otherwise
     * behaves like the tag overload but builds the subscription from the
     * selector's expression and expression type.
     *
     * @param topic the topic; must not be null or empty
     * @param messageSelector the selector, or {@code null} for "*"
     * @throws MQClientException wrapping any exception raised while subscribing
     */
    public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
        try {
            boolean topicMissing = topic == null || topic.isEmpty();
            if (topicMissing) {
                throw new IllegalArgumentException("Topic can not be null or empty.");
            }
            setSubscriptionType(SubscriptionType.SUBSCRIBE);
            if (messageSelector == null) {
                subscribe(topic, "*");
                return;
            }
            String expression = messageSelector.getExpression();
            String expressionType = messageSelector.getExpressionType();
            SubscriptionData data = FilterAPI.build(topic, expression, expressionType);
            rebalanceImpl.getSubscriptionInner().put(topic, data);
            defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
            assignedMessageQueue.setRebalanceImpl(rebalanceImpl);
            if (ServiceState.RUNNING == serviceState) {
                mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                updateTopicSubscribeInfoWhenSubscriptionChanged();
            }
        } catch (Exception cause) {
            throw new MQClientException("subscribe exception", cause);
        }
    }

    /**
     * Drops the subscription for a topic, cancels its pull tasks and removes
     * its queues from the assigned set.
     *
     * @param topic the topic to unsubscribe from
     */
    public synchronized void unsubscribe(String topic) {
        rebalanceImpl.getSubscriptionInner().remove(topic);
        removePullTaskCallback(topic);
        assignedMessageQueue.removeAssignedMessageQueue(topic);
    }

    /**
     * Manually assigns message queues. Switches the consumer into ASSIGN mode,
     * records the queues and, if already running, reconciles the pull tasks.
     *
     * @param messageQueues the queues to consume; must not be null or empty
     * @throws IllegalArgumentException if {@code messageQueues} is null or empty
     * @throws IllegalStateException if the consumer is already in SUBSCRIBE mode
     */
    public synchronized void assign(Collection<MessageQueue> messageQueues) {
        boolean nothingToAssign = messageQueues == null || messageQueues.isEmpty();
        if (nothingToAssign) {
            throw new IllegalArgumentException("Message queues can not be null or empty.");
        }
        setSubscriptionType(SubscriptionType.ASSIGN);
        assignedMessageQueue.updateAssignedMessageQueue(messageQueues);
        if (ServiceState.RUNNING != serviceState) {
            return;
        }
        updateAssignPullTask(messageQueues);
    }

    /**
     * Commits all offsets if the auto-commit deadline has passed, then moves
     * the deadline forward by the configured auto-commit interval.
     */
    private void maybeAutoCommit() {
        long currentMillis = System.currentTimeMillis();
        if (currentMillis < nextAutoCommitDeadline) {
            return;
        }
        commitAll();
        nextAutoCommitDeadline = currentMillis + defaultLitePullConsumer.getAutoCommitIntervalMillis();
    }

    /**
     * Takes the next batch of pulled messages from the local cache, waiting up
     * to {@code timeout} milliseconds. Requests whose process queue has been
     * dropped are skipped, but only while the deadline has not passed. For a
     * returned batch, the messages are removed from the process queue, the
     * consume offset is advanced, topic namespaces are stripped and the
     * consume-message hooks are run.
     *
     * @param timeout maximum wait in milliseconds; must not be negative
     * @return the messages of one consume request, or an empty list if nothing
     *         usable arrived in time or the calling thread was interrupted
     * @throws IllegalStateException if the consumer is not running
     * @throws IllegalArgumentException if {@code timeout} is negative
     */
    public synchronized List<MessageExt> poll(long timeout) {
        try {
            this.checkServiceState();
            if (timeout < 0L) {
                throw new IllegalArgumentException("Timeout must not be negative");
            }
            if (this.defaultLitePullConsumer.isAutoCommit()) {
                this.maybeAutoCommit();
            }
            long endTime = System.currentTimeMillis() + timeout;
            ConsumeRequest consumeRequest = this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            if (endTime - System.currentTimeMillis() > 0L) {
                while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                    consumeRequest = this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    // Stop skipping dropped requests once the caller's deadline has passed;
                    // without this the loop keeps draining the cache after the timeout.
                    if (endTime - System.currentTimeMillis() <= 0L) {
                        break;
                    }
                }
            }
            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                this.assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                this.resetTopic(messages);
                if (!this.consumeMessageHookList.isEmpty()) {
                    ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
                    consumeMessageContext.setNamespace(this.defaultLitePullConsumer.getNamespace());
                    consumeMessageContext.setConsumerGroup(this.groupName());
                    consumeMessageContext.setMq(consumeRequest.getMessageQueue());
                    consumeMessageContext.setMsgList(messages);
                    consumeMessageContext.setSuccess(false);
                    this.executeHookBefore(consumeMessageContext);
                    consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
                    consumeMessageContext.setSuccess(true);
                    this.executeHookAfter(consumeMessageContext);
                }
                return messages;
            }
        }
        catch (InterruptedException interrupted) {
            // Preserve the interrupt so the caller can observe it; the method
            // itself still reports "no messages".
            Thread.currentThread().interrupt();
        }
        return Collections.emptyList();
    }

    /**
     * Marks the given queues as paused in the assigned-queue registry.
     *
     * @param messageQueues the queues to pause
     */
    public void pause(Collection<MessageQueue> messageQueues) {
        assignedMessageQueue.pause(messageQueues);
    }

    /**
     * Marks the given queues as resumed in the assigned-queue registry.
     *
     * @param messageQueues the queues to resume
     */
    public void resume(Collection<MessageQueue> messageQueues) {
        assignedMessageQueue.resume(messageQueues);
    }

    /**
     * Repositions consumption of an assigned queue to {@code offset}.
     * Under the queue's lock object: cached messages for the queue are
     * discarded, any existing pull task is interrupted and removed, the seek
     * offset is recorded, and a new pull task is scheduled.
     *
     * @param messageQueue the queue to reposition; must currently be assigned
     * @param offset the target offset; must lie within [min offset, max offset]
     * @throws MQClientException if the queue is not assigned or the offset is out of range
     */
    public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
        boolean assigned = this.assignedMessageQueue.messageQueues().contains(messageQueue);
        if (!assigned) {
            if (this.subscriptionType == SubscriptionType.SUBSCRIBE) {
                throw new MQClientException("The message queue is not in assigned list, may be rebalancing, message queue: " + messageQueue, null);
            }
            throw new MQClientException("The message queue is not in assigned list, message queue: " + messageQueue, null);
        }

        long minOffset = this.minOffset(messageQueue);
        long maxOffset = this.maxOffset(messageQueue);
        boolean outOfRange = offset < minOffset || offset > maxOffset;
        if (outOfRange) {
            throw new MQClientException("Seek offset illegal, seek offset = " + offset + ", min offset = " + minOffset + ", max offset = " + maxOffset, null);
        }

        final Object queueLock = this.messageQueueLock.fetchLockObject(messageQueue);
        synchronized (queueLock) {
            this.clearMessageQueueInCache(messageQueue);

            PullTaskImpl runningTask = this.taskTable.get(messageQueue);
            if (runningTask != null) {
                runningTask.tryInterrupt();
                this.taskTable.remove(messageQueue);
            }

            this.assignedMessageQueue.setSeekOffset(messageQueue, offset);

            if (!this.taskTable.containsKey(messageQueue)) {
                PullTaskImpl replacement = new PullTaskImpl(messageQueue);
                this.taskTable.put(messageQueue, replacement);
                this.scheduledThreadPoolExecutor.schedule(replacement, 0L, TimeUnit.MILLISECONDS);
            }
        }
    }

    /**
     * Seeks the queue to its current minimum offset.
     *
     * @param messageQueue the queue to reposition
     * @throws MQClientException if the offset lookup or the seek fails
     */
    public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
        seek(messageQueue, minOffset(messageQueue));
    }

    /**
     * Seeks the queue to its current maximum offset.
     *
     * @param messageQueue the queue to reposition
     * @throws MQClientException if the offset lookup or the seek fails
     */
    public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
        seek(messageQueue, maxOffset(messageQueue));
    }

    /**
     * Looks up the maximum offset of a queue through the admin implementation.
     *
     * @param messageQueue the queue to query
     * @return the value reported by the admin implementation
     * @throws IllegalStateException if the consumer is not running
     * @throws MQClientException if the lookup fails
     */
    private long maxOffset(MessageQueue messageQueue) throws MQClientException {
        checkServiceState();
        long max = mQClientFactory.getMQAdminImpl().maxOffset(messageQueue);
        return max;
    }

    /**
     * Looks up the minimum offset of a queue through the admin implementation.
     *
     * @param messageQueue the queue to query
     * @return the value reported by the admin implementation
     * @throws IllegalStateException if the consumer is not running
     * @throws MQClientException if the lookup fails
     */
    private long minOffset(MessageQueue messageQueue) throws MQClientException {
        checkServiceState();
        long min = mQClientFactory.getMQAdminImpl().minOffset(messageQueue);
        return min;
    }

    /**
     * Thin alias for {@code removePullTask(topic)}.
     *
     * @param topic the topic whose pull tasks are cancelled and removed
     */
    private void removePullTaskCallback(String topic) {
        removePullTask(topic);
    }

    /**
     * Cancels and removes every pull task whose queue belongs to {@code topic}.
     *
     * @param topic the topic whose pull tasks are removed
     */
    private void removePullTask(String topic) {
        // Typed iterator: the raw Iterator/Map.Entry left by the decompiler does not type-check.
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
            if (!next.getKey().getTopic().equals(topic)) {
                continue;
            }
            next.getValue().setCancelled(true);
            it.remove();
        }
    }

    /**
     * Commits the consume offset of every assigned queue. A failure on one
     * queue is logged, with its cause, and does not stop the remaining queues.
     */
    public synchronized void commitAll() {
        for (MessageQueue messageQueue : this.assignedMessageQueue.messageQueues()) {
            try {
                this.commit(messageQueue);
            }
            catch (Exception e) {
                // Pass the exception to the logger so the cause is not lost.
                this.log.error("An error occurred when update consume offset Automatically.", (Throwable)e);
            }
        }
    }

    /**
     * Commits the consume offsets of the given queues and optionally persists
     * them through the offset store. A null or empty set is a no-op.
     *
     * @param messageQueues the queues to commit
     * @param persist whether to call {@code persistAll} on the offset store afterwards
     */
    public synchronized void commit(Set<MessageQueue> messageQueues, boolean persist) {
        if (messageQueues == null || messageQueues.isEmpty()) {
            return;
        }
        for (MessageQueue queue : messageQueues) {
            commit(queue);
        }
        if (!persist) {
            return;
        }
        offsetStore.persistAll(messageQueues);
    }

    /**
     * Writes the locally tracked consume offset of one queue to the offset
     * store, provided the queue has a process queue that is not dropped.
     * An offset of -1 is logged as an error and nothing is written.
     *
     * @param messageQueue the queue to commit
     */
    private synchronized void commit(MessageQueue messageQueue) {
        long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
        if (consumerOffset == -1L) {
            log.error("consumerOffset is -1 in messageQueue [" + messageQueue + "].");
            return;
        }
        ProcessQueue queueState = assignedMessageQueue.getProcessQueue(messageQueue);
        if (queueState == null || queueState.isDropped()) {
            return;
        }
        updateConsumeOffset(messageQueue, consumerOffset);
    }

    /**
     * Records the next pull offset for a queue unless a seek is pending for it
     * (seek offset other than -1), in which case the update is skipped.
     *
     * @param messageQueue the queue being pulled
     * @param nextPullOffset the offset to pull from next
     * @param processQueue the process queue passed through to the registry
     */
    private void updatePullOffset(MessageQueue messageQueue, long nextPullOffset, ProcessQueue processQueue) {
        boolean seekPending = assignedMessageQueue.getSeekOffset(messageQueue) != -1L;
        if (seekPending) {
            return;
        }
        assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset, processQueue);
    }

    /**
     * Puts a consume request into the cache that {@code poll} drains.
     * If the thread is interrupted while waiting, the error is logged, the
     * request is not enqueued, and the interrupt status is restored.
     *
     * @param consumeRequest the request to enqueue
     */
    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
        try {
            this.consumeRequestCache.put(consumeRequest);
        }
        catch (InterruptedException e) {
            this.log.error("Submit consumeRequest error", (Throwable)e);
            // Catching InterruptedException clears the flag; set it again so
            // code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Asks the rebalance implementation where pulling should start for a queue.
     *
     * @param messageQueue the queue to query
     * @return the offset computed by the rebalance implementation
     * @throws IllegalStateException if the consumer is not running
     * @throws MQClientException if the computation fails
     */
    private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException {
        checkServiceState();
        return rebalanceImpl.computePullFromWhereWithException(messageQueue);
    }

    /**
     * Reads the committed offset of a queue from the offset store, preferring
     * the in-memory value over the backing store.
     *
     * @param messageQueue the queue to query
     * @return the offset returned by the store
     * @throws IllegalStateException if the consumer is not running
     * @throws MQClientException if the store returns -2
     */
    public long committed(MessageQueue messageQueue) throws MQClientException {
        checkServiceState();
        long stored = offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
        if (stored != -2L) {
            return stored;
        }
        throw new MQClientException("Fetch consume offset from broker exception", null);
    }

    /**
     * Discards everything cached locally for one queue: clears its process
     * queue, if present, and removes its pending consume requests from the cache.
     *
     * @param messageQueue the queue whose cached data is discarded
     */
    private void clearMessageQueueInCache(MessageQueue messageQueue) {
        ProcessQueue queueState = assignedMessageQueue.getProcessQueue(messageQueue);
        if (queueState != null) {
            queueState.clear();
        }
        for (Iterator<ConsumeRequest> pending = consumeRequestCache.iterator(); pending.hasNext(); ) {
            ConsumeRequest request = pending.next();
            if (request.getMessageQueue().equals(messageQueue)) {
                pending.remove();
            }
        }
    }

    /**
     * Determines the offset of the next pull for a queue. A pending seek
     * offset takes priority: it is also written as the consume offset and
     * then reset to -1. Otherwise the recorded pull offset is used, falling
     * back to {@code fetchConsumeOffset} when none is recorded (-1).
     *
     * @param messageQueue the queue about to be pulled
     * @return the offset to pull from
     * @throws MQClientException if the fallback offset lookup fails
     */
    private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
        long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
        if (seekOffset != -1L) {
            assignedMessageQueue.updateConsumeOffset(messageQueue, seekOffset);
            assignedMessageQueue.setSeekOffset(messageQueue, -1L);
            return seekOffset;
        }
        long pullOffset = assignedMessageQueue.getPullOffset(messageQueue);
        if (pullOffset != -1L) {
            return pullOffset;
        }
        return fetchConsumeOffset(messageQueue);
    }

    /**
     * Looks up a queue offset by timestamp through the admin implementation.
     *
     * @param mq the queue to search
     * @param timestamp the timestamp passed to the admin implementation
     * @return the offset reported by the admin implementation
     * @throws IllegalStateException if the consumer is not running
     * @throws MQClientException if the lookup fails
     */
    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
        checkServiceState();
        long found = mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
        return found;
    }

    /**
     * Pulls with the consumer's configured pull timeout; see the five-argument overload.
     */
    private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long configuredTimeout = defaultLitePullConsumer.getConsumerPullTimeoutMillis();
        return pull(mq, subscriptionData, offset, maxNums, configuredTimeout);
    }

    /**
     * Pulls synchronously in blocking mode by delegating to
     * {@code pullSyncImpl} with {@code block = true}.
     */
    private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        final boolean block = true;
        return pullSyncImpl(mq, subscriptionData, offset, maxNums, block, timeout);
    }

    /**
     * Performs one synchronous pull against the broker and post-processes the
     * result through the pull API wrapper.
     *
     * @param mq the queue to pull from; must not be null
     * @param subscriptionData subscription whose expression is sent with the pull
     * @param offset the offset to pull from; must not be negative
     * @param maxNums maximum number of messages; must be positive
     * @param block when true the suspend timeout from the consumer config is
     *        used as the request timeout, otherwise {@code timeout} is used
     * @param timeout request timeout used only when {@code block} is false
     * @return the processed pull result
     * @throws MQClientException if an argument check fails or the pull fails
     */
    private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        if (mq == null) {
            throw new MQClientException("mq is null", null);
        }
        if (offset < 0L) {
            throw new MQClientException("offset < 0", null);
        }
        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }

        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false, true);

        long timeoutMillis;
        if (block) {
            timeoutMillis = this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend();
        } else {
            timeoutMillis = timeout;
        }

        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset,
            maxNums,
            sysFlag,
            0L,
            this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null);
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        return pullResult;
    }

    /**
     * Rewrites the topic of every message in {@code msgList} to its form without the
     * consumer's namespace. Messages are left untouched while the consumer reports a
     * {@code null} namespace; a {@code null} or empty list is a no-op.
     *
     * @param msgList the messages to update in place; may be {@code null}
     */
    private void resetTopic(List<MessageExt> msgList) {
        if (msgList != null && !msgList.isEmpty()) {
            for (MessageExt msg : msgList) {
                if (defaultLitePullConsumer.getNamespace() != null) {
                    final String plainTopic = NamespaceUtil.withoutNamespace(msg.getTopic(), defaultLitePullConsumer.getNamespace());
                    msg.setTopic(plainTopic);
                }
            }
        }
    }

    /**
     * Records {@code offset} for {@code mq} in the offset store after verifying the
     * service state. The store is called with {@code false} as its third argument.
     *
     * @param mq the queue whose offset is updated
     * @param offset the new offset
     */
    public void updateConsumeOffset(MessageQueue mq, long offset) {
        checkServiceState();
        final boolean increaseOnly = false;
        offsetStore.updateOffset(mq, offset, increaseOnly);
    }

    /**
     * @return the consumer group configured on the wrapped {@code DefaultLitePullConsumer}
     */
    @Override
    public String groupName() {
        final String consumerGroup = defaultLitePullConsumer.getConsumerGroup();
        return consumerGroup;
    }

    /**
     * @return the message model configured on the wrapped {@code DefaultLitePullConsumer}
     */
    @Override
    public MessageModel messageModel() {
        final MessageModel configuredModel = defaultLitePullConsumer.getMessageModel();
        return configuredModel;
    }

    /**
     * @return always {@code ConsumeType.CONSUME_ACTIVELY}
     */
    @Override
    public ConsumeType consumeType() {
        return ConsumeType.CONSUME_ACTIVELY;
    }

    /**
     * @return always {@code ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET}; no consumer
     *         configuration is consulted here
     */
    @Override
    public ConsumeFromWhere consumeFromWhere() {
        return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    }

    /**
     * Copies the current subscription data held by the rebalance implementation into a
     * fresh set.
     *
     * @return a new mutable set containing the values of the subscription table
     */
    @Override
    public Set<SubscriptionData> subscriptions() {
        final Set<SubscriptionData> snapshot = new HashSet<SubscriptionData>();
        for (SubscriptionData subscription : rebalanceImpl.getSubscriptionInner().values()) {
            snapshot.add(subscription);
        }
        return snapshot;
    }

    /**
     * Triggers a rebalance (with {@code false} as the argument) when a rebalance
     * implementation is present; otherwise does nothing.
     */
    @Override
    public void doRebalance() {
        if (rebalanceImpl == null) {
            return;
        }
        rebalanceImpl.doRebalance(false);
    }

    /**
     * Persists offsets for the queues this consumer currently owns.
     *
     * <p>In SUBSCRIBE mode the queues are the keys of the rebalance process-queue table;
     * in ASSIGN mode they are the assigned message queues; in any other mode an empty
     * set is persisted. Any exception (including a failed service-state check) is logged
     * and swallowed.
     */
    @Override
    public void persistConsumerOffset() {
        try {
            checkServiceState();
            final Set<MessageQueue> queuesToPersist = new HashSet<MessageQueue>();
            if (SubscriptionType.SUBSCRIBE == subscriptionType) {
                queuesToPersist.addAll(rebalanceImpl.getProcessQueueTable().keySet());
            } else if (SubscriptionType.ASSIGN == subscriptionType) {
                queuesToPersist.addAll(assignedMessageQueue.getAssignedMessageQueues());
            }
            offsetStore.persistAll(queuesToPersist);
        }
        catch (Exception e) {
            log.error("Persist consumer offset error for group: {} ", (Object)defaultLitePullConsumer.getConsumerGroup(), (Object)e);
        }
    }

    /**
     * Stores the queue set for {@code topic} in the rebalance implementation's
     * topic-subscribe-info table, but only when {@code topic} is currently subscribed.
     *
     * @param topic the topic whose queue information changed
     * @param info the queues to record for the topic
     */
    @Override
    public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
        final ConcurrentMap<String, SubscriptionData> subscriptions = rebalanceImpl.getSubscriptionInner();
        if (subscriptions == null || !subscriptions.containsKey(topic)) {
            return;
        }
        rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
    }

    /**
     * Reports whether {@code topic} is subscribed but has no entry yet in the
     * topic-subscribe-info table.
     *
     * @param topic the topic to check
     * @return {@code true} if the topic is subscribed and its queue info is missing;
     *         {@code false} otherwise, including when the topic is not subscribed
     */
    @Override
    public boolean isSubscribeTopicNeedUpdate(String topic) {
        final ConcurrentMap<String, SubscriptionData> subscriptions = rebalanceImpl.getSubscriptionInner();
        final boolean subscribed = subscriptions != null && subscriptions.containsKey(topic);
        return subscribed && !rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
    }

    /**
     * @return the unit-mode flag of the wrapped {@code DefaultLitePullConsumer}
     */
    @Override
    public boolean isUnitMode() {
        final boolean unitMode = defaultLitePullConsumer.isUnitMode();
        return unitMode;
    }

    /**
     * Builds a running-info snapshot containing the consumer's configuration as
     * properties, the consumer start timestamp, and the current subscriptions.
     *
     * @return a newly created {@code ConsumerRunningInfo}
     */
    @Override
    public ConsumerRunningInfo consumerRunningInfo() {
        final ConsumerRunningInfo runningInfo = new ConsumerRunningInfo();
        final Properties consumerProps = MixAll.object2Properties((Object)defaultLitePullConsumer);
        final String startTimestamp = String.valueOf(consumerStartTimestamp);
        consumerProps.put("PROP_CONSUMER_START_TIMESTAMP", startTimestamp);
        runningInfo.setProperties(consumerProps);
        runningInfo.getSubscriptionSet().addAll(subscriptions());
        return runningInfo;
    }

    /**
     * Forwards the offset update for {@code mq} to the offset store's
     * {@code updateConsumeOffsetToBroker}.
     *
     * @param mq the queue whose offset is reported
     * @param offset the offset to report
     * @param isOneway passed through unchanged to the offset store
     */
    private void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        final OffsetStore store = offsetStore;
        store.updateConsumeOffsetToBroker(mq, offset, isOneway);
    }

    /**
     * @return the offset store currently used by this consumer implementation
     */
    public OffsetStore getOffsetStore() {
        return offsetStore;
    }

    /**
     * @return the {@code DefaultLitePullConsumer} this implementation reads its configuration from
     */
    public DefaultLitePullConsumer getDefaultLitePullConsumer() {
        return defaultLitePullConsumer;
    }

    /**
     * Fetches the subscribe message queues of {@code topic} through the admin
     * implementation and returns them after {@code parseMessageQueues} has been applied.
     *
     * @param topic the topic to look up
     * @return the parsed set of message queues
     * @throws MQClientException propagated from the admin lookup
     */
    public Set<MessageQueue> fetchMessageQueues(String topic) throws MQClientException {
        checkServiceState();
        return parseMessageQueues(mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic));
    }

    /**
     * Re-fetches the queues of every topic that has a registered change listener and,
     * when the result differs from the cached set according to {@code isSetEqual},
     * updates the cache and notifies the listener (if non-null).
     *
     * @throws MQClientException propagated from {@code fetchMessageQueues}; remaining
     *         topics are not processed in that case
     */
    private synchronized void fetchTopicMessageQueuesAndCompare() throws MQClientException {
        for (Map.Entry<String, TopicMessageQueueChangeListener> registration : topicMessageQueueChangeListenerMap.entrySet()) {
            final String topic = registration.getKey();
            final TopicMessageQueueChangeListener listener = registration.getValue();
            final Set<MessageQueue> cachedQueues = messageQueuesForTopic.get(topic);
            final Set<MessageQueue> fetchedQueues = fetchMessageQueues(topic);
            if (isSetEqual(fetchedQueues, cachedQueues)) {
                continue;
            }
            messageQueuesForTopic.put(topic, fetchedQueues);
            if (listener != null) {
                listener.onChanged(topic, fetchedQueues);
            }
        }
    }

    /**
     * Tests whether two queue sets contain exactly the same elements.
     *
     * <p>Two {@code null} references are equal, a {@code null} and a non-null set are
     * not. Two empty sets are equal: previously they were reported as different, which
     * made the caller treat an unchanged empty result as a change on every comparison.
     *
     * @param set1 the first set; may be {@code null}
     * @param set2 the second set; may be {@code null}
     * @return {@code true} if both are {@code null}, or both are non-null with the same
     *         size and {@code set1} contains every element of {@code set2}
     */
    private boolean isSetEqual(Set<MessageQueue> set1, Set<MessageQueue> set2) {
        if (set1 == null || set2 == null) {
            // Equal only when both references are null.
            return set1 == set2;
        }
        // Same size plus containment implies identical contents; containsAll stops at the first miss.
        return set1.size() == set2.size() && set1.containsAll(set2);
    }

    /**
     * Registers (or replaces) the queue-change listener for {@code topic}. When the
     * service is already running, the topic's current queues are fetched immediately
     * and cached as the comparison baseline.
     *
     * @param topic the topic to watch; must not be {@code null}
     * @param listener the listener to notify; must not be {@code null}
     * @throws MQClientException if either argument is {@code null}, or propagated from
     *         the initial queue fetch
     */
    public synchronized void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener listener) throws MQClientException {
        if (null == topic || null == listener) {
            throw new MQClientException("Topic or listener is null", null);
        }
        final boolean alreadyRegistered = topicMessageQueueChangeListenerMap.containsKey(topic);
        if (alreadyRegistered) {
            log.warn("Topic {} had been registered, new listener will overwrite the old one", (Object)topic);
        }
        topicMessageQueueChangeListenerMap.put(topic, listener);
        if (ServiceState.RUNNING != serviceState) {
            return;
        }
        final Set<MessageQueue> currentQueues = fetchMessageQueues(topic);
        messageQueuesForTopic.put(topic, currentQueues);
    }

    /**
     * Produces a new set in which every queue's topic has been passed through
     * {@code NamespaceUtil.withoutNamespace} using the consumer's namespace; broker name
     * and queue id are copied unchanged.
     *
     * @param queueSet the queues to convert; must not be {@code null}
     * @return a new set of converted queues
     */
    private Set<MessageQueue> parseMessageQueues(Set<MessageQueue> queueSet) {
        final Set<MessageQueue> converted = new HashSet<MessageQueue>();
        final Iterator<MessageQueue> cursor = queueSet.iterator();
        while (cursor.hasNext()) {
            final MessageQueue source = cursor.next();
            final String userTopic = NamespaceUtil.withoutNamespace(source.getTopic(), defaultLitePullConsumer.getNamespace());
            converted.add(new MessageQueue(userTopic, source.getBrokerName(), source.getQueueId()));
        }
        return converted;
    }

    /**
     * Sets the delay, in milliseconds, that a pull task waits before rescheduling itself
     * after a pull attempt failed with an exception.
     *
     * @param pullTimeDelayMillsWhenException the new delay in milliseconds
     */
    public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
        this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
    }

    /**
     * Immutable holder tying a batch of pulled messages to the message queue and
     * process queue they belong to.
     */
    public class ConsumeRequest {
        private final List<MessageExt> messageExts;
        private final MessageQueue messageQueue;
        private final ProcessQueue processQueue;

        /**
         * @param msgs the pulled messages
         * @param queue the queue the messages were pulled from
         * @param procQueue the process queue associated with {@code queue}
         */
        public ConsumeRequest(List<MessageExt> msgs, MessageQueue queue, ProcessQueue procQueue) {
            messageExts = msgs;
            messageQueue = queue;
            processQueue = procQueue;
        }

        /** @return the messages carried by this request */
        public List<MessageExt> getMessageExts() {
            return messageExts;
        }

        /** @return the queue the messages were pulled from */
        public MessageQueue getMessageQueue() {
            return messageQueue;
        }

        /** @return the process queue associated with the message queue */
        public ProcessQueue getProcessQueue() {
            return processQueue;
        }
    }

    /**
     * Self-rescheduling task that pulls messages for a single {@link MessageQueue}.
     *
     * <p>Each run either pulls one batch and reschedules itself, or reschedules itself
     * after a delay when the queue is paused, a flow-control threshold is exceeded, or
     * an error occurred. The task stops rescheduling once it has been cancelled or its
     * process queue is missing or dropped.
     */
    public class PullTaskImpl
    implements Runnable {
        private final MessageQueue messageQueue;
        private volatile boolean cancelled = false;
        // Written by the executor thread in run() and read by the thread calling
        // tryInterrupt(); volatile so that the reader observes the latest writer.
        private volatile Thread currentThread;

        /**
         * @param messageQueue the queue this task pulls from
         */
        public PullTaskImpl(MessageQueue messageQueue) {
            this.messageQueue = messageQueue;
        }

        /**
         * Marks the task as cancelled and interrupts the thread that last ran it, if
         * there is one and it is not already interrupted.
         */
        public void tryInterrupt() {
            this.setCancelled(true);
            // Read the field once so the null check and the interrupt act on the same thread.
            final Thread runner = this.currentThread;
            if (runner == null) {
                return;
            }
            if (!runner.isInterrupted()) {
                runner.interrupt();
            }
        }

        /**
         * Runs one pull cycle: applies the pause and flow-control checks, resolves the
         * next pull offset, pulls, hands found messages to the consume-request cache,
         * advances the pull offset and finally reschedules itself.
         */
        @Override
        public void run() {
            if (this.isCancelled()) {
                return;
            }
            this.currentThread = Thread.currentThread();

            if (assignedMessageQueue.isPaused(this.messageQueue)) {
                // Paused queues are re-checked once per second.
                this.rescheduleAfter(1000L);
                log.debug("Message Queue: {} has been paused!", (Object)this.messageQueue);
                return;
            }

            final ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(this.messageQueue);
            if (null == processQueue || processQueue.isDropped()) {
                log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", (Object)defaultLitePullConsumer.getConsumerGroup(), (Object)this.messageQueue);
                return;
            }

            // Global flow control: pending requests times batch size approximates cached messages.
            if ((long)consumeRequestCache.size() * (long)defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
                this.rescheduleAfter(50L);
                if (consumeRequestFlowControlTimes++ % 1000L == 0L) {
                    // Threshold expressed as a request count so it is comparable with the logged cache size.
                    final int requestCountThreshold = (int)Math.ceil((double)defaultLitePullConsumer.getPullThresholdForAll() / defaultLitePullConsumer.getPullBatchSize());
                    log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", new Object[]{requestCountThreshold, consumeRequestCache.size(), consumeRequestFlowControlTimes});
                }
                return;
            }

            final long cachedMessageCount = processQueue.getMsgCount().get();
            final long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024L * 1024L);

            // Per-queue flow control by message count.
            if (cachedMessageCount > (long)defaultLitePullConsumer.getPullThresholdForQueue()) {
                this.rescheduleAfter(50L);
                if (queueFlowControlTimes++ % 1000L == 0L) {
                    log.warn("The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes});
                }
                return;
            }

            // Per-queue flow control by cached size.
            if (cachedMessageSizeInMiB > (long)defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
                this.rescheduleAfter(50L);
                if (queueFlowControlTimes++ % 1000L == 0L) {
                    log.warn("The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, queueFlowControlTimes});
                }
                return;
            }

            // Per-queue flow control by offset span of cached messages.
            if (processQueue.getMaxSpan() > (long)defaultLitePullConsumer.getConsumeMaxSpan()) {
                this.rescheduleAfter(50L);
                if (queueMaxSpanFlowControlTimes++ % 1000L == 0L) {
                    log.warn("The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), queueMaxSpanFlowControlTimes});
                }
                return;
            }

            long offset;
            try {
                offset = nextPullOffset(this.messageQueue);
            }
            catch (Exception e) {
                log.error("Failed to get next pull offset", (Throwable)e);
                this.rescheduleAfter(3000L);
                return;
            }

            // Resolving the offset may have taken a while; re-check before pulling.
            if (this.isCancelled() || processQueue.isDropped()) {
                return;
            }

            long pullDelayTimeMills = 0L;
            try {
                final String topic = this.messageQueue.getTopic();
                final SubscriptionData subscriptionData = subscriptionType == SubscriptionType.SUBSCRIBE ? (SubscriptionData)rebalanceImpl.getSubscriptionInner().get(topic) : FilterAPI.buildSubscriptionData(topic, "*");
                final PullResult pullResult = pull(this.messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
                if (this.isCancelled() || processQueue.isDropped()) {
                    return;
                }
                switch (pullResult.getPullStatus()) {
                    case FOUND: {
                        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
                        synchronized (objLock) {
                            final List<MessageExt> found = pullResult.getMsgFoundList();
                            // Messages are only accepted while no seek is pending (seek offset == -1).
                            if (found != null && !found.isEmpty() && assignedMessageQueue.getSeekOffset(this.messageQueue) == -1L) {
                                processQueue.putMessage(found);
                                submitConsumeRequest(new ConsumeRequest(found, this.messageQueue, processQueue));
                            }
                        }
                        break;
                    }
                    case OFFSET_ILLEGAL: {
                        log.warn("The pull request offset illegal, {}", (Object)pullResult.toString());
                        break;
                    }
                    default: {
                        break;
                    }
                }
                updatePullOffset(this.messageQueue, pullResult.getNextBeginOffset(), processQueue);
            }
            catch (InterruptedException interruptedException) {
                log.warn("Polling thread was interrupted.", (Throwable)interruptedException);
            }
            catch (Throwable e) {
                // Back off before the next attempt after an unexpected failure.
                pullDelayTimeMills = pullTimeDelayMillsWhenException;
                log.error("An error occurred in pull message process.", e);
            }

            if (!this.isCancelled()) {
                this.rescheduleAfter(pullDelayTimeMills);
            } else {
                log.warn("The Pull Task is cancelled after doPullTask, {}", (Object)this.messageQueue);
            }
        }

        /** Submits this task to the scheduled executor to run again after {@code delayMillis} milliseconds. */
        private void rescheduleAfter(long delayMillis) {
            scheduledThreadPoolExecutor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
        }

        /** @return whether this task has been cancelled */
        public boolean isCancelled() {
            return this.cancelled;
        }

        /** @param cancelled the new cancellation flag */
        public void setCancelled(boolean cancelled) {
            this.cancelled = cancelled;
        }

        /** @return the queue this task pulls from */
        public MessageQueue getMessageQueue() {
            return this.messageQueue;
        }
    }

    /**
     * Listener that reacts to queue-allocation changes by updating the assigned queues
     * and the pull tasks of the enclosing consumer implementation.
     */
    class MessageQueueListenerImpl
    implements MessageQueueListener {
        MessageQueueListenerImpl() {
        }

        /**
         * Applies the new queue set for {@code topic}: all queues in BROADCASTING mode,
         * only the divided queues in CLUSTERING mode. Other models are ignored.
         *
         * @param topic the topic whose allocation changed
         * @param mqAll every queue of the topic
         * @param mqDivided the queues allocated to this consumer
         */
        @Override
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
            final MessageModel model = defaultLitePullConsumer.getMessageModel();
            final Set<MessageQueue> effectiveQueues;
            switch (model) {
                case BROADCASTING: {
                    effectiveQueues = mqAll;
                    break;
                }
                case CLUSTERING: {
                    effectiveQueues = mqDivided;
                    break;
                }
                default: {
                    return;
                }
            }
            updateAssignedMessageQueue(topic, effectiveQueues);
            updatePullTask(topic, effectiveQueues);
        }
    }

    private static enum SubscriptionType {
        NONE,
        SUBSCRIBE,
        ASSIGN;

    }
}

