package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.InvalidPartitionExceptionRetryPolicy;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.ObservableHelper;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneRetryPolicy;
import com.azure.cosmos.implementation.PathsHelper;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/cosmos/implementation/query/ChangeFeedFetcher.class */
/**
 * {@link Fetcher} specialization that retrieves change feed pages.
 *
 * <p>Each request is produced by a caller-supplied request factory and then populated from the
 * shared {@link ChangeFeedState}. Page retrieval is wrapped in a retry policy created per call
 * to {@link #nextPage()}.
 *
 * @param <T> the item type carried by the returned {@link FeedResponse} pages
 */
public class ChangeFeedFetcher<T> extends Fetcher<T> {
    private static final ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccessor feedResponseAccessor = ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor();
    private final ChangeFeedState changeFeedState;
    private final Supplier<RxDocumentServiceRequest> createRequestFunc;
    private final Supplier<DocumentClientRetryPolicy> feedRangeContinuationRetryPolicySupplier;
    private final boolean completeAfterAllCurrentChangesRetrieved;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/cosmos/implementation/query/ChangeFeedFetcher$FeedRangeContinuationFeedRangeGoneRetryPolicy.class */
    /**
     * Retry policy that first delegates to a wrapped policy and, when that policy declines to
     * retry a {@link GoneException}, asks the change feed state's {@link FeedRangeContinuation}
     * to handle the "feed range gone" condition.
     */
    public static final class FeedRangeContinuationFeedRangeGoneRetryPolicy extends DocumentClientRetryPolicy {
        private static final Logger LOGGER = LoggerFactory.getLogger(FeedRangeContinuationFeedRangeGoneRetryPolicy.class);
        private final ChangeFeedState state;
        private final RxDocumentClientImpl client;
        private final DocumentClientRetryPolicy nextRetryPolicy;
        private final Map<String, Object> requestOptionProperties;
        // Captured from the most recent request in onBeforeSendRequest; null until then.
        private MetadataDiagnosticsContext diagnosticsContext;
        private final RetryContext retryContext;
        private final Supplier<String> operationContextTextProvider;

        /**
         * Creates the policy.
         *
         * @param rxDocumentClientImpl client used to reach the partition key range and collection caches
         * @param changeFeedState state whose continuation is consulted (and created when missing)
         * @param documentClientRetryPolicy wrapped policy that is consulted first
         * @param map request option properties forwarded to the collection cache lookup
         * @param retryContext value returned from {@link #getRetryContext()}
         * @param supplier provides the operation context text used in log messages; must not be null
         * @throws NullPointerException if {@code supplier} is null
         */
        public FeedRangeContinuationFeedRangeGoneRetryPolicy(RxDocumentClientImpl rxDocumentClientImpl, ChangeFeedState changeFeedState, DocumentClientRetryPolicy documentClientRetryPolicy, Map<String, Object> map, RetryContext retryContext, Supplier<String> supplier) {
            Preconditions.checkNotNull(supplier, "Argument 'operationContextTextProvider' must not be null.");
            this.client = rxDocumentClientImpl;
            this.state = changeFeedState;
            this.nextRetryPolicy = documentClientRetryPolicy;
            this.requestOptionProperties = map;
            this.diagnosticsContext = null;
            this.retryContext = retryContext;
            this.operationContextTextProvider = supplier;
        }

        /**
         * Remembers the metadata diagnostics context of the outgoing request and forwards the
         * notification to the wrapped policy.
         *
         * @param rxDocumentServiceRequest the request about to be sent
         */
        @Override // com.azure.cosmos.implementation.DocumentClientRetryPolicy
        public void onBeforeSendRequest(RxDocumentServiceRequest rxDocumentServiceRequest) {
            this.diagnosticsContext = BridgeInternal.getMetaDataDiagnosticContext(rxDocumentServiceRequest.requestContext.cosmosDiagnostics);
            this.nextRetryPolicy.onBeforeSendRequest(rxDocumentServiceRequest);
        }

