package com.azure.cosmos.implementation;

import com.azure.cosmos.CosmosItemSerializer;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.query.Paginator;
import com.azure.cosmos.implementation.spark.OperationContext;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
import com.azure.cosmos.implementation.spark.OperationListener;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/cosmos/implementation/ChangeFeedQueryImpl.class */
/**
 * Builds and executes change feed read requests for the documents of one collection.
 *
 * <p>An instance captures the client, the request options and the change feed state at
 * construction time. {@link #executeAsync()} then hands a request factory and a request executor
 * to {@link Paginator}, which drives the actual paging.
 *
 * @param <T> the type each returned item is deserialized into
 */
public class ChangeFeedQueryImpl<T> {
    private static final ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccessor feedResponseAccessor = ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor();

    // Passed to the paginator right before the max item count.
    // NOTE(review): presumably means "no explicit top" - confirm against Paginator.
    private static final int INITIAL_TOP_VALUE = -1;

    // Header copied from the request onto the response when the response does not carry it.
    private static final String PARTITION_KEY_RANGE_ID_HEADER = "x-ms-documentdb-partitionkeyrangeid";

    private final RxDocumentClientImpl client;
    private final DiagnosticsClientContext clientContext;
    private final Supplier<RxDocumentServiceRequest> createRequestFunc;
    private final String documentsLink;
    private final Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc;
    private final Class<T> klass;
    private final CosmosChangeFeedRequestOptions options;
    private final ResourceType resourceType;
    private final ChangeFeedState changeFeedState;
    private final OperationContextAndListenerTuple operationContextAndListener;
    private final CosmosItemSerializer itemSerializer;

    /**
     * Creates a change feed query bound to a single collection.
     *
     * <p>If the request options already carry a continuation state, that state is used as-is;
     * otherwise a new {@link ChangeFeedStateV1} is created from the collection rid and the feed
     * range, mode and start-from settings found in the options.
     *
     * @param client the document client used to issue the read-feed requests
     * @param resourceType the resource type placed on every created request
     * @param klass the class items are deserialized into
     * @param collectionLink link of the collection; {@code "docs"} is appended to form the request path
     * @param collectionRid resource id of the collection, used when a new change feed state is created
     * @param requestOptions the change feed request options
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code collectionLink} or {@code collectionRid} is blank
     */
    public ChangeFeedQueryImpl(
        RxDocumentClientImpl client,
        ResourceType resourceType,
        Class<T> klass,
        String collectionLink,
        String collectionRid,
        CosmosChangeFeedRequestOptions requestOptions) {

        Preconditions.checkNotNull(client, "Argument 'client' must not be null.");
        Preconditions.checkNotNull(resourceType, "Argument 'resourceType' must not be null.");
        Preconditions.checkNotNull(klass, "Argument 'klass' must not be null.");
        Preconditions.checkNotNull(requestOptions, "Argument 'requestOptions' must not be null.");
        Preconditions.checkNotNull(collectionLink, "Argument 'collectionLink' must not be null.");
        Preconditions.checkNotNull(collectionRid, "Argument 'collectionRid' must not be null.");
        // The null checks above run first, so these two only reject empty/whitespace values.
        if (Strings.isNullOrWhiteSpace(collectionLink)) {
            throw new IllegalArgumentException("Argument 'collectionLink' must not be empty");
        }
        if (Strings.isNullOrWhiteSpace(collectionRid)) {
            throw new IllegalArgumentException("Argument 'collectionRid' must not be empty");
        }

        this.createRequestFunc = this::createDocumentServiceRequest;
        this.executeFunc = this::executeRequestAsync;
        // The client is stored under both types: once for issuing requests and once as the
        // diagnostics context handed to RxDocumentServiceRequest.create.
        this.clientContext = client;
        this.client = client;
        this.resourceType = resourceType;
        this.klass = klass;
        this.documentsLink = Utils.joinPath(collectionLink, "docs");
        this.options = requestOptions;
        this.itemSerializer = client.getEffectiveItemSerializer(requestOptions.getCustomItemSerializer());
        this.operationContextAndListener = ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
            .getCosmosChangeFeedRequestOptionsAccessor()
            .getOperationContext(this.options);

        FeedRangeInternal feedRange = (FeedRangeInternal) this.options.getFeedRange();
        ChangeFeedState continuationState = ModelBridgeInternal.getChangeFeedContinuationState(requestOptions);
        if (continuationState != null) {
            this.changeFeedState = continuationState;
        } else {
            this.changeFeedState = new ChangeFeedStateV1(
                collectionRid,
                feedRange,
                ModelBridgeInternal.getChangeFeedMode(requestOptions),
                ModelBridgeInternal.getChangeFeedStartFromSettings(requestOptions),
                null);
        }
    }

