package com.alibaba.rocketmq.broker.longpolling;

import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.common.ServiceThread;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.class */
public class PullRequestHoldService extends ServiceThread {
    private static final Logger log = LoggerFactory.getLogger("RocketmqBroker");
    private static final String TOPIC_QUEUEID_SEPARATOR = "@";
    private ConcurrentHashMap<String, ManyPullRequest> pullRequestTable = new ConcurrentHashMap<>(1024);
    private final BrokerController brokerController;

    public PullRequestHoldService(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    private String buildKey(String str, int i) {
        return str + TOPIC_QUEUEID_SEPARATOR + i;
    }

    public void suspendPullRequest(String str, int i, PullRequest pullRequest) {
        String buildKey = buildKey(str, i);
        ManyPullRequest manyPullRequest = this.pullRequestTable.get(buildKey);
        if (null == manyPullRequest) {
            manyPullRequest = new ManyPullRequest();
            ManyPullRequest putIfAbsent = this.pullRequestTable.putIfAbsent(buildKey, manyPullRequest);
            if (putIfAbsent != null) {
                manyPullRequest = putIfAbsent;
            }
        }
        manyPullRequest.addPullRequest(pullRequest);
    }

    private void checkHoldRequest() {
        Iterator<String> it = this.pullRequestTable.keySet().iterator();
        while (it.hasNext()) {
            String[] split = it.next().split(TOPIC_QUEUEID_SEPARATOR);
            if (split != null && 2 == split.length) {
                String str = split[0];
                int parseInt = Integer.parseInt(split[1]);
                notifyMessageArriving(str, parseInt, this.brokerController.getMessageStore().getMaxOffsetInQuque(str, parseInt));
            }
        }
    }

    public void notifyMessageArriving(String str, int i, long j) {
        List<PullRequest> cloneListAndClear;
        ManyPullRequest manyPullRequest = this.pullRequestTable.get(buildKey(str, i));
        if (manyPullRequest == null || (cloneListAndClear = manyPullRequest.cloneListAndClear()) == null) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        for (PullRequest pullRequest : cloneListAndClear) {
            if (j > pullRequest.getPullFromThisOffset()) {
                try {
                    this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(pullRequest.getClientChannel(), pullRequest.getRequestCommand());
                } catch (RemotingCommandException e) {
                    log.error("", e);
                }
            } else if (this.brokerController.getMessageStore().getMaxOffsetInQuque(str, i) > pullRequest.getPullFromThisOffset()) {
                try {
                    this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(pullRequest.getClientChannel(), pullRequest.getRequestCommand());
                } catch (RemotingCommandException e2) {
                    log.error("", e2);
                }
            } else if (System.currentTimeMillis() >= pullRequest.getSuspendTimestamp() + pullRequest.getTimeoutMillis()) {
                try {
                    this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(pullRequest.getClientChannel(), pullRequest.getRequestCommand());
                } catch (RemotingCommandException e3) {
                    log.error("", e3);
                }
            } else {
                arrayList.add(pullRequest);
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        manyPullRequest.addPullRequest(arrayList);
    }

    public void run() {
        log.info(getServiceName() + " service started");
        while (!isStoped()) {
            try {
                waitForRunning(1000L);
                checkHoldRequest();
            } catch (Exception e) {
                log.warn(getServiceName() + " service has exception. ", e);
            }
        }
        log.info(getServiceName() + " service end");
    }

    public String getServiceName() {
        return PullRequestHoldService.class.getSimpleName();
    }
}
