package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.TopicConfig;
import com.alibaba.rocketmq.common.TopicFilterType;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.constant.PermName;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.MessageAccessor;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.store.MessageExtBrokerInner;
import com.alibaba.rocketmq.store.PutMessageResult;
import com.alibaba.rocketmq.store.PutMessageStatus;
import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:com/alibaba/rocketmq/broker/processor/SendMessageProcessor.class */
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    private List<SendMessageHook> sendMessageHookList;
    private List<ConsumeMessageHook> consumeMessageHookList;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.alibaba.rocketmq.broker.processor.SendMessageProcessor$1, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/rocketmq/broker/processor/SendMessageProcessor$1.class */
    /**
     * Decompiler-recovered lookup table for switching on {@link PutMessageStatus}.
     *
     * <p>The array is indexed by {@code PutMessageStatus.ordinal()} and holds the dense,
     * 1-based case label assigned to each constant below (PUT_OK = 1 ... UNKNOWN_ERROR = 8).
     * Slots that are never assigned keep the array default of 0, which matches no
     * explicit case label.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Sized from the enum as it exists at run time, not at compile time.
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$rocketmq$store$PutMessageStatus = new int[PutMessageStatus.values().length];

        static {
            // Each assignment is wrapped individually so that a constant missing from the
            // PutMessageStatus class seen at run time (NoSuchFieldError) only leaves its own
            // slot at 0 and does not prevent the remaining entries from being filled.
            try {
                $SwitchMap$com$alibaba$rocketmq$store$PutMessageStatus[PutMessageStatus.PUT_OK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // intentionally ignored: slot stays 0
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$PutMessageStatus[PutMessageStatus.FLUSH_DISK_TIMEOUT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // intentionally ignored: slot stays 0
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$PutMessageStatus[PutMessageStatus.FLUSH_SLAVE_TIMEOUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // intentionally ignored: slot stays 0
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$PutMessageStatus[PutMessageStatus.SLAVE_NOT_AVAILABLE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // intentionally ignored: slot stays 0
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$PutMessageStatus[PutMessageStatus.CREATE_MAPEDFILE_FAILED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
                // intentionally ignored: slot stays 0
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$PutMessageStatus[PutMessageStatus.MESSAGE_ILLEGAL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
                // intentionally ignored: slot stays 0
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$PutMessageStatus[PutMessageStatus.SERVICE_NOT_AVAILABLE.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
                // intentionally ignored: slot stays 0
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$store$PutMessageStatus[PutMessageStatus.UNKNOWN_ERROR.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
                // intentionally ignored: slot stays 0
            }
        }
    }

    /**
     * Creates the processor and hands the broker controller to
     * {@link AbstractSendMessageProcessor}.
     *
     * @param brokerController the broker controller passed through to the superclass constructor
     */
    public SendMessageProcessor(BrokerController brokerController) {
        super(brokerController);
    }

    /**
     * Entry point for requests routed to this processor.
     *
     * <p>Request code 36 is handled as a consumer "send message back" request; every other
     * code is treated as a normal send: the header is parsed, a trace context is built, and
     * the send is wrapped by the before/after send-message hooks.
     *
     * @param channelHandlerContext the Netty context of the channel the request arrived on
     * @param remotingCommand       the incoming request
     * @return the response command, or {@code null} when the request header cannot be parsed
     *         or when {@code sendMessage} itself returns {@code null}
     * @throws RemotingCommandException propagated from the delegated handlers
     */
    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        // 36: presumably RequestCode.CONSUMER_SEND_MSG_BACK -- TODO confirm against the protocol constants.
        if (remotingCommand.getCode() == 36) {
            return consumerSendMsgBack(channelHandlerContext, remotingCommand);
        }

        SendMessageRequestHeader requestHeader = parseRequestHeader(remotingCommand);
        if (requestHeader == null) {
            return null;
        }

        SendMessageContext traceContext = buildMsgContext(channelHandlerContext, requestHeader);
        executeSendMessageHookBefore(channelHandlerContext, remotingCommand, traceContext);
        RemotingCommand response = sendMessage(channelHandlerContext, remotingCommand, traceContext, requestHeader);
        executeSendMessageHookAfter(response, traceContext);
        return response;
    }

    /**
     * Handles a consumer's "send message back" request by re-storing the original message
     * on the group's retry topic, or on the topic returned by {@code MixAll.getDLQTopic}
     * once the retry budget is exhausted.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Decode the request header and, when consume hooks are registered and the header
     *       carries an origin message id, notify them with a failed / RECONSUME_LATER context.</li>
     *   <li>Reject with code 26 when the subscription group is unknown, with code 16 when the
     *       broker is not writable; answer code 0 without storing anything when the group has
     *       no retry queues.</li>
     *   <li>Ensure the retry topic exists and is writable (code 1 / 16 otherwise).</li>
     *   <li>Look the original message up by the offset from the header (code 1 if absent).</li>
     *   <li>If the message was already reconsumed {@code retryMaxTimes} times, or the requested
     *       delay level is negative, switch the target to the DLQ topic; otherwise apply the
     *       delay level (0 is replaced by {@code 3 + reconsumeTimes}).</li>
     *   <li>Store the rebuilt message; answer code 0 on PUT_OK, otherwise code 1 with the
     *       status name as remark.</li>
     * </ol>
     *
     * <p>NOTE(review): the numeric response codes are presumably the ResponseCode constants
     * (0 SUCCESS, 1 SYSTEM_ERROR, 16 NO_PERMISSION, 26 SUBSCRIPTION_GROUP_NOT_EXIST) -- confirm.
     *
     * @param channelHandlerContext the Netty context of the requesting channel
     * @param remotingCommand       the request carrying a {@link ConsumerSendMsgBackRequestHeader}
     * @return the response command; never {@code null}
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand consumerSendMsgBack(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand((Class) null);
        // Explicit cast: harmless if the decode method is generic, required if it returns the
        // header base type.
        final ConsumerSendMsgBackRequestHeader requestHeader =
                (ConsumerSendMsgBackRequestHeader) remotingCommand.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

        // Hooks are notified before any validation, exactly as before.
        notifyConsumeHooksOfSendBack(channelHandlerContext, requestHeader);

        final SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(26);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " + FAQUrl.suggestTodo("https://github.com/alibaba/RocketMQ/issues/42"));
            return response;
        }

        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(16);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return response;
        }

        // A group without retry queues: acknowledge and drop.
        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(0);
            response.setRemark((String) null);
            return response;
        }

        String targetTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        // "% 99999999" keeps the value strictly inside the int range, so Math.abs can never
        // be handed Integer.MIN_VALUE.
        int queueId = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }

        // 6: presumably PermName read|write -- TODO confirm.
        final TopicConfig retryTopicConfig = this.brokerController.getTopicConfigManager()
                .createTopicInSendMessageBackMethod(targetTopic, subscriptionGroupConfig.getRetryQueueNums(), 6, topicSysFlag);
        if (null == retryTopicConfig) {
            response.setCode(1);
            response.setRemark("topic[" + targetTopic + "] not exist");
            return response;
        }
        if (!PermName.isWriteable(retryTopicConfig.getPerm())) {
            response.setCode(16);
            response.setRemark(String.format("the topic[%s] sending message is forbidden", targetTopic));
            return response;
        }

        final MessageExt originalMsg = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset().longValue());
        if (null == originalMsg) {
            response.setCode(1);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return response;
        }

        // Remember the topic the message was first published to; only set on the first retry.
        if (null == originalMsg.getProperty("RETRY_TOPIC")) {
            MessageAccessor.putProperty(originalMsg, "RETRY_TOPIC", originalMsg.getTopic());
        }
        originalMsg.setWaitStoreMsgOK(false);

        int delayLevel = requestHeader.getDelayLevel().intValue();
        if (originalMsg.getReconsumeTimes() >= subscriptionGroupConfig.getRetryMaxTimes() || delayLevel < 0) {
            // Retry budget exhausted (or the consumer explicitly gave up): route to the DLQ topic.
            targetTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            // The DLQ topic is created with a single queue below, so the only valid id is 0
            // (the former "random % 1" expression always evaluated to 0).
            queueId = 0;
            // 2: presumably PermName write-only -- TODO confirm.
            if (null == this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(targetTopic, 1, 2, 0)) {
                response.setCode(1);
                response.setRemark("topic[" + targetTopic + "] not exist");
                return response;
            }
        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + originalMsg.getReconsumeTimes();
            }
            originalMsg.setDelayTimeLevel(delayLevel);
        }

        final MessageExtBrokerInner retryMsg = buildSendBackMessage(originalMsg, targetTopic, queueId);
        final PutMessageResult putResult = this.brokerController.getMessageStore().putMessage(retryMsg);
        if (putResult == null) {
            response.setCode(1);
            response.setRemark("putMessageResult is null");
            return response;
        }

        // Compare the enum directly instead of going through the synthetic ordinal table.
        if (putResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            // Statistics are attributed to the original business topic when it is known.
            String statsTopic = originalMsg.getTopic();
            final String originalTopic = originalMsg.getProperty("RETRY_TOPIC");
            if (originalTopic != null) {
                statsTopic = originalTopic;
            }
            this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), statsTopic);
            response.setCode(0);
            response.setRemark((String) null);
            return response;
        }

        response.setCode(1);
        response.setRemark(putResult.getPutMessageStatus().name());
        return response;
    }

    /**
     * Notifies the registered consume hooks that a message is being sent back, using a
     * failed / RECONSUME_LATER context. Does nothing when no hook is registered or the
     * header has a blank origin message id.
     */
    private void notifyConsumeHooksOfSendBack(ChannelHandlerContext channelHandlerContext, ConsumerSendMsgBackRequestHeader requestHeader) {
        if (!hasConsumeMessageHook() || UtilAll.isBlank(requestHeader.getOriginMsgId())) {
            return;
        }
        final ConsumeMessageContext traceContext = new ConsumeMessageContext();
        traceContext.setConsumerGroup(requestHeader.getGroup());
        traceContext.setTopic(requestHeader.getOriginTopic());
        traceContext.setClientHost(RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()));
        traceContext.setSuccess(false);
        traceContext.setStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER.toString());

        final HashMap<String, Long> messageIds = new HashMap<String, Long>();
        messageIds.put(requestHeader.getOriginMsgId(), requestHeader.getOffset());
        traceContext.setMessageIds(messageIds);

        executeConsumeMessageHookAfter(traceContext);
    }

    /**
     * Builds the broker-internal copy of {@code originalMsg} that is stored on
     * {@code targetTopic} / {@code queueId}, with the reconsume counter incremented by one.
     */
    private MessageExtBrokerInner buildSendBackMessage(MessageExt originalMsg, String targetTopic, int queueId) {
        final MessageExtBrokerInner retryMsg = new MessageExtBrokerInner();
        retryMsg.setTopic(targetTopic);
        retryMsg.setBody(originalMsg.getBody());
        retryMsg.setFlag(originalMsg.getFlag());
        MessageAccessor.setProperties(retryMsg, originalMsg.getProperties());
        // NOTE(review): the properties string is serialized before the origin message id is
        // set further down; whether the stored message should carry that id is not
        // decidable from this file, so the original ordering is preserved -- confirm.
        retryMsg.setPropertiesString(MessageDecoder.messageProperties2String(originalMsg.getProperties()));
        retryMsg.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode((TopicFilterType) null, originalMsg.getTags()));
        retryMsg.setQueueId(queueId);
        retryMsg.setSysFlag(originalMsg.getSysFlag());
        retryMsg.setBornTimestamp(originalMsg.getBornTimestamp());
        retryMsg.setBornHost(originalMsg.getBornHost());
        retryMsg.setStoreHost(getStoreHost());
        retryMsg.setReconsumeTimes(originalMsg.getReconsumeTimes() + 1);

        // Keep the id of the very first delivery; fall back to this message's own id.
        final String originMsgId = MessageAccessor.getOriginMessageId(originalMsg);
        MessageAccessor.setOriginMessageId(retryMsg, UtilAll.isBlank(originMsgId) ? originalMsg.getMsgId() : originMsgId);
        return retryMsg;
    }

    /**
     * Formats the values reported by {@code UtilAll.getDiskPartitionSpaceUsedPercent} for the
     * commit-log path, the consume-queue path and the index path into one diagnostic string.
     *
     * @return a string of the form {@code "CL: x.xx CQ: x.xx INDEX: x.xx"}
     */
    private String diskUtil() {
        double commitLogUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
                this.brokerController.getMessageStoreConfig().getStorePathCommitLog());

        double consumeQueueUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
                StorePathConfigHelper.getStorePathConsumeQueue(
                        this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

        double indexUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
                StorePathConfigHelper.getStorePathIndex(
                        this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogUsed, consumeQueueUsed, indexUsed);
    }

    /**
     * Stores a message sent by a producer and answers the request.
     *
     * <p>The response code is preset to -1 and {@code msgCheck} is run; if the check changed
     * the code, that response is returned as is. Otherwise a {@link MessageExtBrokerInner} is
     * built from the request header and body and handed to the message store.
     *
     * <p>For the statuses PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and
     * SLAVE_NOT_AVAILABLE the message counts as stored: statistics are updated, the response
     * header is filled in, the response is passed to {@code doResponse}, and, when long
     * polling is enabled, the pull-request hold service is notified. In that case this
     * method returns {@code null} because the response has already been handed to
     * {@code doResponse}. In every other case the populated error response is returned.
     *
     * <p>NOTE(review): the numeric response codes are presumably the ResponseCode constants
     * (0 SUCCESS, 1 SYSTEM_ERROR, 10-14 flush/slave/illegal/unavailable, 16 NO_PERMISSION)
     * -- confirm.
     *
     * @param channelHandlerContext    the Netty context of the requesting channel
     * @param remotingCommand          the incoming send request
     * @param sendMessageContext       trace context; updated with message id, queue id and
     *                                 queue offset when send hooks are registered
     * @param sendMessageRequestHeader the already parsed request header
     * @return {@code null} when the response was already passed to {@code doResponse},
     *         otherwise the response to return to the caller
     * @throws RemotingCommandException declared for the delegated calls
     */
    private RemotingCommand sendMessage(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, SendMessageContext sendMessageContext, SendMessageRequestHeader sendMessageRequestHeader) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        // Explicit cast: harmless if readCustomHeader is generic, required if it returns the
        // header base type.
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
        response.setOpaque(remotingCommand.getOpaque());

        if (log.isDebugEnabled()) {
            log.debug("receive SendMessage request command, " + remotingCommand);
        }

        // -1 is a sentinel: msgCheck overwrites it only when it rejects the request.
        response.setCode(-1);
        super.msgCheck(channelHandlerContext, sendMessageRequestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }

        final byte[] body = remotingCommand.getBody();
        int queueId = sendMessageRequestHeader.getQueueId().intValue();

        final TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(sendMessageRequestHeader.getTopic());
        if (null == topicConfig) {
            // Previously this surfaced as a NullPointerException on the first dereference.
            response.setCode(1);
            response.setRemark("topic[" + sendMessageRequestHeader.getTopic() + "] not exist");
            return response;
        }

        // A negative queue id means "broker chooses": pick a write queue at random.
        // "% 99999999" keeps the value away from Integer.MIN_VALUE before Math.abs.
        if (queueId < 0) {
            queueId = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }

        int sysFlag = sendMessageRequestHeader.getSysFlag().intValue();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            // 2: presumably the multi-tags system flag bit -- TODO confirm.
            sysFlag |= 2;
        }

        final MessageExtBrokerInner innerMsg = new MessageExtBrokerInner();
        innerMsg.setTopic(sendMessageRequestHeader.getTopic());
        innerMsg.setBody(body);
        innerMsg.setFlag(sendMessageRequestHeader.getFlag().intValue());
        MessageAccessor.setProperties(innerMsg, MessageDecoder.string2messageProperties(sendMessageRequestHeader.getProperties()));
        innerMsg.setPropertiesString(sendMessageRequestHeader.getProperties());
        innerMsg.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), innerMsg.getTags()));
        innerMsg.setQueueId(queueId);
        innerMsg.setSysFlag(sysFlag);
        innerMsg.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp().longValue());
        innerMsg.setBornHost(channelHandlerContext.channel().remoteAddress());
        innerMsg.setStoreHost(getStoreHost());
        innerMsg.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes() == null ? 0 : sendMessageRequestHeader.getReconsumeTimes().intValue());

        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage() && innerMsg.getProperty("TRAN_MSG") != null) {
            response.setCode(16);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
            return response;
        }

        final PutMessageResult putResult = this.brokerController.getMessageStore().putMessage(innerMsg);
        if (putResult == null) {
            response.setCode(1);
            response.setRemark("store putMessage return null");
            return response;
        }

        final boolean stored = applyPutMessageStatus(response, putResult.getPutMessageStatus());
        if (!stored) {
            return response;
        }

        this.brokerController.getBrokerStatsManager().incTopicPutNums(innerMsg.getTopic());
        this.brokerController.getBrokerStatsManager().incTopicPutSize(innerMsg.getTopic(), putResult.getAppendMessageResult().getWroteBytes());
        this.brokerController.getBrokerStatsManager().incBrokerPutNums();

        response.setRemark((String) null);
        responseHeader.setMsgId(putResult.getAppendMessageResult().getMsgId());
        responseHeader.setQueueId(Integer.valueOf(queueId));
        responseHeader.setQueueOffset(Long.valueOf(putResult.getAppendMessageResult().getLogicsOffset()));

        doResponse(channelHandlerContext, remotingCommand, response);

        if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            this.brokerController.getPullRequestHoldService().notifyMessageArriving(sendMessageRequestHeader.getTopic(), queueId, putResult.getAppendMessageResult().getLogicsOffset() + 1);
        }

        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
        }
        // The response has already been handed to doResponse above.
        return null;
    }

    /**
     * Translates a store status into the response code (and, for failures, remark).
     *
     * @return {@code true} when the status counts as "message stored" (PUT_OK or one of the
     *         flush/slave warnings), {@code false} for every failure status
     */
    private boolean applyPutMessageStatus(RemotingCommand response, PutMessageStatus status) {
        switch (status) {
            case PUT_OK:
                response.setCode(0);
                return true;
            case FLUSH_DISK_TIMEOUT:
                response.setCode(10);
                return true;
            case FLUSH_SLAVE_TIMEOUT:
                response.setCode(12);
                return true;
            case SLAVE_NOT_AVAILABLE:
                response.setCode(11);
                return true;
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(1);
                response.setRemark("create maped file failed, please make sure OS and JDK both 64bit.");
                return false;
            case MESSAGE_ILLEGAL:
                response.setCode(13);
                response.setRemark("the message is illegal, maybe length not matched.");
                return false;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(14);
                response.setRemark("service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
                return false;
            case UNKNOWN_ERROR:
                response.setCode(1);
                response.setRemark("UNKNOWN_ERROR");
                return false;
            default:
                response.setCode(1);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                return false;
        }
    }

    /**
     * Returns the address stamped as "store host" on messages built by this processor.
     *
     * @return the value of the inherited {@code storeHost} field
     */
    @Override
    public SocketAddress getStoreHost() {
        return storeHost;
    }

    /**
     * Reports whether at least one send-message hook is registered.
     *
     * @return {@code true} when the hook list is neither {@code null} nor empty
     */
    @Override
    public boolean hasSendMessageHook() {
        return this.sendMessageHookList != null && !this.sendMessageHookList.isEmpty();
    }

    /**
     * Replaces the list of send-message hooks. The list is stored by reference, not copied.
     *
     * @param list the hooks to use from now on; may be {@code null} to clear them
     */
    @Override
    public void registerSendMessageHook(List<SendMessageHook> list) {
        sendMessageHookList = list;
    }

    /**
     * Reports whether at least one consume-message hook is registered.
     *
     * @return {@code true} when the hook list is neither {@code null} nor empty
     */
    public boolean hasConsumeMessageHook() {
        return this.consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
    }

    /**
     * Replaces the list of consume-message hooks. The list is stored by reference, not copied.
     *
     * @param list the hooks to use from now on; may be {@code null} to clear them
     */
    public void registerConsumeMessageHook(List<ConsumeMessageHook> list) {
        consumeMessageHookList = list;
    }

    /**
     * Invokes {@code consumeMessageAfter} on every registered consume-message hook.
     *
     * <p>Hooks are best-effort: anything a hook throws is caught so that the remaining hooks
     * still run and the caller is never interrupted. Unlike before, the failure is now
     * logged instead of being discarded silently.
     *
     * @param consumeMessageContext the context handed to each hook
     */
    public void executeConsumeMessageHookAfter(ConsumeMessageContext consumeMessageContext) {
        if (!hasConsumeMessageHook()) {
            return;
        }
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageAfter(consumeMessageContext);
            } catch (Throwable t) {
                // Deliberately not rethrown: a misbehaving hook must not break request handling.
                log.warn("consumeMessageAfter hook threw an exception", t);
            }
        }
    }
}