        /**
         * Decides whether the failed operation should be retried.
         *
         * <ol>
         *   <li>If the wrapped policy wants to retry, its result is returned as is.</li>
         *   <li>Otherwise, anything other than a {@link GoneException} results in "no retry".</li>
         *   <li>For a {@link GoneException}, the state's continuation handles the condition; when
         *       the state has no continuation yet, one is first created from the feed range's
         *       normalized effective range.</li>
         * </ol>
         *
         * @param exc the failure to evaluate
         * @return the retry decision
         */
        @Override // com.azure.cosmos.implementation.IRetryPolicy
        public Mono<ShouldRetryResult> shouldRetry(Exception exc) {
            // Every lambda parameter below has a distinct name: a nested lambda may not
            // redeclare a parameter of an enclosing lambda.
            return this.nextRetryPolicy.shouldRetry(exc).flatMap(innerPolicyResult -> {
                if (innerPolicyResult.shouldRetry) {
                    LOGGER.trace("Retrying due to inner retry policy");
                    return Mono.just(innerPolicyResult);
                }
                if (!(exc instanceof GoneException)) {
                    LOGGER.warn("Exception not applicable - will fail the request. Context: {}", this.operationContextTextProvider.get(), exc);
                    return Mono.just(ShouldRetryResult.noRetry());
                }
                GoneException goneException = (GoneException) exc;
                if (this.state.getContinuation() == null) {
                    // No continuation yet: resolve the effective range, attach a fresh
                    // continuation to the state, then let that continuation decide.
                    return this.state.getFeedRange()
                        .getNormalizedEffectiveRange(
                            this.client.getPartitionKeyRangeCache(),
                            this.diagnosticsContext,
                            this.client.getCollectionCache().resolveByRidAsync(this.diagnosticsContext, this.state.getContainerRid(), this.requestOptionProperties))
                        .map(range -> this.state.setContinuation(FeedRangeContinuation.create(this.state.getContainerRid(), this.state.getFeedRange(), (Range<String>) range)))
                        .flatMap(updatedState -> updatedState.getContinuation().handleFeedRangeGone(this.client, goneException));
                }
                return this.state.getContinuation().handleFeedRangeGone(this.client, goneException).flatMap(goneRetryResult -> {
                    if (goneRetryResult.shouldRetry) {
                        LOGGER.debug("HandleFeedRangeGone will retry. Context: {}", this.operationContextTextProvider.get(), exc);
                    } else {
                        LOGGER.warn("No partition split or merge error - will fail the request. Context: {}", this.operationContextTextProvider.get(), exc);
                    }
                    return Mono.just(goneRetryResult);
                });
            });
        }

        /**
         * @return the retry context supplied at construction time
         */
        @Override // com.azure.cosmos.implementation.IRetryPolicy
        public RetryContext getRetryContext() {
            return this.retryContext;
        }
    }

