package com.azure.cosmos.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.implementation.ChangeFeedStateV1;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.query.Paginator;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.HashMap;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/ChangeFeedQueryImpl.class */
class ChangeFeedQueryImpl<T extends Resource> {
    private static final int INITIAL_TOP_VALUE = -1;
    private final RxDocumentClientImpl client;
    private final DiagnosticsClientContext clientContext;
    private final Supplier<RxDocumentServiceRequest> createRequestFunc;
    private final String documentsLink;
    private final Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc;
    private final Class<T> klass;
    private final CosmosChangeFeedRequestOptions options;
    private final ResourceType resourceType;
    private final ChangeFeedState changeFeedState;

    public ChangeFeedQueryImpl(RxDocumentClientImpl rxDocumentClientImpl, ResourceType resourceType, Class<T> cls, String str, String str2, CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions) {
        Preconditions.checkNotNull(rxDocumentClientImpl, "Argument 'client' must not be null.");
        Preconditions.checkNotNull(resourceType, "Argument 'resourceType' must not be null.");
        Preconditions.checkNotNull(cls, "Argument 'klass' must not be null.");
        Preconditions.checkNotNull(cosmosChangeFeedRequestOptions, "Argument 'requestOptions' must not be null.");
        Preconditions.checkNotNull(str, "Argument 'collectionLink' must not be null.");
        Preconditions.checkNotNull(str2, "Argument 'collectionRid' must not be null.");
        if (Strings.isNullOrWhiteSpace(str)) {
            throw new IllegalArgumentException("Argument 'collectionLink' must not be empty");
        }
        if (Strings.isNullOrWhiteSpace(str2)) {
            throw new IllegalArgumentException("Argument 'collectionRid' must not be empty");
        }
        this.createRequestFunc = this::createDocumentServiceRequest;
        this.executeFunc = this::executeRequestAsync;
        this.clientContext = rxDocumentClientImpl;
        this.client = rxDocumentClientImpl;
        this.resourceType = resourceType;
        this.klass = cls;
        this.documentsLink = Utils.joinPath(str, "docs");
        this.options = cosmosChangeFeedRequestOptions;
        FeedRangeInternal feedRangeInternal = (FeedRangeInternal) this.options.getFeedRange();
        ChangeFeedState changeFeedContinuationState = ModelBridgeInternal.getChangeFeedContinuationState(cosmosChangeFeedRequestOptions);
        this.changeFeedState = changeFeedContinuationState == null ? new ChangeFeedStateV1(str2, feedRangeInternal, ModelBridgeInternal.getChangeFeedMode(cosmosChangeFeedRequestOptions), ModelBridgeInternal.getChangeFeedStartFromSettings(cosmosChangeFeedRequestOptions), null) : changeFeedContinuationState;
    }

    public Flux<FeedResponse<T>> executeAsync() {
        return Paginator.getChangeFeedQueryResultAsObservable(this.client, this.changeFeedState, this.createRequestFunc, this.executeFunc, this.klass, -1, this.options.getMaxItemCount(), this.options.getMaxPrefetchPageCount(), ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options));
    }

    private RxDocumentServiceRequest createDocumentServiceRequest() {
        return RxDocumentServiceRequest.create(this.clientContext, OperationType.ReadFeed, this.resourceType, this.documentsLink, new HashMap(), this.options);
    }

    private Mono<FeedResponse<T>> executeRequestAsync(RxDocumentServiceRequest rxDocumentServiceRequest) {
        return this.client.readFeed(rxDocumentServiceRequest).map(rxDocumentServiceResponse -> {
            return BridgeInternal.toChangeFeedResponsePage(rxDocumentServiceResponse, this.klass);
        });
    }
}
