package org.apache.pulsar.broker.service.persistent;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.MoreObjects;
import org.apache.pulsar.shade.com.google.common.base.Predicate;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.glassfish.hk2.utilities.BuilderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentSubscription.class */
public class PersistentSubscription extends AbstractSubscription implements Subscription {
    /** Topic that owns this subscription; assigned once in the constructor. */
    protected final PersistentTopic topic;
    /** Managed cursor this subscription reads, acknowledges and mark-deletes on. */
    protected final ManagedCursor cursor;
    /** Current dispatcher; created or replaced in {@code addConsumer}, so it may be {@code null}. */
    protected volatile Dispatcher dispatcher;
    /** Name of {@link #topic}, captured at construction time. */
    protected final String topicName;
    /** Subscription name passed to the constructor. */
    protected final String subName;
    /** Precomputed string returned by {@code toString()}. */
    protected final String fullName;
    /** Values stored in {@link #isFenced}. */
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    /** Fence flag ({@link #FALSE}/{@link #TRUE}); accessed through {@link #IS_FENCED_UPDATER}. */
    private volatile int isFenced;
    private PersistentMessageExpiryMonitor expiryMonitor;
    private long lastExpireTimestamp;
    /** Wall-clock time (ms) of the last {@code consumerFlow} call. */
    private long lastConsumedFlowTimestamp;
    /** Wall-clock time (ms) recorded by {@code updateLastMarkDeleteAdvancedTimestamp}. */
    private long lastMarkDeleteAdvancedTimestamp;
    private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
    /** Cursor property key written/removed by {@code setReplicated}. */
    private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";
    /** Non-null only while the subscription is treated as replicated (see {@code isReplicated}). */
    private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
    /** Transaction pending-ack handler; a disabled implementation is used when transactions are off. */
    private final PendingAckHandle pendingAckHandle;
    /** Unmodifiable view of the subscription properties supplied at construction. */
    private volatile Map<String, String> subscriptionProperties;
    private volatile CompletableFuture<Void> fenceFuture;
    /** Callback passed to {@code cursor.asyncMarkDelete} for cumulative acks. */
    private final AsyncCallbacks.MarkDeleteCallback markDeleteCallback;
    /** Callback passed to {@code cursor.asyncDelete} for individual acks. */
    private final AsyncCallbacks.DeleteCallback deleteCallback;
    // NOTE(review): no initializer here; presumably assigned in a static block outside this excerpt.
    private static final Logger log;
    private static final AtomicIntegerFieldUpdater<PersistentSubscription> IS_FENCED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentSubscription.class, "isFenced");
    /** Base cursor properties returned by {@code getBaseCursorProperties(true)}. */
    private static final Map<String, Long> REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<>();
    /** Base cursor properties returned by {@code getBaseCursorProperties(false)}; always empty. */
    private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.broker.service.persistent.PersistentSubscription$9, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentSubscription$9.class */
    /**
     * Compiler-generated lookup tables that map enum ordinals to dense {@code switch} case
     * indices. Each entry is filled inside its own try/catch so that an enum constant missing
     * at run time ({@link NoSuchFieldError}) simply leaves its slot at 0, i.e. the default case.
     */
    public static /* synthetic */ class AnonymousClass9 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$broker$service$plugin$EntryFilter$FilterResult = new int[EntryFilter.FilterResult.values().length];
        // Declared here so the assignment in the static initializer below refers to an existing field.
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$api$proto$CommandSubscribe$SubType;

        static {
            try {
                $SwitchMap$org$apache$pulsar$broker$service$plugin$EntryFilter$FilterResult[EntryFilter.FilterResult.REJECT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0 (default case)
            }
            try {
                $SwitchMap$org$apache$pulsar$broker$service$plugin$EntryFilter$FilterResult[EntryFilter.FilterResult.RESCHEDULE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0 (default case)
            }
            $SwitchMap$org$apache$pulsar$common$api$proto$CommandSubscribe$SubType = new int[CommandSubscribe.SubType.values().length];
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$CommandSubscribe$SubType[CommandSubscribe.SubType.Exclusive.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0 (default case)
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$CommandSubscribe$SubType[CommandSubscribe.SubType.Failover.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0 (default case)
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$CommandSubscribe$SubType[CommandSubscribe.SubType.Shared.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0 (default case)
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$CommandSubscribe$SubType[CommandSubscribe.SubType.Key_Shared.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0 (default case)
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Selects the shared base cursor-property map for a subscription.
     *
     * @param z {@code true} for a replicated subscription
     * @return the replicated map when {@code z} is set, otherwise the (empty) non-replicated map
     */
    public static Map<String, Long> getBaseCursorProperties(boolean z) {
        if (z) {
            return REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
        }
        return NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether a cursor carries the replicated-subscription marker property.
     *
     * @param managedCursor cursor whose properties are inspected
     * @return {@code true} if the cursor properties contain the replicated-subscription key
     */
    public static boolean isCursorFromReplicatedSubscription(ManagedCursor managedCursor) {
        final boolean marked = managedCursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
        return marked;
    }

    /**
     * Creates a subscription without any subscription properties; delegates to the
     * five-argument constructor with an empty map.
     */
    public PersistentSubscription(PersistentTopic persistentTopic, String str, ManagedCursor managedCursor, boolean z) {
        this(persistentTopic, str, managedCursor, z, Collections.<String, String>emptyMap());
    }

    /**
     * Creates a subscription bound to the given topic and cursor.
     *
     * @param persistentTopic owning topic
     * @param str             subscription name
     * @param managedCursor   cursor backing the subscription
     * @param z               whether the subscription should be marked as replicated
     * @param map             subscription properties; {@code null} or empty yields an empty map,
     *                        otherwise an unmodifiable view of the given map is kept
     */
    public PersistentSubscription(PersistentTopic persistentTopic, String str, ManagedCursor managedCursor, boolean z, Map<String, String> map) {
        this.isFenced = FALSE;
        this.lastExpireTimestamp = 0L;
        this.lastConsumedFlowTimestamp = 0L;
        this.lastMarkDeleteAdvancedTimestamp = 0L;

        // Cumulative-ack callback: the context object is the mark-delete position captured before the ack.
        this.markDeleteCallback = new AsyncCallbacks.MarkDeleteCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentSubscription.1
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback
            public void markDeleteComplete(Object obj) {
                PositionImpl before = (PositionImpl) obj;
                PositionImpl after = (PositionImpl) PersistentSubscription.this.cursor.getMarkDeletedPosition();
                if (PersistentSubscription.log.isDebugEnabled()) {
                    PersistentSubscription.log.debug("[{}][{}] Mark deleted messages to position {} from position {}", PersistentSubscription.this.topicName, PersistentSubscription.this.subName, after, before);
                }
                PersistentSubscription.this.notifyTheMarkDeletePositionMoveForwardIfNeeded(before);
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback
            public void markDeleteFailed(ManagedLedgerException managedLedgerException, Object obj) {
                if (PersistentSubscription.log.isDebugEnabled()) {
                    PersistentSubscription.log.debug("[{}][{}] Failed to mark delete for position {}: {}", PersistentSubscription.this.topicName, PersistentSubscription.this.subName, obj, managedLedgerException);
                }
            }
        };

        // Individual-ack callback: the context object is likewise the earlier mark-delete position.
        this.deleteCallback = new AsyncCallbacks.DeleteCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentSubscription.2
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback
            public void deleteComplete(Object obj) {
                if (PersistentSubscription.log.isDebugEnabled()) {
                    PersistentSubscription.log.debug("[{}][{}] Deleted message at {}", PersistentSubscription.this.topicName, PersistentSubscription.this.subName, obj);
                }
                PersistentSubscription.this.notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) obj);
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback
            public void deleteFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentSubscription.log.warn("[{}][{}] Failed to delete message at {}: {}", PersistentSubscription.this.topicName, PersistentSubscription.this.subName, obj, managedLedgerException);
            }
        };

        this.topic = persistentTopic;
        this.cursor = managedCursor;
        this.topicName = persistentTopic.getName();
        this.subName = str;
        this.fullName = MoreObjects.toStringHelper(this).add("topic", this.topicName).add(BuilderHelper.NAME_KEY, this.subName).toString();
        this.expiryMonitor = new PersistentMessageExpiryMonitor(this.topicName, str, managedCursor, this);
        setReplicated(z);

        if (MapUtils.isEmpty(map)) {
            this.subscriptionProperties = Collections.emptyMap();
        } else {
            this.subscriptionProperties = Collections.unmodifiableMap(map);
        }

        // Pending-ack handling is only enabled when the transaction coordinator is on
        // and the topic is not an event system topic.
        if (persistentTopic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() && !SystemTopicNames.isEventSystemTopic(TopicName.get(this.topicName))) {
            this.pendingAckHandle = new PendingAckHandleImpl(this);
        } else {
            this.pendingAckHandle = new PendingAckHandleDisabled();
        }
        IS_FENCED_UPDATER.set(this, FALSE);
    }

    /**
     * Records the current wall-clock time as the last moment the mark-delete position
     * advanced, never moving the stored timestamp backwards.
     */
    public void updateLastMarkDeleteAdvancedTimestamp() {
        long previous = this.lastMarkDeleteAdvancedTimestamp;
        long now = System.currentTimeMillis();
        this.lastMarkDeleteAdvancedTimestamp = Math.max(previous, now);
    }

    /** Returns the broker interceptor obtained from the owning topic's broker service. */
    @Override // org.apache.pulsar.broker.service.Subscription
    public BrokerInterceptor interceptor() {
        return topic.getBrokerService().getInterceptor();
    }

    /** Returns the subscription name. */
    @Override // org.apache.pulsar.broker.service.Subscription
    public String getName() {
        return subName;
    }

    /** Returns the topic this subscription belongs to. */
    @Override // org.apache.pulsar.broker.service.Subscription
    public Topic getTopic() {
        return topic;
    }

    /** Returns {@code true} while a replicated-subscription snapshot cache is present. */
    @Override // org.apache.pulsar.broker.service.Subscription
    public boolean isReplicated() {
        return null != replicatedSubscriptionSnapshotCache;
    }

    /**
     * Turns the replicated state of this subscription on or off.
     *
     * <p>The snapshot cache is kept (created on demand) only when {@code z} is set and the broker
     * configuration enables replicated subscriptions; otherwise it is dropped. Independently of
     * the configuration, the cursor marker property is written when {@code z} is set and removed
     * when it is not.
     *
     * @param z requested replicated state
     * @return the result of the cursor property update, or {@code false} when there is no cursor
     */
    public boolean setReplicated(boolean z) {
        ServiceConfiguration brokerConfig = this.topic.getBrokerService().getPulsar().getConfig();
        boolean keepCache = z && brokerConfig.isEnableReplicatedSubscriptions();
        if (keepCache) {
            if (this.replicatedSubscriptionSnapshotCache == null) {
                this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(this.subName, brokerConfig.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription());
            }
        } else {
            this.replicatedSubscriptionSnapshotCache = null;
        }

        if (this.cursor == null) {
            return false;
        }
        if (z) {
            return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
        }
        return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
    }

    /**
     * Attaches a consumer once the pending-ack handle is ready.
     *
     * <p>Under this subscription's monitor: fails if the subscription is fenced; if there is no
     * dispatcher or it has no connected consumer, (re)creates the dispatcher that matches the
     * consumer's subscription type and closes the one it replaced; if a dispatcher with connected
     * consumers exists, the consumer's type must match it. Finally the consumer is handed to the
     * dispatcher.
     *
     * @param consumer consumer to add
     * @return a future that completes when the consumer was added, or fails with the
     *         {@link BrokerServiceException} describing why it could not be
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> addConsumer(Consumer consumer) {
        return this.pendingAckHandle.pendingAckHandleFuture().thenCompose(readyHandle -> {
            synchronized (this) {
                this.cursor.updateLastActive();
                if (IS_FENCED_UPDATER.get(this) == TRUE) {
                    log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
                    return FutureUtil.failedFuture(new BrokerServiceException.SubscriptionFencedException("Subscription is fenced"));
                }

                if (this.dispatcher != null && this.dispatcher.isConsumerConnected()) {
                    // A live dispatcher dictates the subscription type.
                    if (consumer.subType() != this.dispatcher.getType()) {
                        return FutureUtil.failedFuture(new BrokerServiceException.SubscriptionBusyException("Subscription is of different type"));
                    }
                } else {
                    Dispatcher previousDispatcher = null;
                    boolean streaming = this.topic.getBrokerService().getPulsar().getConfiguration().isStreamingDispatch();
                    switch (consumer.subType()) {
                        case Exclusive:
                            if (this.dispatcher == null || this.dispatcher.getType() != CommandSubscribe.SubType.Exclusive) {
                                previousDispatcher = this.dispatcher;
                                if (streaming) {
                                    this.dispatcher = new PersistentStreamingDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Exclusive, 0, this.topic, this);
                                } else {
                                    this.dispatcher = new PersistentDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Exclusive, 0, this.topic, this);
                                }
                            }
                            break;
                        case Failover:
                            // Any negative partition index is normalized to -1.
                            int partitionIndex = Math.max(-1, TopicName.getPartitionIndex(this.topicName));
                            if (this.dispatcher == null || this.dispatcher.getType() != CommandSubscribe.SubType.Failover) {
                                previousDispatcher = this.dispatcher;
                                if (streaming) {
                                    this.dispatcher = new PersistentStreamingDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Failover, partitionIndex, this.topic, this);
                                } else {
                                    this.dispatcher = new PersistentDispatcherSingleActiveConsumer(this.cursor, CommandSubscribe.SubType.Failover, partitionIndex, this.topic, this);
                                }
                            }
                            break;
                        case Shared:
                            if (this.dispatcher == null || this.dispatcher.getType() != CommandSubscribe.SubType.Shared) {
                                previousDispatcher = this.dispatcher;
                                if (streaming) {
                                    this.dispatcher = new PersistentStreamingDispatcherMultipleConsumers(this.topic, this.cursor, this);
                                } else {
                                    this.dispatcher = new PersistentDispatcherMultipleConsumers(this.topic, this.cursor, this);
                                }
                            }
                            break;
                        case Key_Shared:
                            KeySharedMeta keySharedMeta = consumer.getKeySharedMeta();
                            // The cast is only reached when the existing dispatcher is already Key_Shared.
                            if (this.dispatcher == null || this.dispatcher.getType() != CommandSubscribe.SubType.Key_Shared || !((PersistentStickyKeyDispatcherMultipleConsumers) this.dispatcher).hasSameKeySharedPolicy(keySharedMeta)) {
                                previousDispatcher = this.dispatcher;
                                this.dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(this.topic, this.cursor, this, this.topic.getBrokerService().getPulsar().getConfiguration(), keySharedMeta);
                            }
                            break;
                        default:
                            return FutureUtil.failedFuture(new BrokerServiceException.ServerMetadataException("Unsupported subscription type"));
                    }

                    if (previousDispatcher != null) {
                        // Close the replaced dispatcher in the background; the outcome is only logged.
                        previousDispatcher.close().thenRun(() -> {
                            log.info("[{}][{}] Successfully closed previous dispatcher", this.topicName, this.subName);
                        }).exceptionally(closeError -> {
                            log.error("[{}][{}] Failed to close previous dispatcher", this.topicName, this.subName, closeError);
                            return null;
                        });
                    }
                }

                try {
                    this.dispatcher.addConsumer(consumer);
                    return CompletableFuture.completedFuture(null);
                } catch (BrokerServiceException e) {
                    return FutureUtil.failedFuture(e);
                }
            }
        });
    }

    /**
     * Detaches a consumer from this subscription.
     *
     * <p>The consumer's out counters are folded into the "removed consumers" totals. When the last
     * consumer leaves, the cursor is deactivated and removed from the ledger's waiting cursors; for
     * a non-durable cursor the subscription and its dispatcher are additionally closed and, on the
     * broker executor, the subscription is removed from the topic and (unless {@code z} is set)
     * the cursor is deleted.
     *
     * @param consumer consumer being removed
     * @param z        when {@code true}, the non-durable cursor is not deleted
     * @throws BrokerServiceException if the dispatcher rejects the removal
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public synchronized void removeConsumer(Consumer consumer, boolean z) throws BrokerServiceException {
        this.cursor.updateLastActive();
        if (this.dispatcher != null) {
            this.dispatcher.removeConsumer(consumer);
        }
        // Preserve the departing consumer's counters so subscription totals do not drop.
        ConsumerStatsImpl stats = consumer.getStats();
        this.bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
        this.msgOutFromRemovedConsumer.add(stats.msgOutCounter);
        if (this.dispatcher != null && this.dispatcher.getConsumers().isEmpty()) {
            deactivateCursor();
            this.topic.getManagedLedger().removeWaitingCursor(this.cursor);
            if (!this.cursor.isDurable()) {
                close().thenRun(() -> {
                    synchronized (this) {
                        if (this.dispatcher != null) {
                            this.dispatcher.close().thenRun(() -> {
                                log.info("[{}][{}] Successfully closed dispatcher for reader", this.topicName, this.subName);
                            }).exceptionally(th -> {
                                log.error("[{}][{}] Failed to close dispatcher for reader", new Object[]{this.topicName, this.subName, th});
                                return null;
                            });
                        }
                    }
                }).exceptionally(th -> {
                    log.error("[{}][{}] Failed to close subscription for reader", new Object[]{this.topicName, this.subName, th});
                    return null;
                });
                this.topic.getBrokerService().pulsar().getExecutor().execute(() -> {
                    this.topic.removeSubscription(this.subName);
                    if (z) {
                        return;
                    }
                    try {
                        this.topic.getManagedLedger().deleteCursor(this.cursor.getName());
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the executor thread's owner can still observe it.
                        Thread.currentThread().interrupt();
                        log.warn("[{}] [{}] Failed to remove non durable cursor", new Object[]{this.topic.getName(), this.subName, e});
                    } catch (ManagedLedgerException e) {
                        log.warn("[{}] [{}] Failed to remove non durable cursor", new Object[]{this.topic.getName(), this.subName, e});
                    }
                });
            }
        }
        this.topic.decrementUsageCount();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Removed consumer -- count: {}", new Object[]{this.topic.getName(), this.subName, consumer.consumerName(), Long.valueOf(this.topic.currentUsageCount())});
        }
    }

    /** Marks the backing cursor as inactive. */
    public void deactivateCursor() {
        cursor.setInactive();
    }

    /**
     * Records the time of the flow request and forwards it to the dispatcher.
     *
     * @param consumer consumer granting permits
     * @param i        number of additional messages the consumer is willing to receive
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void consumerFlow(Consumer consumer, int i) {
        lastConsumedFlowTimestamp = System.currentTimeMillis();
        dispatcher.consumerFlow(consumer, i);
    }

    /**
     * Applies an acknowledgement to the cursor.
     *
     * <p>Cumulative acks must carry exactly one position and move the mark-delete position;
     * any other ack type deletes the listed positions individually. Afterwards, if the
     * mark-delete position changed, the advance timestamp and (when replicated) the snapshot
     * cache are updated, and consumers are told when a terminated topic has been fully drained.
     *
     * @param list    acknowledged positions
     * @param ackType kind of acknowledgement
     * @param map     extra cursor properties merged in for cumulative acks
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void acknowledgeMessage(List<Position> list, CommandAck.AckType ackType, Map<String, Long> map) {
        // Snapshot taken before the ack; also passed as callback context.
        Position markDeleteBeforeAck = this.cursor.getMarkDeletedPosition();

        if (ackType == CommandAck.AckType.Cumulative) {
            if (list.size() != 1) {
                log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids.", this.topicName, this.subName);
                return;
            }
            Position ackedUpTo = list.get(0);
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Cumulative ack on {}", this.topicName, this.subName, ackedUpTo);
            }
            this.cursor.asyncMarkDelete(ackedUpTo, mergeCursorProperties(map), this.markDeleteCallback, markDeleteBeforeAck);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Individual acks on {}", this.topicName, this.subName, list);
            }
            this.cursor.asyncDelete(list, this.deleteCallback, markDeleteBeforeAck);
            if (this.topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
                // Drop pending-ack bookkeeping for positions the cursor now reports as deleted.
                for (Position acked : list) {
                    if (((ManagedCursorImpl) this.cursor).isMessageDeleted(acked)) {
                        this.pendingAckHandle.clearIndividualPosition(acked);
                    }
                }
            }
            if (this.dispatcher != null) {
                this.dispatcher.getRedeliveryTracker().removeBatch(list);
            }
        }

        if (!this.cursor.getMarkDeletedPosition().equals(markDeleteBeforeAck)) {
            updateLastMarkDeleteAdvancedTimestamp();
            ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
            if (snapshotCache != null) {
                ReplicatedSubscriptionsSnapshot snapshot = snapshotCache.advancedMarkDeletePosition((PositionImpl) this.cursor.getMarkDeletedPosition());
                if (snapshot != null) {
                    this.topic.getReplicatedSubscriptionController().ifPresent(controller -> {
                        controller.localSubscriptionUpdated(this.subName, snapshot);
                    });
                }
            }
        }

        boolean drainedTerminatedTopic = this.topic.getManagedLedger().isTerminated() && this.cursor.getNumberOfEntriesInBacklog(false) == 0;
        if (drainedTerminatedTopic && this.dispatcher != null) {
            this.dispatcher.getConsumers().forEach(attached -> {
                attached.reachedEndOfTopic();
            });
        }
    }

    /**
     * Forwards a transactional individual acknowledgement to the pending-ack handle.
     *
     * @param txnID transaction the ack belongs to
     * @param list  position/integer pairs passed through unchanged to the handle
     * @return the future returned by the pending-ack handle
     */
    public CompletableFuture<Void> transactionIndividualAcknowledge(TxnID txnID, List<MutablePair<PositionImpl, Integer>> list) {
        CompletableFuture<Void> ackFuture = pendingAckHandle.individualAcknowledgeMessage(txnID, list);
        return ackFuture;
    }

    /**
     * Forwards a transactional cumulative acknowledgement to the pending-ack handle.
     *
     * @param txnID transaction the ack belongs to
     * @param list  positions passed through unchanged to the handle
     * @return the future returned by the pending-ack handle
     */
    public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID txnID, List<PositionImpl> list) {
        CompletableFuture<Void> ackFuture = pendingAckHandle.cumulativeAcknowledgeMessage(txnID, list);
        return ackFuture;
    }

    /**
     * Notifies the dispatcher when the cursor's mark-delete position is now strictly
     * greater than the given earlier position.
     *
     * @param position mark-delete position observed before the acknowledgement
     */
    private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position position) {
        PositionImpl earlier = (PositionImpl) position;
        PositionImpl current = (PositionImpl) this.cursor.getMarkDeletedPosition();
        if (this.dispatcher != null && current.compareTo(earlier) > 0) {
            this.dispatcher.markDeletePositionMoveForward();
        }
    }

    /** Returns the precomputed description built in the constructor from the topic and subscription names. */
    @Override
    public String toString() {
        return this.fullName;
    }

    /** Returns the name of the owning topic as captured at construction. */
    @Override // org.apache.pulsar.broker.service.Subscription
    public String getTopicName() {
        return topicName;
    }

    /**
     * Returns the subscription type of the current dispatcher.
     *
     * @return the dispatcher's type, or {@code null} when there is no dispatcher
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CommandSubscribe.SubType getType() {
        return this.dispatcher != null ? this.dispatcher.getType() : null;
    }

    /**
     * Returns the subscription type as display text.
     *
     * @return {@code "None"} when there is no type, the type's name for the four known types,
     *         and {@code "Null"} for any other value
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public String getTypeString() {
        CommandSubscribe.SubType subType = getType();
        if (subType == null) {
            return "None";
        }
        String label;
        switch (subType) {
            case Exclusive:
                label = "Exclusive";
                break;
            case Failover:
                label = "Failover";
                break;
            case Shared:
                label = "Shared";
                break;
            case Key_Shared:
                label = "Key_Shared";
                break;
            default:
                label = "Null";
                break;
        }
        return label;
    }

    /**
     * Scans the backlog through the cursor and counts entries and messages, split by the
     * result of running the entry filters (accepted, rejected, rescheduled).
     *
     * <p>The scan is bounded by the broker's configured maximum entries and maximum time and
     * reads in batches of the configured dispatcher read batch size. Progress is logged every
     * 1000 scanned entries.
     *
     * @param optional position handed to the cursor scan as its starting point
     * @return a future with the collected counters, first/last scanned positions and scan outcome
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> optional) {
        final long scanStartMillis = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Starting to analyze backlog", this.topicName, this.subName);
        }
        final AtomicLong entries = new AtomicLong();
        final AtomicLong acceptedEntries = new AtomicLong();
        final AtomicLong rejectedEntries = new AtomicLong();
        final AtomicLong rescheduledEntries = new AtomicLong();
        final AtomicLong messages = new AtomicLong();
        final AtomicLong acceptedMessages = new AtomicLong();
        final AtomicLong rejectedMessages = new AtomicLong();
        final AtomicLong rescheduledMessages = new AtomicLong();
        Position markDeletedPosition = this.cursor.getMarkDeletedPosition();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] currentPosition {}", new Object[]{this.topicName, this.subName, markDeletedPosition});
        }
        // Reuse the dispatcher's filters when one exists; otherwise build filter support for this subscription.
        final EntryFilterSupport entryFilterSupport = this.dispatcher != null ? (EntryFilterSupport) this.dispatcher : new EntryFilterSupport(this);
        ServiceConfiguration configuration = this.topic.getBrokerService().getPulsar().getConfiguration();
        long maxEntries = configuration.getSubscriptionBacklogScanMaxEntries();
        long maxTimeMs = configuration.getSubscriptionBacklogScanMaxTimeMs();
        int batchSize = configuration.getDispatcherMaxReadBatchSize();
        final AtomicReference<Position> firstPosition = new AtomicReference<>();
        final AtomicReference<Position> lastPosition = new AtomicReference<>();
        return this.cursor.scan(optional, new Predicate<Entry>() { // from class: org.apache.pulsar.broker.service.persistent.PersistentSubscription.3
            @Override // org.apache.pulsar.shade.com.google.common.base.Predicate
            public boolean apply(Entry entry) {
                if (PersistentSubscription.log.isDebugEnabled()) {
                    PersistentSubscription.log.debug("found {}", entry);
                }
                Position position = entry.getPosition();
                // Only the first scanned entry wins the compare-and-set; every entry overwrites "last".
                firstPosition.compareAndSet(null, position);
                lastPosition.set(position);
                MessageMetadata metadata = Commands.peekMessageMetadata(entry.getDataBuffer(), "", -1L);
                // An entry without a batch size counts as one message.
                int messagesInEntry = metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() : 1;
                EntryFilter.FilterResult verdict = entryFilterSupport.runFiltersForEntry(entry, metadata, null);
                if (verdict == null) {
                    verdict = EntryFilter.FilterResult.ACCEPT;
                }
                switch (verdict) {
                    case REJECT:
                        rejectedEntries.incrementAndGet();
                        rejectedMessages.addAndGet(messagesInEntry);
                        break;
                    case RESCHEDULE:
                        rescheduledEntries.incrementAndGet();
                        rescheduledMessages.addAndGet(messagesInEntry);
                        break;
                    default:
                        acceptedEntries.incrementAndGet();
                        acceptedMessages.addAndGet(messagesInEntry);
                        break;
                }
                long scanned = entries.incrementAndGet();
                messages.addAndGet(messagesInEntry);
                if (scanned % 1000 == 0) {
                    PersistentSubscription.log.info("[{}][{}] scan running since {} ms - scanned {} entries", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, Long.valueOf(System.currentTimeMillis() - scanStartMillis), Long.valueOf(scanned)});
                }
                // Always continue; the scan limits are enforced by the cursor.
                return true;
            }
        }, batchSize, maxEntries, maxTimeMs).thenApply(scanOutcome -> {
            long scanEndMillis = System.currentTimeMillis();
            AnalyzeBacklogResult result = new AnalyzeBacklogResult();
            result.setFirstPosition(firstPosition.get());
            result.setLastPosition(lastPosition.get());
            result.setEntries(entries.get());
            result.setMessages(messages.get());
            result.setFilterAcceptedEntries(acceptedEntries.get());
            result.setFilterAcceptedMessages(acceptedMessages.get());
            result.setFilterRejectedEntries(rejectedEntries.get());
            result.setFilterRejectedMessages(rejectedMessages.get());
            result.setFilterRescheduledEntries(rescheduledEntries.get());
            result.setFilterRescheduledMessages(rescheduledMessages.get());
            result.setScanOutcome(scanOutcome);
            log.info("[{}][{}] scan took {} ms - {}", new Object[]{this.topicName, this.subName, Long.valueOf(scanEndMillis - scanStartMillis), result});
            return result;
        });
    }

    /**
     * Asks the cursor to clear its backlog.
     *
     * <p>On success the dispatcher (if any) also clears its delayed messages.
     *
     * @return a future completed when the backlog was cleared, or failed with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> clearBacklog() {
        final CompletableFuture<Void> cleared = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Backlog size before clearing: {}", this.topicName, this.subName, this.cursor.getNumberOfEntriesInBacklog(false));
        }
        AsyncCallbacks.ClearBacklogCallback onCleared = new AsyncCallbacks.ClearBacklogCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentSubscription.4
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback
            public void clearBacklogComplete(Object obj) {
                if (PersistentSubscription.log.isDebugEnabled()) {
                    PersistentSubscription.log.debug("[{}][{}] Backlog size after clearing: {}", PersistentSubscription.this.topicName, PersistentSubscription.this.subName, PersistentSubscription.this.cursor.getNumberOfEntriesInBacklog(false));
                }
                if (PersistentSubscription.this.dispatcher != null) {
                    PersistentSubscription.this.dispatcher.clearDelayedMessages();
                }
                cleared.complete(null);
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback
            public void clearBacklogFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentSubscription.log.error("[{}][{}] Failed to clear backlog", PersistentSubscription.this.topicName, PersistentSubscription.this.subName, managedLedgerException);
                cleared.completeExceptionally(managedLedgerException);
            }
        };
        this.cursor.asyncClearBacklog(onCleared, null);
        return cleared;
    }

    /**
     * Asynchronously asks the cursor to skip {@code i} entries, using
     * {@code IndividualDeletedEntries.Exclude}.
     *
     * @param i number of entries to skip
     * @return a future completed with {@code null} on success, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> skipMessages(final int i) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Skipping {} messages, current backlog {}", topicName, subName, i, cursor.getNumberOfEntriesInBacklog(false));
        }
        final AsyncCallbacks.SkipEntriesCallback callback = new AsyncCallbacks.SkipEntriesCallback() {
            @Override
            public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
                log.error("[{}][{}] Failed to skip {} messages", topicName, subName, i, exception);
                result.completeExceptionally(exception);
            }

            @Override
            public void skipEntriesComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] Skipped {} messages, new backlog {}", topicName, subName, i, cursor.getNumberOfEntriesInBacklog(false));
                }
                result.complete(null);
            }
        };
        cursor.asyncSkipEntries(i, ManagedCursor.IndividualDeletedEntries.Exclude, callback, null);
        return result;
    }

    /**
     * Resets the subscription to the position found for the given timestamp.
     *
     * <p>A {@link PersistentMessageFinder} looks up a position for {@code j}. If one is found, the cursor is
     * reset to the position following it; otherwise the cursor's first position is used, and the reset
     * fails with {@code SubscriptionInvalidCursorPosition} when that is unavailable too.
     *
     * @param j timestamp to reset to
     * @return a future completed once the reset finishes, or exceptionally with a
     *         {@link BrokerServiceException} (a {@code SubscriptionBusyException} when the lookup
     *         failed because of a concurrent find)
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> resetCursor(final long j) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        final PersistentMessageFinder finder = new PersistentMessageFinder(topicName, cursor);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Resetting subscription to timestamp {}", topicName, subName, j);
        }
        finder.findMessages(j, new AsyncCallbacks.FindEntryCallback() {
            @Override
            public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
                final BrokerServiceException translated;
                if (exception instanceof ManagedLedgerException.ConcurrentFindCursorPositionException) {
                    translated = new BrokerServiceException.SubscriptionBusyException(exception.getMessage());
                } else {
                    translated = new BrokerServiceException(exception);
                }
                result.completeExceptionally(translated);
            }

            @Override
            public void findEntryComplete(Position position, Object ctx) {
                if (position != null) {
                    PersistentSubscription.this.resetCursor(position.getNext(), result);
                    return;
                }
                // Nothing matched the timestamp: fall back to the first position of the cursor.
                final Position first = cursor.getFirstPosition();
                if (first == null) {
                    log.warn("[{}][{}] Unable to find position for timestamp {}. Unable to reset cursor to first position", topicName, subName, j);
                    result.completeExceptionally(new BrokerServiceException.SubscriptionInvalidCursorPosition("Unable to find position for specified timestamp"));
                    return;
                }
                log.info("[{}][{}] Unable to find position for timestamp {}. Resetting cursor to first position {} in ledger", topicName, subName, j, first);
                PersistentSubscription.this.resetCursor(first, result);
            }
        });
        return result;
    }

    /**
     * Resets the subscription to the given position.
     *
     * @param position position to reset the cursor to
     * @return a future that is completed by the private {@code resetCursor(Position, CompletableFuture)} overload
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> resetCursor(Position position) {
        final CompletableFuture<Void> resetResult = new CompletableFuture<>();
        this.resetCursor(position, resetResult);
        return resetResult;
    }

    /**
     * Fences the subscription, disconnects active consumers and then resets the cursor to {@code position}.
     *
     * <p>The fence flag is released again (set back to 0) on every terminal path visible here: consumer
     * disconnect failure, reset success, reset failure, and an exception thrown while issuing the reset.
     *
     * @param position          position the cursor should be reset to
     * @param completableFuture future completed with {@code null} on success or exceptionally with a
     *                          {@link BrokerServiceException} describing the failure
     */
    private void resetCursor(Position position, CompletableFuture<Void> completableFuture) {
        CompletableFuture<Void> completedFuture;
        // Only one fencing operation at a time: the flag must move from 0 to 1.
        if (!IS_FENCED_UPDATER.compareAndSet(this, 0, 1)) {
            completableFuture.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to fence subscription"));
            return;
        }
        synchronized (this) {
            // Nothing to disconnect when there is no dispatcher or no connected consumer.
            completedFuture = (this.dispatcher == null || !this.dispatcher.isConsumerConnected()) ? CompletableFuture.completedFuture(null) : this.dispatcher.disconnectActiveConsumers(true);
        }
        completedFuture.whenComplete((r12, th) -> {
            // The dispatcher's close future is reset whether or not the disconnect succeeded.
            if (this.dispatcher != null) {
                this.dispatcher.resetCloseFuture();
            }
            if (th != null) {
                log.error("[{}][{}] Failed to disconnect consumer from subscription", new Object[]{this.topicName, this.subName, th});
                IS_FENCED_UPDATER.set(this, 0);
                completableFuture.completeExceptionally(new BrokerServiceException.SubscriptionBusyException("Failed to disconnect consumers from subscription"));
                return;
            }
            log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset", this.topicName, this.subName);
            try {
                // z is true when the topic has a compaction horizon at or after the target position;
                // it is handed to asyncResetCursor as its second argument.
                boolean z = false;
                if (this.topic.getCompactedTopic() != null && this.topic.getCompactedTopic().getCompactionHorizon().isPresent() && ((PositionImpl) this.topic.getCompactedTopic().getCompactionHorizon().get()).compareTo((PositionImpl) position) >= 0) {
                    z = true;
                }
                this.cursor.asyncResetCursor(position, z, new AsyncCallbacks.ResetCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentSubscription.7
                    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ResetCursorCallback
                    public void resetComplete(Object obj) {
                        if (PersistentSubscription.log.isDebugEnabled()) {
                            PersistentSubscription.log.debug("[{}][{}] Successfully reset subscription to position {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, position});
                        }
                        // Notify the dispatcher before the fence is released.
                        if (PersistentSubscription.this.dispatcher != null) {
                            PersistentSubscription.this.dispatcher.cursorIsReset();
                        }
                        PersistentSubscription.IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        completableFuture.complete(null);
                    }

                    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.ResetCursorCallback
                    public void resetFailed(ManagedLedgerException managedLedgerException, Object obj) {
                        PersistentSubscription.log.error("[{}][{}] Failed to reset subscription to position {}", new Object[]{PersistentSubscription.this.topicName, PersistentSubscription.this.subName, position, managedLedgerException});
                        PersistentSubscription.IS_FENCED_UPDATER.set(PersistentSubscription.this, 0);
                        // Translate managed-ledger failures into the matching broker exception type.
                        if (managedLedgerException instanceof ManagedLedgerException.InvalidCursorPositionException) {
                            completableFuture.completeExceptionally(new BrokerServiceException.SubscriptionInvalidCursorPosition(managedLedgerException.getMessage()));
                        } else if (managedLedgerException instanceof ManagedLedgerException.ConcurrentFindCursorPositionException) {
                            completableFuture.completeExceptionally(new BrokerServiceException.SubscriptionBusyException(managedLedgerException.getMessage()));
                        } else {
                            completableFuture.completeExceptionally(new BrokerServiceException(managedLedgerException));
                        }
                    }
                });
            } catch (Exception e) {
                // Anything thrown while preparing or issuing the reset must still release the fence.
                log.error("[{}][{}] Error while resetting cursor", new Object[]{this.topicName, this.subName, e});
                IS_FENCED_UPDATER.set(this, 0);
                completableFuture.completeExceptionally(new BrokerServiceException(e));
            }
        });
    }

    /**
     * Asynchronously fetches the {@code i}-th entry from the cursor, using
     * {@code IndividualDeletedEntries.Exclude}.
     *
     * @param i index of the entry to read, as understood by {@code ManagedCursor.asyncGetNthEntry}
     * @return a future completed with the entry, or exceptionally with the
     *         {@link ManagedLedgerException} reported by the cursor
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Entry> peekNthMessage(int i) {
        final CompletableFuture<Entry> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Getting message at position {}", topicName, subName, i);
        }
        final AsyncCallbacks.ReadEntryCallback callback = new AsyncCallbacks.ReadEntryCallback() {
            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                result.complete(entry);
            }

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                result.completeExceptionally(exception);
            }
        };
        cursor.asyncGetNthEntry(i, ManagedCursor.IndividualDeletedEntries.Exclude, callback, null);
        return result;
    }

    /**
     * Returns the cursor's backlog entry count.
     *
     * @param z flag forwarded unchanged to {@code ManagedCursor.getNumberOfEntriesInBacklog}
     * @return number of entries in the backlog as reported by the cursor
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public long getNumberOfEntriesInBacklog(boolean z) {
        final long backlogEntries = cursor.getNumberOfEntriesInBacklog(z);
        return backlogEntries;
    }

    /**
     * Returns the dispatcher currently attached to this subscription, read under the instance lock.
     *
     * @return the current dispatcher, or {@code null} when none is set
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public synchronized Dispatcher getDispatcher() {
        final Dispatcher current = dispatcher;
        return current;
    }

    /**
     * Delegates to {@code ManagedCursor.getNumberOfEntriesSinceFirstNotAckedMessage()}.
     *
     * @return the value reported by the cursor
     */
    public long getNumberOfEntriesSinceFirstNotAckedMessage() {
        final long entries = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
        return entries;
    }

    /**
     * Delegates to {@code ManagedCursor.getTotalNonContiguousDeletedMessagesRange()}.
     *
     * @return the value reported by the cursor
     */
    public int getTotalNonContiguousDeletedMessagesRange() {
        final int ranges = cursor.getTotalNonContiguousDeletedMessagesRange();
        return ranges;
    }

    /**
     * Closes the subscription if no consumer is connected.
     *
     * <p>The pending-ack handle is closed asynchronously; once that succeeds the subscription is marked
     * fenced and the closure is logged.
     *
     * @return a future completed when the pending-ack handle is closed, or a future failed with
     *         {@code SubscriptionBusyException} when the dispatcher still has connected consumers
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> close() {
        synchronized (this) {
            final boolean hasActiveConsumers = this.dispatcher != null && this.dispatcher.isConsumerConnected();
            if (hasActiveConsumers) {
                return FutureUtil.failedFuture(new BrokerServiceException.SubscriptionBusyException("Subscription has active consumers"));
            }
            return this.pendingAckHandle.closeAsync().thenAccept(ignored -> {
                IS_FENCED_UPDATER.set(this, 1);
                log.info("[{}][{}] Successfully closed subscription [{}]", this.topicName, this.subName, this.cursor);
            });
        }
    }

    /**
     * Fences the subscription, closes its dispatcher (when one is set) and then closes the subscription.
     *
     * <p>If {@code fenceFuture} is already set, that same future is returned and no new work is started.
     *
     * @return the future tracking this disconnect; it completes with {@code null} on success, or
     *         exceptionally with the failure raised while closing the dispatcher or the subscription
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public synchronized CompletableFuture<Void> disconnect() {
        // A disconnect is already being tracked: hand out the same future.
        if (this.fenceFuture != null) {
            return this.fenceFuture;
        }
        this.fenceFuture = new CompletableFuture<>();
        // Mark the subscription fenced before any asynchronous work starts.
        IS_FENCED_UPDATER.set(this, 1);
        (this.dispatcher != null ? this.dispatcher.close() : CompletableFuture.completedFuture(null)).thenCompose(obj -> {
            return close();
        }).thenRun(() -> {
            log.info("[{}][{}] Successfully disconnected and closed subscription", this.topicName, this.subName);
            this.fenceFuture.complete(null);
        }).exceptionally(th -> {
            log.error("[{}][{}] Error disconnecting consumers from subscription", new Object[]{this.topicName, this.subName, th});
            this.fenceFuture.completeExceptionally(th);
            // Undo the fencing after a failed disconnect.
            resumeAfterFence();
            return null;
        });
        return this.fenceFuture;
    }

    /**
     * Un-fences the subscription once the pending {@code fenceFuture} (if any) has completed.
     *
     * <p>When the fence flag is successfully flipped from 1 back to 0 and a dispatcher exists, the
     * dispatcher is reset. {@code fenceFuture} is then cleared. Exceptions raised along the way are
     * logged and not propagated.
     */
    public synchronized void resumeAfterFence() {
        if (this.fenceFuture == null) {
            return;
        }
        this.fenceFuture.whenComplete((ignoredResult, ignoredError) -> {
            synchronized (this) {
                try {
                    final boolean unfenced = IS_FENCED_UPDATER.compareAndSet(this, 1, 0);
                    if (unfenced && this.dispatcher != null) {
                        this.dispatcher.reset();
                    }
                    this.fenceFuture = null;
                } catch (Exception e) {
                    log.error("[{}] Resume subscription [{}] failure", this.topicName, this.subName, e);
                }
            }
        });
    }

    /**
     * Deletes this subscription without forcing connected consumers off.
     *
     * @return the future returned by {@code delete(false)}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> delete() {
        final boolean force = false;
        return delete(force);
    }

    /**
     * Deletes this subscription, disconnecting it first.
     *
     * @return the future returned by {@code delete(true)}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> deleteForcefully() {
        final boolean force = true;
        return delete(force);
    }

    /**
     * Closes (or forcefully disconnects) the subscription, unsubscribes it from the topic and finally closes
     * the dispatcher.
     *
     * <p>If unsubscribing or the final dispatcher close fails, the fence flag is cleared again so the
     * subscription is not left fenced; in the dispatcher-close failure path the dispatcher is reset too.
     *
     * @param z {@code true} to go through {@link #disconnect()} first, {@code false} to use {@link #close()},
     *          which fails when consumers are still connected
     * @return a future completed with {@code null} once the subscription is deleted, or exceptionally with
     *         the first failure encountered
     */
    private CompletableFuture<Void> delete(boolean z) {
        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
        log.info("[{}][{}] Unsubscribing", this.topicName, this.subName);
        // Typed (not raw) so the downstream exceptionally() handlers receive a Throwable.
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        if (z) {
            disconnect().thenRun(() -> {
                closeFuture.complete(null);
            }).exceptionally(disconnectError -> {
                log.error("[{}][{}] Error disconnecting and closing subscription", new Object[]{this.topicName, this.subName, disconnectError});
                closeFuture.completeExceptionally(disconnectError);
                return null;
            });
        } else {
            close().thenRun(() -> {
                closeFuture.complete(null);
            }).exceptionally(closeError -> {
                log.error("[{}][{}] Error closing subscription", new Object[]{this.topicName, this.subName, closeError});
                closeFuture.completeExceptionally(closeError);
                return null;
            });
        }
        closeFuture.thenCompose(ignoredClose -> {
            return this.topic.unsubscribe(this.subName);
        }).thenAccept(ignoredUnsubscribe -> {
            synchronized (this) {
                (this.dispatcher != null ? this.dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> {
                    log.info("[{}][{}] Successfully deleted subscription", this.topicName, this.subName);
                    deleteFuture.complete(null);
                }).exceptionally(dispatcherError -> {
                    // Roll back the fencing so the subscription remains usable.
                    IS_FENCED_UPDATER.set(this, 0);
                    if (this.dispatcher != null) {
                        this.dispatcher.reset();
                    }
                    log.error("[{}][{}] Error deleting subscription", new Object[]{this.topicName, this.subName, dispatcherError});
                    deleteFuture.completeExceptionally(dispatcherError);
                    return null;
                });
            }
        }).exceptionally(deleteError -> {
            IS_FENCED_UPDATER.set(this, 0);
            log.error("[{}][{}] Error deleting subscription", new Object[]{this.topicName, this.subName, deleteError});
            deleteFuture.completeExceptionally(deleteError);
            return null;
        });
        return deleteFuture;
    }

    /**
     * Handles an unsubscribe request issued by {@code consumer}.
     *
     * <p>When the dispatcher allows the consumer to unsubscribe, the consumer is closed and the subscription
     * is deleted. Otherwise the returned future fails with {@code ServerMetadataException}.
     *
     * @param consumer the consumer requesting the unsubscribe
     * @return the future of {@link #delete()} when the unsubscribe is allowed; otherwise a future completed
     *         exceptionally with the reason the request was rejected or with the
     *         {@link BrokerServiceException} raised while closing the consumer
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        try {
            // The guarded calls must live inside the try block; otherwise the catch below is
            // unreachable and a failure while closing the consumer escapes to the caller.
            if (this.dispatcher.canUnsubscribe(consumer)) {
                consumer.close();
                return delete();
            }
            future.completeExceptionally(new BrokerServiceException.ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe"));
        } catch (BrokerServiceException e) {
            log.warn("Error removing consumer {}", consumer, e);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Returns the consumers known to the current dispatcher.
     *
     * @return the dispatcher's consumer list, or an empty list when no dispatcher is set
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public List<Consumer> getConsumers() {
        // Read the field once so the null check and the call see the same dispatcher.
        final Dispatcher current = this.dispatcher;
        if (current == null) {
            return Collections.emptyList();
        }
        return current.getConsumers();
    }

    /**
     * Triggers message expiry on this subscription when it is worthwhile.
     *
     * <p>Nothing is done when the backlog is empty, or when a consumer is connected, the backlog holds
     * fewer than 1000 entries and the topic reports that the oldest message has not expired yet.
     *
     * @param i value forwarded to {@code isOldestMessageExpired} and to the expiry monitor
     * @return {@code false} when expiry was skipped, otherwise the expiry monitor's result
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public boolean expireMessages(int i) {
        final long backlog = getNumberOfEntriesInBacklog(false);
        if (backlog == 0) {
            return false;
        }
        final boolean skipExpiry = dispatcher != null
                && dispatcher.isConsumerConnected()
                && backlog < 1000
                && !topic.isOldestMessageExpired(cursor, i);
        if (skipExpiry) {
            return false;
        }
        lastExpireTimestamp = System.currentTimeMillis();
        return expiryMonitor.expireMessages(i);
    }

    /**
     * Records the current time as the last expiry timestamp and asks the expiry monitor to expire
     * messages for the given position.
     *
     * @param position position forwarded to the expiry monitor
     * @return the expiry monitor's result
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public boolean expireMessages(Position position) {
        lastExpireTimestamp = System.currentTimeMillis();
        final boolean expired = expiryMonitor.expireMessages(position);
        return expired;
    }

    /**
     * Returns the message expiry rate reported by the expiry monitor.
     *
     * @return the expiry monitor's current message expiry rate
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public double getExpiredMessageRate() {
        final double expiryRate = expiryMonitor.getMessageExpiryRate();
        return expiryRate;
    }

    /**
     * Exposes the expiry monitor used by this subscription.
     *
     * @return the expiry monitor instance held by this subscription
     */
    public PersistentMessageExpiryMonitor getExpiryMonitor() {
        final PersistentMessageExpiryMonitor monitor = expiryMonitor;
        return monitor;
    }

    /**
     * Delegates to {@code ManagedCursor.getEstimatedSizeSinceMarkDeletePosition()}.
     *
     * @return the estimated size reported by the cursor
     */
    public long estimateBacklogSize() {
        final long estimatedSize = cursor.getEstimatedSizeSinceMarkDeletePosition();
        return estimatedSize;
    }

    /**
     * Builds a statistics snapshot for this subscription.
     *
     * <p>The snapshot aggregates per-consumer statistics from the current dispatcher, adds dispatcher-level
     * counters, cursor-derived backlog figures, expiry figures and, for Key_Shared subscriptions,
     * key-hash ranges and recently joined consumers.
     *
     * @param bool flag forwarded to {@link #getNumberOfEntriesInBacklog(boolean)}; must not be {@code null}
     * @param z    when {@code true}, the estimated backlog size is looked up from the managed ledger
     * @param z2   when {@code true} and the backlog is non-empty, the earliest publish time in the backlog
     *             is looked up (this waits for the lookup future); {@code -1} is reported if the lookup
     *             fails or is interrupted
     * @return a newly created, populated {@code SubscriptionStatsImpl}
     */
    public SubscriptionStatsImpl getStats(Boolean bool, boolean z, boolean z2) {
        SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
        subStats.lastExpireTimestamp = this.lastExpireTimestamp;
        subStats.lastConsumedFlowTimestamp = this.lastConsumedFlowTimestamp;
        subStats.lastMarkDeleteAdvancedTimestamp = this.lastMarkDeleteAdvancedTimestamp;
        // Start from the counters accumulated by consumers that have already been removed.
        subStats.bytesOutCounter = this.bytesOutFromRemovedConsumers.longValue();
        subStats.msgOutCounter = this.msgOutFromRemovedConsumer.longValue();
        // Work on a single read of the field so every check below sees the same dispatcher.
        Dispatcher dispatcher = this.dispatcher;
        if (dispatcher != null) {
            Map<Consumer, List<Range>> consumerKeyHashRanges = getType() == CommandSubscribe.SubType.Key_Shared ? ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getConsumerKeyHashRanges() : null;
            dispatcher.getConsumers().forEach(consumer -> {
                ConsumerStatsImpl consumerStats = consumer.getStats();
                subStats.consumers.add(consumerStats);
                subStats.msgRateOut += consumerStats.msgRateOut;
                subStats.msgThroughputOut += consumerStats.msgThroughputOut;
                subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                subStats.msgOutCounter += consumerStats.msgOutCounter;
                subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
                subStats.messageAckRate += consumerStats.messageAckRate;
                subStats.chunkedMessageRate = (int) (subStats.chunkedMessageRate + consumerStats.chunkedMessageRate);
                subStats.unackedMessages += consumerStats.unackedMessages;
                subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
                subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
                if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer)) {
                    consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer).stream()
                            .map(Range::toString)
                            .collect(Collectors.toList());
                }
            });
            subStats.filterProcessedMsgCount = dispatcher.getFilterProcessedMsgCount();
            subStats.filterAcceptedMsgCount = dispatcher.getFilterAcceptedMsgCount();
            subStats.filterRejectedMsgCount = dispatcher.getFilterRejectedMsgCount();
            subStats.filterRescheduledMsgCount = dispatcher.getFilterRescheduledMsgCount();
        }
        CommandSubscribe.SubType type = getType();
        subStats.type = getTypeString();
        if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            Consumer activeConsumer = ((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
            if (activeConsumer != null) {
                subStats.activeConsumerName = activeConsumer.consumerName();
            }
        }
        if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
            subStats.delayedTrackerMemoryUsage = ((PersistentDispatcherMultipleConsumers) dispatcher).getDelayedTrackerMemoryUsage();
        }
        if (Subscription.isIndividualAckMode(type) && (dispatcher instanceof PersistentDispatcherMultipleConsumers)) {
            PersistentDispatcherMultipleConsumers multiDispatcher = (PersistentDispatcherMultipleConsumers) dispatcher;
            // Overrides the per-consumer sum with the dispatcher-wide total.
            subStats.unackedMessages = multiDispatcher.getTotalUnackedMessages();
            subStats.blockedSubscriptionOnUnackedMsgs = multiDispatcher.isBlockedDispatcherOnUnackedMsgs();
            subStats.msgDelayed = multiDispatcher.getNumberOfDelayedMessages();
        }
        subStats.msgBacklog = getNumberOfEntriesInBacklog(bool.booleanValue());
        if (z) {
            subStats.backlogSize = ((ManagedLedgerImpl) this.topic.getManagedLedger()).getEstimatedBacklogSize((PositionImpl) this.cursor.getMarkDeletedPosition());
        }
        if (z2 && subStats.msgBacklog > 0) {
            long earliestPublishTime;
            try {
                earliestPublishTime = ((ManagedLedgerImpl) this.cursor.getManagedLedger()).getEarliestMessagePublishTimeOfPos((PositionImpl) this.cursor.getMarkDeletedPosition()).get().longValue();
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
                earliestPublishTime = -1;
            } catch (ExecutionException e) {
                earliestPublishTime = -1;
            }
            subStats.earliestMsgPublishTimeInBacklog = earliestPublishTime;
        }
        subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
        subStats.msgRateExpired = this.expiryMonitor.getMessageExpiryRate();
        subStats.totalMsgExpired = this.expiryMonitor.getTotalMessageExpired();
        subStats.isReplicated = isReplicated();
        subStats.subscriptionProperties = this.subscriptionProperties;
        subStats.isDurable = this.cursor.isDurable();
        if (getType() == CommandSubscribe.SubType.Key_Shared && (dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers)) {
            PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) dispatcher;
            subStats.allowOutOfOrderDelivery = keySharedDispatcher.isAllowOutOfOrderDelivery();
            subStats.keySharedMode = keySharedDispatcher.getKeySharedMode().toString();
            LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = keySharedDispatcher.getRecentlyJoinedConsumers();
            if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) {
                recentlyJoinedConsumers.forEach((joinedConsumer, joinPosition) -> {
                    subStats.consumersAfterMarkDeletePosition.put(joinedConsumer.consumerName(), joinPosition.toString());
                });
            }
        }
        subStats.nonContiguousDeletedMessagesRanges = this.cursor.getTotalNonContiguousDeletedMessagesRange();
        subStats.nonContiguousDeletedMessagesRangesSerializedSize = this.cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
        return subStats;
    }

    /**
     * Asks the current dispatcher, if any, to redeliver the consumer's unacknowledged messages.
     *
     * @param consumer consumer whose unacknowledged messages should be redelivered
     * @param j        value forwarded unchanged to the dispatcher
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void redeliverUnacknowledgedMessages(Consumer consumer, long j) {
        final Dispatcher current = getDispatcher();
        if (current == null) {
            return;
        }
        current.redeliverUnacknowledgedMessages(consumer, j);
    }

    /**
     * Asks the current dispatcher, if any, to redeliver the given positions to the consumer.
     *
     * @param consumer consumer the positions belong to
     * @param list     positions forwarded unchanged to the dispatcher
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> list) {
        final Dispatcher current = getDispatcher();
        if (current == null) {
            return;
        }
        current.redeliverUnacknowledgedMessages(consumer, list);
    }

    /**
     * Removes, in place, every position that is at or before the cursor's mark-delete position.
     * Nothing is removed while the cursor reports no mark-delete position.
     *
     * @param list mutable list of positions to trim
     */
    private void trimByMarkDeletePosition(List<PositionImpl> list) {
        list.removeIf(candidate -> this.cursor.getMarkDeletedPosition() != null
                && candidate.compareTo((PositionImpl) this.cursor.getMarkDeletedPosition()) <= 0);
    }

    /**
     * Forwards an unacknowledged-message count change to the dispatcher.
     *
     * @param i value forwarded unchanged to {@code Dispatcher.addUnAckedMessages}
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void addUnAckedMessages(int i) {
        final Dispatcher current = dispatcher;
        current.addUnAckedMessages(i);
    }

    /**
     * Returns the number of delayed messages tracked by the dispatcher.
     *
     * @return the dispatcher's delayed-message count, or {@code 0} when no dispatcher is set
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public synchronized long getNumberOfEntriesDelayed() {
        if (this.dispatcher == null) {
            return 0L;
        }
        return this.dispatcher.getNumberOfDelayedMessages();
    }

    /**
     * Forwards to {@code markBatchMessagePublished()} on the owning topic.
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void markTopicWithBatchMessagePublished() {
        topic.markBatchMessagePublished();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Notifies every consumer that the end of the topic was reached, but only when the cursor reports an
     * empty backlog and a dispatcher is set.
     */
    public void topicTerminated() {
        final boolean backlogDrained = this.cursor.getNumberOfEntriesInBacklog(false) == 0;
        if (backlogDrained && this.dispatcher != null) {
            this.dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
        }
    }

    /**
     * Returns the properties currently associated with this subscription.
     *
     * @return the map held in the {@code subscriptionProperties} field (not copied)
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public Map<String, String> getSubscriptionProperties() {
        final Map<String, String> properties = subscriptionProperties;
        return properties;
    }

    /**
     * Delegates to the pending-ack handle's {@code getPositionInPendingAck}.
     *
     * @param positionImpl position to look up
     * @return the value returned by the pending-ack handle
     */
    public PositionImpl getPositionInPendingAck(PositionImpl positionImpl) {
        final PositionImpl pending = pendingAckHandle.getPositionInPendingAck(positionImpl);
        return pending;
    }

    /**
     * Stores new subscription properties on the cursor and, once that succeeds, on this subscription.
     *
     * <p>A {@code null} or empty input is replaced by an empty map; otherwise the input is wrapped in an
     * unmodifiable view (the caller's map is not copied).
     *
     * @param map new properties, may be {@code null}
     * @return a future completed after the cursor accepted the properties and the field was updated
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> map) {
        final Map<String, String> newProperties;
        if (map == null || map.isEmpty()) {
            newProperties = Collections.emptyMap();
        } else {
            newProperties = Collections.unmodifiableMap(map);
        }
        return this.cursor.setCursorProperties(newProperties).thenRun(() -> {
            this.subscriptionProperties = newProperties;
        });
    }

    /**
     * Merges user-supplied cursor properties with the base cursor properties of this subscription.
     *
     * <p>Base properties are applied last, so they win over user-supplied entries with the same key.
     *
     * @param map user-supplied properties; must not be {@code null}
     * @return the base properties themselves when {@code map} is empty, otherwise a new sorted map
     *         holding both sets of entries
     */
    protected Map<String, Long> mergeCursorProperties(Map<String, Long> map) {
        Map<String, Long> baseCursorProperties = getBaseCursorProperties(isReplicated());
        if (map.isEmpty()) {
            return baseCursorProperties;
        }
        // Typed instead of raw so the result needs no unchecked conversion.
        Map<String, Long> merged = new TreeMap<>(map);
        merged.putAll(baseCursorProperties);
        return merged;
    }

    /**
     * Adds a copy of the given snapshot to the replicated-subscription snapshot cache, if a cache is set.
     *
     * @param replicatedSubscriptionsSnapshot snapshot to copy into the cache
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot replicatedSubscriptionsSnapshot) {
        // Read the field once so the null check and the call use the same cache instance.
        final ReplicatedSubscriptionSnapshotCache cache = this.replicatedSubscriptionSnapshotCache;
        if (cache == null) {
            return;
        }
        final ReplicatedSubscriptionsSnapshot copy = new ReplicatedSubscriptionsSnapshot().copyFrom(replicatedSubscriptionsSnapshot);
        cache.addNewSnapshot(copy);
    }

    /**
     * Commits or aborts a transaction on this subscription's pending-ack handle.
     *
     * @param j  first component passed to the {@link TxnID} constructor
     * @param j2 second component passed to the {@link TxnID} constructor
     * @param i  transaction action value; must equal {@code TxnAction.COMMIT} or {@code TxnAction.ABORT}
     * @param j3 value forwarded unchanged to the pending-ack handle
     * @return the pending-ack handle's future, or a future failed with {@code NotAllowedException} when
     *         {@code i} is neither commit nor abort
     */
    @Override // org.apache.pulsar.broker.service.Subscription
    public CompletableFuture<Void> endTxn(long j, long j2, int i, long j3) {
        TxnID txnID = new TxnID(j, j2);
        if (TxnAction.COMMIT.getValue() == i) {
            return this.pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), j3);
        }
        if (TxnAction.ABORT.getValue() != i) {
            return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Unsupported txnAction " + i));
        }
        // Read the dispatcher once: checking and casting two separate reads could observe two
        // different dispatchers and fail with a ClassCastException.
        Dispatcher currentDispatcher = getDispatcher();
        Consumer activeConsumer = null;
        if (currentDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
            activeConsumer = ((PersistentDispatcherSingleActiveConsumer) currentDispatcher).getActiveConsumer();
        }
        return this.pendingAckHandle.abortTxn(txnID, activeConsumer, j3);
    }

    /**
     * Exposes the managed cursor backing this subscription.
     *
     * @return the cursor held by this subscription
     */
    @VisibleForTesting
    public ManagedCursor getCursor() {
        final ManagedCursor backingCursor = cursor;
        return backingCursor;
    }

    /**
     * Exposes the pending-ack handle used by this subscription.
     *
     * @return the pending-ack handle held by this subscription
     */
    @VisibleForTesting
    public PendingAckHandle getPendingAckHandle() {
        final PendingAckHandle handle = pendingAckHandle;
        return handle;
    }

    /**
     * Delegates to the pending-ack handle's {@code syncBatchPositionAckSetForTransaction}.
     *
     * @param positionImpl position forwarded unchanged to the pending-ack handle
     */
    public void syncBatchPositionBitSetForPendingAck(PositionImpl positionImpl) {
        final PendingAckHandle handle = pendingAckHandle;
        handle.syncBatchPositionAckSetForTransaction(positionImpl);
    }

    /**
     * Delegates to the pending-ack handle's {@code checkIsCanDeleteConsumerPendingAck}.
     *
     * @param positionImpl position forwarded unchanged to the pending-ack handle
     * @return the pending-ack handle's answer
     */
    public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl positionImpl) {
        final boolean canDelete = pendingAckHandle.checkIsCanDeleteConsumerPendingAck(positionImpl);
        return canDelete;
    }

    /**
     * Returns the statistics reported by the pending-ack handle.
     *
     * @param z flag forwarded unchanged to {@code PendingAckHandle.getStats}
     * @return the pending-ack handle's statistics
     */
    public TransactionPendingAckStats getTransactionPendingAckStats(boolean z) {
        final TransactionPendingAckStats pendingAckStats = pendingAckHandle.getStats(z);
        return pendingAckStats;
    }

    /**
     * Asks the dispatcher to check for, and recover from, a stuck state.
     *
     * @return the dispatcher's answer, or {@code false} when no dispatcher is set
     */
    public boolean checkAndUnblockIfStuck() {
        return this.dispatcher != null && this.dispatcher.checkAndUnblockIfStuck();
    }

    /**
     * Delegates to the pending-ack handle's {@code getTransactionInPendingAckStats}.
     *
     * @param txnID transaction to look up
     * @return the statistics returned by the pending-ack handle
     */
    public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
        final TransactionInPendingAckStats txnStats = pendingAckHandle.getTransactionInPendingAckStats(txnID);
        return txnStats;
    }

    /**
     * Returns the managed ledger used by the pending-ack store.
     *
     * @return the store's managed-ledger future when the handle is a {@link PendingAckHandleImpl};
     *         otherwise a future failed with {@code NotAllowedException}
     */
    public CompletableFuture<ManagedLedger> getPendingAckManageLedger() {
        if (this.pendingAckHandle instanceof PendingAckHandleImpl) {
            return ((PendingAckHandleImpl) this.pendingAckHandle).getStoreManageLedger();
        }
        return FutureUtil.failedFuture(new BrokerServiceException.NotAllowedException("Pending ack handle don't use managedLedger!"));
    }

    /**
     * Delegates to the pending-ack handle's {@code checkIfPendingAckStoreInit}.
     *
     * @return the pending-ack handle's answer
     */
    public boolean checkIfPendingAckStoreInit() {
        final boolean storeInitialized = pendingAckHandle.checkIfPendingAckStoreInit();
        return storeInitialized;
    }

    /**
     * Delegates to the pending-ack handle's {@code checkPositionInPendingAckState}.
     *
     * @param positionImpl position forwarded unchanged to the pending-ack handle
     * @param num          value forwarded unchanged to the pending-ack handle
     * @return the state returned by the pending-ack handle
     */
    public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl positionImpl, Integer num) {
        final PositionInPendingAckStats state = pendingAckHandle.checkPositionInPendingAckState(positionImpl, num);
        return state;
    }

    // Class initialization: registers REPLICATED_SUBSCRIPTION_PROPERTY with the value 1 in the shared
    // REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES map and creates the class logger.
    static {
        REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
        log = LoggerFactory.getLogger(PersistentSubscription.class);
    }
}
