package com.azure.cosmos.implementation.query;

import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.IRetryPolicy;
import com.azure.cosmos.implementation.ObservableHelper;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.RetryPolicyWithDiagnostics;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedState;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/cosmos/implementation/query/ChangeFeedFetcher.class */
/**
 * {@link Fetcher} that pages through a change feed while tracking its progress in a
 * {@link ChangeFeedState}.
 *
 * <p>Every request is obtained from the supplied request factory and then populated from the
 * change feed state. Unless split handling is disabled at construction time, page fetches are
 * routed through a retry policy that reacts to {@link GoneException}.
 *
 * @param <T> the resource type carried by each {@link FeedResponse}
 */
public class ChangeFeedFetcher<T extends Resource> extends Fetcher<T> {
    private final ChangeFeedState changeFeedState;
    private final Supplier<RxDocumentServiceRequest> createRequestFunc;
    /** {@code null} when split handling was disabled through the constructor flag. */
    private final IRetryPolicy feedRangeContinuationSplitRetryPolicy;

    /**
     * Creates a fetcher bound to the given change feed state.
     *
     * @param rxDocumentClientImpl client handed to the split retry policy; verified with
     *        {@code Preconditions.checkNotNull}
     * @param supplier factory invoked once per {@link #createRequest(int)} call; verified with
     *        {@code Preconditions.checkNotNull}
     * @param function forwarded to the {@link Fetcher} constructor as its first argument
     *        (presumably the function that executes a request - confirm against Fetcher)
     * @param changeFeedState state used to populate requests and to apply server continuations;
     *        verified with {@code Preconditions.checkNotNull}
     * @param i forwarded to the {@link Fetcher} constructor as its third argument
     *        (NOTE(review): looks like a "top" limit - confirm against Fetcher)
     * @param i2 forwarded to the {@link Fetcher} constructor as its fourth argument
     *        (NOTE(review): looks like the max item count - confirm against Fetcher)
     * @param z when {@code true} no split retry policy is installed and {@link #nextPage()}
     *        fetches pages directly
     */
    public ChangeFeedFetcher(RxDocumentClientImpl rxDocumentClientImpl, Supplier<RxDocumentServiceRequest> supplier, Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> function, ChangeFeedState changeFeedState, int i, int i2, boolean z) {
        super(function, true, i, i2);

        Preconditions.checkNotNull(rxDocumentClientImpl, "Argument 'client' must not be null.");
        Preconditions.checkNotNull(supplier, "Argument 'createRequestFunc' must not be null.");
        Preconditions.checkNotNull(changeFeedState, "Argument 'changeFeedState' must not be null.");

        this.changeFeedState = changeFeedState;
        this.createRequestFunc = supplier;

        if (z) {
            this.feedRangeContinuationSplitRetryPolicy = null;
        } else {
            this.feedRangeContinuationSplitRetryPolicy =
                new FeedRangeContinuationSplitRetryPolicy(rxDocumentClientImpl, changeFeedState);
        }
    }

    /**
     * Fetches the next page, going through the split retry policy when one is installed.
     *
     * @return a {@link Mono} producing the next {@link FeedResponse}
     */
    @Override
    public Mono<FeedResponse<T>> nextPage() {
        if (this.feedRangeContinuationSplitRetryPolicy != null) {
            return ObservableHelper.inlineIfPossible(
                this::nextPageInternal,
                this.feedRangeContinuationSplitRetryPolicy);
        }

        return nextPageInternal();
    }

    /**
     * Fetches a page and, while the current continuation answers
     * {@link ShouldRetryResult#RETRY_NOW} from {@code handleChangeFeedNotModified}, re-enables
     * fetching and repeats instead of emitting the response.
     */
    private Mono<FeedResponse<T>> nextPageInternal() {
        return Mono
            .fromSupplier(() -> nextPageCore())
            .flatMap(Function.identity())
            .flatMap(page -> {
                FeedRangeContinuation snapshot = this.changeFeedState.getContinuation();

                if (snapshot != null
                    && snapshot.handleChangeFeedNotModified(page) == ShouldRetryResult.RETRY_NOW) {
                    // Completing empty here is what makes repeatWhenEmpty below fetch again.
                    reenableShouldFetchMoreForRetry();
                    return Mono.empty();
                }

                return Mono.just(page);
            })
            .repeatWhenEmpty(signals -> signals);
    }

    /** Delegates to {@link ChangeFeedState#applyServerResponseContinuation}. */
    @Override
    protected String applyServerResponseContinuation(String str, RxDocumentServiceRequest rxDocumentServiceRequest) {
        return this.changeFeedState.applyServerResponseContinuation(str, rxDocumentServiceRequest);
    }

    /**
     * Reports whether fetching is finished: either the response carries no changes, or a
     * continuation exists and reports itself as done.
     *
     * @param z not consulted by this implementation
     * @param feedResponse the most recently received response
     */
    @Override
    protected boolean isFullyDrained(boolean z, FeedResponse<T> feedResponse) {
        if (!ModelBridgeInternal.noChanges(feedResponse)) {
            FeedRangeContinuation current = this.changeFeedState.getContinuation();
            if (current == null) {
                return false;
            }
            return current.isDone();
        }

        return true;
    }

    /** Returns the JSON form of the change feed state. */
    @Override
    protected String getContinuationForLogging() {
        return this.changeFeedState.toJson();
    }

    /**
     * Builds a request from the factory and lets the change feed state populate it.
     *
     * @param i passed through to {@code ChangeFeedState.populateRequest} as its second argument
     */
    @Override
    protected RxDocumentServiceRequest createRequest(int i) {
        RxDocumentServiceRequest freshRequest = this.createRequestFunc.get();
        this.changeFeedState.populateRequest(freshRequest, i);
        return freshRequest;
    }

    /**
     * Retry policy that answers {@link GoneException} by delegating to the state's
     * {@link FeedRangeContinuation#handleSplit}; every other exception is not retried.
     */
    private static final class FeedRangeContinuationSplitRetryPolicy extends RetryPolicyWithDiagnostics {
        private final ChangeFeedState state;
        private final RxDocumentClientImpl client;

        public FeedRangeContinuationSplitRetryPolicy(RxDocumentClientImpl documentClient, ChangeFeedState feedState) {
            this.state = feedState;
            this.client = documentClient;
        }

        /**
         * Decides whether a failed fetch should be retried.
         *
         * <p>When the state has no continuation yet, one is created from the feed range's
         * normalized effective range and stored on the state before {@code handleSplit} runs.
         *
         * @param exc the failure raised by the fetch
         * @return the retry decision
         */
        @Override
        public Mono<ShouldRetryResult> shouldRetry(Exception exc) {
            if (!(exc instanceof GoneException)) {
                return Mono.just(ShouldRetryResult.noRetry());
            }

            final GoneException gone = (GoneException) exc;

            if (this.state.getContinuation() != null) {
                return this.state.getContinuation().handleSplit(this.client, gone);
            }

            return this.state
                .getFeedRange()
                .getNormalizedEffectiveRange(
                    this.client.getPartitionKeyRangeCache(),
                    null,
                    this.client.getCollectionCache().resolveByRidAsync(null, this.state.getContainerRid(), null))
                .map(effectiveRange -> this.state.setContinuation(
                    FeedRangeContinuation.create(
                        this.state.getContainerRid(),
                        this.state.getFeedRange(),
                        effectiveRange)))
                .flatMap(updatedState -> updatedState.getContinuation().handleSplit(this.client, gone));
        }
    }
}