    /**
     * Creates a change feed fetcher.
     *
     * @param rxDocumentClientImpl client used when building retry policies; must not be null
     * @param supplier factory for a fresh request per page; must not be null. It is also invoked
     *     once here to derive the collection path used by the retry policies.
     * @param function executes a request and yields a page; forwarded to the {@link Fetcher} constructor
     * @param changeFeedState state that populates requests and tracks the continuation; must not be null
     * @param map request option properties forwarded to the retry policies
     * @param i forwarded unchanged to the {@link Fetcher} constructor
     * @param i2 forwarded unchanged to the {@link Fetcher} constructor
     * @param z when {@code true}, only the invalid-partition retry policy is used; when
     *     {@code false}, it is additionally wrapped in the partition-key-range-gone and
     *     feed-range-gone policies (presumably a "split handling disabled" switch - TODO confirm)
     * @param z2 stored as {@code completeAfterAllCurrentChangesRetrieved}; selects how fetched
     *     pages are evaluated in {@link #nextPage()}
     * @param operationContextAndListenerTuple forwarded to the {@link Fetcher} constructor
     * @param globalEndpointManager forwarded to the {@link Fetcher} constructor
     * @param globalPartitionEndpointManagerForCircuitBreaker forwarded to the {@link Fetcher} constructor
     * @throws NullPointerException if {@code rxDocumentClientImpl}, {@code supplier} or
     *     {@code changeFeedState} is null
     */
    public ChangeFeedFetcher(RxDocumentClientImpl rxDocumentClientImpl, Supplier<RxDocumentServiceRequest> supplier, Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> function, ChangeFeedState changeFeedState, Map<String, Object> map, int i, int i2, boolean z, boolean z2, OperationContextAndListenerTuple operationContextAndListenerTuple, GlobalEndpointManager globalEndpointManager, GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManagerForCircuitBreaker) {
        super(function, true, i, i2, operationContextAndListenerTuple, null, globalEndpointManager, globalPartitionEndpointManagerForCircuitBreaker);
        Preconditions.checkNotNull(rxDocumentClientImpl, "Argument 'client' must not be null.");
        Preconditions.checkNotNull(supplier, "Argument 'createRequestFunc' must not be null.");
        Preconditions.checkNotNull(changeFeedState, "Argument 'changeFeedState' must not be null.");
        this.changeFeedState = changeFeedState;
        // Computed once up front; every retry policy instance created later reuses this path.
        String collectionPath = PathsHelper.generatePath(ResourceType.DocumentCollection, supplier.get(), false);
        this.feedRangeContinuationRetryPolicySupplier = () -> getFeedRangeContinuationRetryPolicy(rxDocumentClientImpl, map, collectionPath, z);
        this.createRequestFunc = supplier;
        this.completeAfterAllCurrentChangesRetrieved = z2;
    }

    /**
     * Fetches the next page, running the fetch under a freshly created retry policy.
     *
     * @return the next page of the change feed
     */
    @Override // com.azure.cosmos.implementation.query.Fetcher
    public Mono<FeedResponse<T>> nextPage() {
        DocumentClientRetryPolicy retryPolicy = this.feedRangeContinuationRetryPolicySupplier.get();
        if (retryPolicy == null) {
            return nextPageInternal(null);
        }
        return ObservableHelper.inlineIfPossible(() -> nextPageInternal(retryPolicy), retryPolicy);
    }

    /**
     * Fetches one page and re-subscribes for as long as the page evaluation yields an empty
     * result (see {@link #emitOrRequestRefetch(FeedResponse)}).
     *
     * @param documentClientRetryPolicy retry policy handed to the core fetch; may be null
     * @return the first page that is not rejected by the page evaluation
     */
    private Mono<FeedResponse<T>> nextPageInternal(DocumentClientRetryPolicy documentClientRetryPolicy) {
        return Mono.fromSupplier(() -> nextPageCore(documentClientRetryPolicy))
            .flatMap(Function.identity())
            .flatMap(this::emitOrRequestRefetch)
            // An empty result means "fetch again"; the companion is returned unchanged.
            .repeatWhenEmpty(repeatSignals -> repeatSignals);
    }

    /**
     * Evaluates a fetched page against the current continuation.
     *
     * <p>Returns an empty {@link Mono} (after re-enabling fetching) when the page should be
     * discarded and fetched again; otherwise emits the page.
     *
     * @param feedResponse the page that was just fetched
     * @return the page, or empty to signal that another fetch is required
     */
    private Mono<FeedResponse<T>> emitOrRequestRefetch(FeedResponse<T> feedResponse) {
        FeedRangeContinuation continuation = this.changeFeedState.getContinuation();
        if (continuation == null) {
            // Both modes only act when a continuation exists.
            return Mono.just(feedResponse);
        }
        if (this.completeAfterAllCurrentChangesRetrieved) {
            if (continuation.hasFetchedAllChangesAvailableNow(feedResponse)) {
                disableShouldFetchMore();
                return Mono.just(feedResponse);
            }
            if (ModelBridgeInternal.noChanges(feedResponse)) {
                reEnableShouldFetchMoreForRetry();
                return Mono.empty();
            }
            return Mono.just(feedResponse);
        }
        if (continuation.handleChangeFeedNotModified(feedResponse) == ShouldRetryResult.RETRY_NOW) {
            reEnableShouldFetchMoreForRetry();
            return Mono.empty();
        }
        return Mono.just(feedResponse);
    }