    /**
     * Starts the change feed read and returns its pages as a reactive stream.
     *
     * <p>Delegates to {@link Paginator}, supplying this instance's request factory and request
     * executor together with the paging settings read from the request options.
     *
     * @return a {@link Flux} of feed responses
     */
    public Flux<FeedResponse<T>> executeAsync() {
        return Paginator.getChangeFeedQueryResultAsObservable(
            this.client,
            this.changeFeedState,
            ModelBridgeInternal.getPropertiesFromChangeFeedRequestOptions(this.options),
            this.createRequestFunc,
            this.executeFunc,
            INITIAL_TOP_VALUE,
            this.options.getMaxItemCount(),
            this.options.getMaxPrefetchPageCount(),
            ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options),
            ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
                .getCosmosChangeFeedRequestOptionsAccessor()
                .getOperationContext(this.options));
    }

    /**
     * Creates a new read-feed request for the collection's documents path.
     *
     * <p>Headers are seeded from the custom headers on the request options (if any), then the
     * quota-info and consistency-level headers are added when applicable. Excluded regions from
     * the options are copied onto the request context when one is present.
     *
     * @return the newly created request
     */
    private RxDocumentServiceRequest createDocumentServiceRequest() {
        Map<String, String> headers = new HashMap<>();

        Map<String, String> customHeaders = ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
            .getCosmosChangeFeedRequestOptionsAccessor()
            .getHeader(this.options);
        if (customHeaders != null) {
            headers.putAll(customHeaders);
        }
        if (this.options.isQuotaInfoEnabled()) {
            headers.put(HttpConstants.HttpHeaders.POPULATE_QUOTA_INFO, String.valueOf(true));
        }
        if (this.client.getConsistencyLevel() != null) {
            headers.put(HttpConstants.HttpHeaders.CONSISTENCY_LEVEL, this.client.getConsistencyLevel().toString());
        }

        RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
            this.clientContext,
            OperationType.ReadFeed,
            this.resourceType,
            this.documentsLink,
            headers,
            this.options);
        if (request.requestContext != null) {
            request.requestContext.setExcludeRegions(this.options.getExcludedRegions());
        }
        return request;
    }

    /**
     * Sends one read-feed request and converts the service response into a feed response.
     *
     * <p>When an operation context and listener were supplied through the request options, the
     * correlated activity id header is set on the request and the listener is notified of the
     * request, the raw response, the converted feed response, and any error.
     *
     * @param request the request to send
     * @return a {@link Mono} emitting the converted feed response
     */
    private Mono<FeedResponse<T>> executeRequestAsync(RxDocumentServiceRequest request) {
        if (this.operationContextAndListener == null) {
            return this.client
                .readFeed(request)
                .map(response -> feedResponseAccessor.createChangeFeedResponse(response, this.itemSerializer, this.klass));
        }

        final OperationListener listener = this.operationContextAndListener.getOperationListener();
        final OperationContext operationContext = this.operationContextAndListener.getOperationContext();
        request.getHeaders().put(HttpConstants.HttpHeaders.CORRELATED_ACTIVITY_ID, operationContext.getCorrelationActivityId());
        listener.requestListener(operationContext, request);

        return this.client
            .readFeed(request)
            .map(response -> {
                listener.responseListener(operationContext, response);

                FeedResponse<T> feedResponse =
                    feedResponseAccessor.createChangeFeedResponse(response, this.itemSerializer, this.klass);

                // Back-fill the partition key range id from the request when the response lacks it,
                // so the listener below sees it on the feed response.
                Map<String, String> responseHeaders = feedResponse.getResponseHeaders();
                if (!responseHeaders.containsKey(PARTITION_KEY_RANGE_ID_HEADER)) {
                    String requestPkRangeId = request.getHeaders().get(PARTITION_KEY_RANGE_ID_HEADER);
                    if (requestPkRangeId != null) {
                        responseHeaders.put(PARTITION_KEY_RANGE_ID_HEADER, requestPkRangeId);
                    }
                }

                listener.feedResponseReceivedListener(operationContext, feedResponse);
                return feedResponse;
            })
            .doOnError(error -> listener.exceptionListener(operationContext, error));
    }
}
