package com.alibaba.rocketmq.broker.processor;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.client.ClientChannelInfo;
import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import com.alibaba.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumerData;
import com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData;
import com.alibaba.rocketmq.common.protocol.heartbeat.ProducerData;
import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/rocketmq/broker/processor/ClientManageProcessor.class */
public class ClientManageProcessor implements NettyRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger("RocketmqBroker");
    private final BrokerController brokerController;
    private List<ConsumeMessageHook> consumeMessageHookList;

    public ClientManageProcessor(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        switch (remotingCommand.getCode()) {
            case 14:
                return queryConsumerOffset(channelHandlerContext, remotingCommand);
            case 15:
                return updateConsumerOffset(channelHandlerContext, remotingCommand);
            case 34:
                return heartBeat(channelHandlerContext, remotingCommand);
            case 35:
                return unregisterClient(channelHandlerContext, remotingCommand);
            case 38:
                return getConsumerListByGroup(channelHandlerContext, remotingCommand);
            default:
                return null;
        }
    }

    public boolean hasConsumeMessageHook() {
        return (this.consumeMessageHookList == null || this.consumeMessageHookList.isEmpty()) ? false : true;
    }

    public void registerConsumeMessageHook(List<ConsumeMessageHook> list) {
        this.consumeMessageHookList = list;
    }

    public void executeConsumeMessageHookAfter(ConsumeMessageContext consumeMessageContext) {
        if (hasConsumeMessageHook()) {
            Iterator<ConsumeMessageHook> it = this.consumeMessageHookList.iterator();
            while (it.hasNext()) {
                try {
                    it.next().consumeMessageAfter(consumeMessageContext);
                } catch (Throwable th) {
                }
            }
        }
    }

    private RemotingCommand updateConsumerOffset(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
        UpdateConsumerOffsetRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
        if (hasConsumeMessageHook()) {
            ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(decodeCommandCustomHeader.getConsumerGroup());
            consumeMessageContext.setTopic(decodeCommandCustomHeader.getTopic());
            consumeMessageContext.setClientHost(RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()));
            consumeMessageContext.setSuccess(true);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            InetSocketAddress inetSocketAddress = new InetSocketAddress(this.brokerController.getBrokerConfig().getBrokerIP1(), this.brokerController.getNettyServerConfig().getListenPort());
            consumeMessageContext.setMessageIds(this.brokerController.getMessageStore().getMessageIds(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), this.brokerController.getConsumerOffsetManager().queryOffset(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue()), decodeCommandCustomHeader.getCommitOffset().longValue(), inetSocketAddress));
            executeConsumeMessageHookAfter(consumeMessageContext);
        }
        this.brokerController.getConsumerOffsetManager().commitOffset(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), decodeCommandCustomHeader.getCommitOffset().longValue());
        createResponseCommand.setCode(0);
        createResponseCommand.setRemark((String) null);
        return createResponseCommand;
    }

    private RemotingCommand queryConsumerOffset(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
        QueryConsumerOffsetResponseHeader readCustomHeader = createResponseCommand.readCustomHeader();
        QueryConsumerOffsetRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
        long queryOffset = this.brokerController.getConsumerOffsetManager().queryOffset(decodeCommandCustomHeader.getConsumerGroup(), decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue());
        if (queryOffset >= 0) {
            readCustomHeader.setOffset(Long.valueOf(queryOffset));
            createResponseCommand.setCode(0);
            createResponseCommand.setRemark((String) null);
        } else if (this.brokerController.getMessageStore().getMinOffsetInQuque(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue()) > 0 || this.brokerController.getMessageStore().checkInDiskByConsumeOffset(decodeCommandCustomHeader.getTopic(), decodeCommandCustomHeader.getQueueId().intValue(), 0L)) {
            createResponseCommand.setCode(22);
            createResponseCommand.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        } else {
            readCustomHeader.setOffset(0L);
            createResponseCommand.setCode(0);
            createResponseCommand.setRemark((String) null);
        }
        return createResponseCommand;
    }

    public RemotingCommand getConsumerListByGroup(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
        GetConsumerListByGroupRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
        ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(decodeCommandCustomHeader.getConsumerGroup());
        if (consumerGroupInfo != null) {
            List<String> allClientId = consumerGroupInfo.getAllClientId();
            if (!allClientId.isEmpty()) {
                GetConsumerListByGroupResponseBody getConsumerListByGroupResponseBody = new GetConsumerListByGroupResponseBody();
                getConsumerListByGroupResponseBody.setConsumerIdList(allClientId);
                createResponseCommand.setBody(getConsumerListByGroupResponseBody.encode());
                createResponseCommand.setCode(0);
                createResponseCommand.setRemark((String) null);
                return createResponseCommand;
            }
            log.warn("getAllClientId failed, {} {}", decodeCommandCustomHeader.getConsumerGroup(), RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()));
        } else {
            log.warn("getConsumerGroupInfo failed, {} {}", decodeCommandCustomHeader.getConsumerGroup(), RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()));
        }
        createResponseCommand.setCode(1);
        createResponseCommand.setRemark("no consumer for this group, " + decodeCommandCustomHeader.getConsumerGroup());
        return createResponseCommand;
    }

    public RemotingCommand unregisterClient(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) throws RemotingCommandException {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
        UnregisterClientRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channelHandlerContext.channel(), decodeCommandCustomHeader.getClientID(), remotingCommand.getLanguage(), remotingCommand.getVersion());
        String producerGroup = decodeCommandCustomHeader.getProducerGroup();
        if (producerGroup != null) {
            this.brokerController.getProducerManager().unregisterProducer(producerGroup, clientChannelInfo);
        }
        String consumerGroup = decodeCommandCustomHeader.getConsumerGroup();
        if (consumerGroup != null) {
            this.brokerController.getConsumerManager().unregisterConsumer(consumerGroup, clientChannelInfo);
        }
        createResponseCommand.setCode(0);
        createResponseCommand.setRemark((String) null);
        return createResponseCommand;
    }

    public RemotingCommand heartBeat(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand((Class) null);
        HeartbeatData heartbeatData = (HeartbeatData) HeartbeatData.decode(remotingCommand.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channelHandlerContext.channel(), heartbeatData.getClientID(), remotingCommand.getLanguage(), remotingCommand.getVersion());
        for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
            SubscriptionGroupConfig findSubscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumerData.getGroupName());
            if (null != findSubscriptionGroupConfig) {
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(MixAll.getRetryTopic(consumerData.getGroupName()), findSubscriptionGroupConfig.getRetryQueueNums(), 6, consumerData.isUnitMode() ? TopicSysFlag.buildSysFlag(false, true) : 0);
            }
            if (this.brokerController.getConsumerManager().registerConsumer(consumerData.getGroupName(), clientChannelInfo, consumerData.getConsumeType(), consumerData.getMessageModel(), consumerData.getConsumeFromWhere(), consumerData.getSubscriptionDataSet())) {
                log.info("registerConsumer info changed {} {}", consumerData.toString(), RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()));
            }
        }
        Iterator it = heartbeatData.getProducerDataSet().iterator();
        while (it.hasNext()) {
            this.brokerController.getProducerManager().registerProducer(((ProducerData) it.next()).getGroupName(), clientChannelInfo);
        }
        createResponseCommand.setCode(0);
        createResponseCommand.setRemark((String) null);
        return createResponseCommand;
    }
}