    /**
     * Delegates to {@link ChangeFeedState#applyServerResponseContinuation}. The boolean passed
     * along is {@code true} only when the response reported changes and this fetcher is not in
     * complete-after-all-current-changes mode.
     */
    @Override // com.azure.cosmos.implementation.query.Fetcher
    protected String applyServerResponseContinuation(String str, RxDocumentServiceRequest rxDocumentServiceRequest, FeedResponse<T> feedResponse) {
        boolean flag = !(feedResponseAccessor.getNoChanges(feedResponse) || this.completeAfterAllCurrentChangesRetrieved);
        return this.changeFeedState.applyServerResponseContinuation(str, rxDocumentServiceRequest, flag);
    }

    /**
     * Reports the fetcher as drained when the response carries no changes, or when the state has
     * a continuation that reports itself as done.
     */
    @Override // com.azure.cosmos.implementation.query.Fetcher
    protected boolean isFullyDrained(boolean z, FeedResponse<T> feedResponse) {
        if (ModelBridgeInternal.noChanges(feedResponse)) {
            return true;
        }
        FeedRangeContinuation continuation = this.changeFeedState.getContinuation();
        return continuation != null && continuation.isDone();
    }

    /**
     * @return the JSON form of the change feed state
     */
    @Override // com.azure.cosmos.implementation.query.Fetcher
    protected String getContinuationForLogging() {
        return this.changeFeedState.toJson();
    }

    /**
     * Builds a request from the request factory, attaches the retry policy (when present) and
     * lets the change feed state populate it.
     *
     * @param i forwarded to {@link ChangeFeedState#populateRequest}
     * @param documentClientRetryPolicy retry policy to attach to the request; may be null
     * @return the populated request
     */
    @Override // com.azure.cosmos.implementation.query.Fetcher
    protected RxDocumentServiceRequest createRequest(int i, DocumentClientRetryPolicy documentClientRetryPolicy) {
        RxDocumentServiceRequest request = this.createRequestFunc.get();
        if (documentClientRetryPolicy != null) {
            request.requestContext.setClientRetryPolicySupplier(() -> documentClientRetryPolicy);
            documentClientRetryPolicy.onBeforeSendRequest(request);
        }
        this.changeFeedState.populateRequest(request, i);
        return request;
    }

    /**
     * Builds the retry policy chain used for one {@link #nextPage()} call.
     *
     * @param rxDocumentClientImpl client providing the caches and the session-token retry policy
     * @param map request option properties handed to the policies
     * @param str collection path handed to the policies
     * @param z when {@code true}, returns only the invalid-partition policy; otherwise wraps it
     *     in the partition-key-range-gone policy and then the feed-range-gone policy
     * @return the outermost policy of the chain; never null
     */
    private DocumentClientRetryPolicy getFeedRangeContinuationRetryPolicy(RxDocumentClientImpl rxDocumentClientImpl, Map<String, Object> map, String str, boolean z) {
        DocumentClientRetryPolicy invalidPartitionPolicy = new InvalidPartitionExceptionRetryPolicy(rxDocumentClientImpl.getCollectionCache(), rxDocumentClientImpl.getResetSessionTokenRetryPolicy().getRequestPolicy(null), str, map);
        if (z) {
            return invalidPartitionPolicy;
        }
        PartitionKeyRangeGoneRetryPolicy rangeGonePolicy = new PartitionKeyRangeGoneRetryPolicy(rxDocumentClientImpl, rxDocumentClientImpl.getCollectionCache(), rxDocumentClientImpl.getPartitionKeyRangeCache(), str, invalidPartitionPolicy, map);
        return new FeedRangeContinuationFeedRangeGoneRetryPolicy(rxDocumentClientImpl, this.changeFeedState, rangeGonePolicy, map, rangeGonePolicy.getRetryContext(), this::getOperationContextText);
    }
}
