package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
import com.alibaba.rocketmq.broker.longpolling.PullRequest;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.pagecache.ManyMessageTransfer;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.TopicFilterType;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.filter.FilterAPI;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.GetMessageResult;
import com.alibaba.rocketmq.store.GetMessageStatus;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.config.BrokerRole;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/rocketmq/broker/processor/PullMessageProcessor.class */
public class PullMessageProcessor implements NettyRequestProcessor {
    private static final Logger log;
    private final BrokerController brokerController;
    private List<ConsumeMessageHook> consumeMessageHookList;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.alibaba.rocketmq.broker.processor.PullMessageProcessor$3, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/rocketmq/broker/processor/PullMessageProcessor$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus = new int[GetMessageStatus.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.MESSAGE_WAS_REMOVING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MATCHED_LOGIC_QUEUE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MESSAGE_IN_QUEUE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.NO_MATCHED_MESSAGE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_FOUND_NULL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_OVERFLOW_BADLY.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_OVERFLOW_ONE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[GetMessageStatus.OFFSET_TOO_SMALL.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    public PullMessageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        return processRequest(channelHandlerContext.channel(), remotingCommand, true);
    }

    public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand remotingCommand) throws RemotingCommandException {
        this.brokerController.getPullMessageExecutor().submit(new Runnable() { // from class: com.alibaba.rocketmq.broker.processor.PullMessageProcessor.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    final RemotingCommand processRequest = PullMessageProcessor.this.processRequest(channel, remotingCommand, false);
                    if (processRequest != null) {
                        processRequest.setOpaque(remotingCommand.getOpaque());
                        processRequest.markResponseType();
                        try {
                            channel.writeAndFlush(processRequest).addListener(new ChannelFutureListener() { // from class: com.alibaba.rocketmq.broker.processor.PullMessageProcessor.1.1
                                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                                    if (channelFuture.isSuccess()) {
                                        return;
                                    }
                                    PullMessageProcessor.log.error("processRequestWrapper response to " + channelFuture.channel().remoteAddress() + " failed", channelFuture.cause());
                                    PullMessageProcessor.log.error(remotingCommand.toString());
                                    PullMessageProcessor.log.error(processRequest.toString());
                                }
                            });
                        } catch (Throwable th) {
                            PullMessageProcessor.log.error("processRequestWrapper process request over, but response failed", th);
                            PullMessageProcessor.log.error(remotingCommand.toString());
                            PullMessageProcessor.log.error(processRequest.toString());
                        }
                    }
                } catch (RemotingCommandException e) {
                    PullMessageProcessor.log.error("excuteRequestWhenWakeup run", e);
                }
            }
        });
    }

    private void generateOffsetMovedEvent(OffsetMovedEvent offsetMovedEvent) {
        try {
            MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
            messageExtBrokerInner.setTopic("OFFSET_MOVED_EVENT");
            messageExtBrokerInner.setTags(offsetMovedEvent.getConsumerGroup());
            messageExtBrokerInner.setDelayTimeLevel(0);
            messageExtBrokerInner.setKeys(offsetMovedEvent.getConsumerGroup());
            messageExtBrokerInner.setBody(offsetMovedEvent.encode());
            messageExtBrokerInner.setFlag(0);
            messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties()));
            messageExtBrokerInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, messageExtBrokerInner.getTags()));
            messageExtBrokerInner.setQueueId(0);
            messageExtBrokerInner.setSysFlag(0);
            messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis());
            messageExtBrokerInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
            messageExtBrokerInner.setStoreHost(messageExtBrokerInner.getBornHost());
            messageExtBrokerInner.setReconsumeTimes(0);
            this.brokerController.getMessageStore().putMessage(messageExtBrokerInner);
        } catch (Exception e) {
            log.warn(String.format("generateOffsetMovedEvent Exception, %s", offsetMovedEvent.toString()), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public RemotingCommand processRequest(final Channel channel, RemotingCommand remotingCommand, boolean z) throws RemotingCommandException {
        SubscriptionData buildSubscriptionData;
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        PullMessageResponseHeader readCustomHeader = createResponseCommand.readCustomHeader();
        PullMessageRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(PullMessageRequestHeader.class);
        createResponseCommand.setOpaque(remotingCommand.getOpaque());
        if (log.isDebugEnabled()) {
            log.debug("receive PullMessage request command, " + remotingCommand);
        }
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
            return createResponseCommand;
        }
        SubscriptionGroupConfig findSubscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(decodeCommandCustomHeader.getConsumerGroup());
        if (null == findSubscriptionGroupConfig) {
            createResponseCommand.setCode(26);
            createResponseCommand.setRemark("subscription group not exist, " + decodeCommandCustomHeader.getConsumerGroup() + " " + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/42"));
            return createResponseCommand;
        }
        if (!findSubscriptionGroupConfig.isConsumeEnable()) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("subscription group no permission, " + decodeCommandCustomHeader.getConsumerGroup());
            return createResponseCommand;
        }
        boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(decodeCommandCustomHeader.getSysFlag().intValue());
        boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(decodeCommandCustomHeader.getSysFlag().intValue());
        boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(decodeCommandCustomHeader.getSysFlag().intValue());
        long longValue = hasSuspendFlag ? decodeCommandCustomHeader.getSuspendTimeoutMillis().longValue() : 0L;
        TopicConfig selectTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(decodeCommandCustomHeader.getTopic());
        if (null == selectTopicConfig) {
            log.error("the topic " + decodeCommandCustomHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
            createResponseCommand.setCode(17);
            createResponseCommand.setRemark("topic[" + decodeCommandCustomHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/38"));
            return createResponseCommand;
        }
        if (!PermName.isReadable(selectTopicConfig.getPerm())) {
            createResponseCommand.setCode(16);
            createResponseCommand.setRemark("the topic[" + decodeCommandCustomHeader.getTopic() + "] pulling message is forbidden");
            return createResponseCommand;
        }
        if (decodeCommandCustomHeader.getQueueId().intValue() < 0 || decodeCommandCustomHeader.getQueueId().intValue() >= selectTopicConfig.getReadQueueNums()) {
            String str = "queueId[" + decodeCommandCustomHeader.getQueueId() + "] is illagal,Topic :" + decodeCommandCustomHeader.getTopic() + " topicConfig.readQueueNums: " + selectTopicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
            log.warn(str);
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark(str);
            return createResponseCommand;
        }
        if (hasSubscriptionFlag) {
            try {
                buildSubscriptionData = FilterAPI.buildSubscriptionData(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getSubscription());
            } catch (Exception e) {
                log.warn("parse the consumer's subscription[{}] failed, group: {}", decodeCommandCustomHeader.getSubscription(), decodeCommandCustomHeader.getConsumerGroup());
                createResponseCommand.setCode(23);
                createResponseCommand.setRemark("parse the consumer's subscription failed");
                return createResponseCommand;
            }
        } else {
            ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(decodeCommandCustomHeader.getConsumerGroup());
            if (null == consumerGroupInfo) {
                log.warn("the consumer's group info not exist, group: {}", decodeCommandCustomHeader.getConsumerGroup());
                createResponseCommand.setCode(24);
                createResponseCommand.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/46"));
                return createResponseCommand;
            }
            if (!findSubscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                createResponseCommand.setCode(16);
                createResponseCommand.setRemark("the consumer group[" + decodeCommandCustomHeader.getConsumerGroup() + "] can not consume by broadcast way");
                return createResponseCommand;
            }
            buildSubscriptionData = consumerGroupInfo.findSubscriptionData(decodeCommandCustomHeader.getTopic());
            if (null == buildSubscriptionData) {
                log.warn("the consumer's subscription not exist, group: {}", decodeCommandCustomHeader.getConsumerGroup());
                createResponseCommand.setCode(24);
                createResponseCommand.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/46"));
                return createResponseCommand;
            }
            if (buildSubscriptionData.getSubVersion() < decodeCommandCustomHeader.getSubVersion().longValue()) {
                log.warn("the broker's subscription is not latest, group: {} {}", decodeCommandCustomHeader.getConsumerGroup(), buildSubscriptionData.getSubString());
                createResponseCommand.setCode(25);
                createResponseCommand.setRemark("the consumer's subscription not latest");
                return createResponseCommand;
            }
        }
        final GetMessageResult message = this.brokerController.getMessageStore().getMessage(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), decodeCommandCustomHeader.getQueueOffset().longValue(), decodeCommandCustomHeader.getMaxMsgNums().intValue(), buildSubscriptionData);
        if (message != null) {
            createResponseCommand.setRemark(message.getStatus().name());
            readCustomHeader.setNextBeginOffset(Long.valueOf(message.getNextBeginOffset()));
            readCustomHeader.setMinOffset(Long.valueOf(message.getMinOffset()));
            readCustomHeader.setMaxOffset(Long.valueOf(message.getMaxOffset()));
            if (message.isSuggestPullingFromSlave()) {
                readCustomHeader.setSuggestWhichBrokerId(Long.valueOf(findSubscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()));
            } else {
                readCustomHeader.setSuggestWhichBrokerId(Long.valueOf(findSubscriptionGroupConfig.getBrokerId()));
            }
            switch (AnonymousClass3.$SwitchMap$com$alibaba$rocketmq$store$GetMessageStatus[message.getStatus().ordinal()]) {
                case 1:
                    createResponseCommand.setCode(0);
                    if (hasConsumeMessageHook()) {
                        ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
                        consumeMessageContext.setConsumerGroup(decodeCommandCustomHeader.getConsumerGroup());
                        consumeMessageContext.setTopic(decodeCommandCustomHeader.getTopic());
                        consumeMessageContext.setClientHost(RemotingHelper.parseChannelRemoteAddr(channel));
                        consumeMessageContext.setStoreHost(this.brokerController.getBrokerAddr());
                        consumeMessageContext.setQueueId(decodeCommandCustomHeader.getQueueId());
                        consumeMessageContext.setMessageIds(this.brokerController.getMessageStore().getMessageIds(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), decodeCommandCustomHeader.getQueueOffset().longValue(), decodeCommandCustomHeader.getQueueOffset().longValue() + message.getMessageCount(), new InetSocketAddress(this.brokerController.getBrokerConfig().getBrokerIP1(), this.brokerController.getNettyServerConfig().getListenPort())));
                        consumeMessageContext.setBodyLength(message.getBufferTotalSize() / message.getMessageCount());
                        executeConsumeMessageHookBefore(consumeMessageContext);
                        break;
                    }
                    break;
                case 2:
                    createResponseCommand.setCode(20);
                    break;
                case 3:
                case 4:
                    if (0 == decodeCommandCustomHeader.getQueueOffset().longValue()) {
                        createResponseCommand.setCode(19);
                        break;
                    } else {
                        createResponseCommand.setCode(21);
                        log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", new Object[]{decodeCommandCustomHeader.getQueueOffset(), Long.valueOf(message.getNextBeginOffset()), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId(), decodeCommandCustomHeader.getConsumerGroup()});
                        break;
                    }
                case 5:
                    createResponseCommand.setCode(20);
                    break;
                case 6:
                    createResponseCommand.setCode(19);
                    break;
                case 7:
                    createResponseCommand.setCode(21);
                    log.info("the request offset: " + decodeCommandCustomHeader.getQueueOffset() + " over flow badly, broker max offset: " + message.getMaxOffset() + ", consumer: " + channel.remoteAddress());
                    break;
                case 8:
                    createResponseCommand.setCode(19);
                    break;
                case 9:
                    createResponseCommand.setCode(21);
                    log.info("the request offset: " + decodeCommandCustomHeader.getQueueOffset() + " too small, broker min offset: " + message.getMinOffset() + ", consumer: " + channel.remoteAddress());
                    break;
                default:
                    if (!$assertionsDisabled) {
                        throw new AssertionError();
                    }
                    break;
            }
            switch (createResponseCommand.getCode()) {
                case 0:
                    this.brokerController.getBrokerStatsManager().incGroupGetNums(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), message.getMessageCount());
                    this.brokerController.getBrokerStatsManager().incGroupGetSize(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), message.getBufferTotalSize());
                    this.brokerController.getBrokerStatsManager().incBrokerGetNums(message.getMessageCount());
                    try {
                        channel.writeAndFlush(new ManyMessageTransfer(createResponseCommand.encodeHeader(message.getBufferTotalSize()), message)).addListener(new ChannelFutureListener() { // from class: com.alibaba.rocketmq.broker.processor.PullMessageProcessor.2
                            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                                message.release();
                                if (channelFuture.isSuccess()) {
                                    return;
                                }
                                PullMessageProcessor.log.error("transfer many message by pagecache failed, " + channel.remoteAddress(), channelFuture.cause());
                            }
                        });
                    } catch (Throwable th) {
                        log.error("", th);
                        message.release();
                    }
                    createResponseCommand = null;
                    break;
                case 19:
                    if (z && hasSuspendFlag) {
                        long j = longValue;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            j = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), new PullRequest(remotingCommand, channel, j, this.brokerController.getMessageStore().now(), decodeCommandCustomHeader.getQueueOffset().longValue()));
                        createResponseCommand = null;
                        break;
                    }
                    break;
                case 20:
                    break;
                case 21:
                    if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.brokerController.getBrokerConfig().isOffsetCheckInSlave()) {
                        MessageQueue messageQueue = new MessageQueue();
                        messageQueue.setTopic(decodeCommandCustomHeader.getTopic());
                        messageQueue.setQueueId(decodeCommandCustomHeader.getQueueId().intValue());
                        messageQueue.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
                        OffsetMovedEvent offsetMovedEvent = new OffsetMovedEvent();
                        offsetMovedEvent.setConsumerGroup(decodeCommandCustomHeader.getConsumerGroup());
                        offsetMovedEvent.setMessageQueue(messageQueue);
                        offsetMovedEvent.setOffsetRequest(decodeCommandCustomHeader.getQueueOffset().longValue());
                        offsetMovedEvent.setOffsetNew(message.getNextBeginOffset());
                        generateOffsetMovedEvent(offsetMovedEvent);
                    } else {
                        readCustomHeader.setSuggestWhichBrokerId(Long.valueOf(findSubscriptionGroupConfig.getBrokerId()));
                        createResponseCommand.setCode(20);
                    }
                    log.warn("PULL_OFFSET_MOVED:topic={}, groupId={}, clientId={}, offset={}, suggestBrokerId={}", new Object[]{decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getQueueOffset(), readCustomHeader.getSuggestWhichBrokerId()});
                    break;
                default:
                    if (!$assertionsDisabled) {
                        throw new AssertionError();
                    }
                    break;
            }
        } else {
            createResponseCommand.setCode(1);
            createResponseCommand.setRemark("store getMessage return null");
        }
        if ((z && hasCommitOffsetFlag) && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
            this.brokerController.getConsumerOffsetManager().commitOffset(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), decodeCommandCustomHeader.getCommitOffset().longValue());
        }
        return createResponseCommand;
    }

    public boolean hasConsumeMessageHook() {
        return (this.consumeMessageHookList == null || this.consumeMessageHookList.isEmpty()) ? false : true;
    }

    public void registerConsumeMessageHook(List<ConsumeMessageHook> list) {
        this.consumeMessageHookList = list;
    }

    public void executeConsumeMessageHookBefore(ConsumeMessageContext consumeMessageContext) {
        if (hasConsumeMessageHook()) {
            Iterator<ConsumeMessageHook> it = this.consumeMessageHookList.iterator();
            while (it.hasNext()) {
                try {
                    it.next().consumeMessageBefore(consumeMessageContext);
                } catch (Throwable th) {
                }
            }
        }
    }

    static {
        $assertionsDisabled = !PullMessageProcessor.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger("RocketmqBroker");
    }
}
