package com.azure.cosmos.implementation.query;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.Exceptions;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.ObservableHelper;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.QueryMetricsConstants;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.ImmutablePair;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.metrics.ClientSideMetrics;
import com.azure.cosmos.implementation.query.metrics.FetchExecutionRangeAccumulator;
import com.azure.cosmos.implementation.query.metrics.SchedulingStopwatch;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/cosmos/implementation/query/DocumentProducer.class */
/**
 * Produces pages of query results for a single feed range and, when the
 * service reports that the underlying partition has split, continues the
 * query against the replacement partition key ranges.
 *
 * <p>Each emitted page is wrapped in a {@link DocumentProducerFeedResponse}
 * that records the feed range the page came from and attaches per-partition
 * query metrics when the response carries them.
 *
 * @param <T> type of the items contained in each page
 */
public class DocumentProducer<T> {
    private static final Logger logger = LoggerFactory.getLogger(DocumentProducer.class);

    /** Response header holding the delimited query-metrics string. */
    private static final String QUERY_METRICS_HEADER = "x-ms-documentdb-query-metrics";
    /** Response header holding the id of the partition key range that served the page. */
    private static final String PARTITION_KEY_RANGE_ID_HEADER = "x-ms-documentdb-partitionkeyrangeid";
    /** Response header holding index-utilization information. */
    private static final String INDEX_UTILIZATION_HEADER = "x-ms-cosmos-index-utilization";

    // Reset to -1 when a request starts and incremented every time the request
    // callable runs, so it is 0 after a single, un-retried attempt.
    private int retries;
    protected final IDocumentQueryClient client;
    protected final Supplier<String> operationContextTextProvider;
    protected final String collectionRid;
    protected final CosmosQueryRequestOptions cosmosQueryRequestOptions;
    protected final Class<T> resourceType;
    protected PartitionKeyRange targetRange;
    protected final String collectionLink;
    protected final TriFunction<FeedRangeEpkImpl, String, Integer, RxDocumentServiceRequest> createRequestFunc;
    protected final Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeRequestFuncWithRetries;
    protected final Callable<DocumentClientRetryPolicy> createRetryPolicyFunc;
    protected final int pageSize;
    protected final UUID correlatedActivityId;
    public int top;
    // Continuation token of the most recently received page; child producers
    // created after a split start from this token.
    private volatile String lastResponseContinuationToken;
    private final SchedulingStopwatch fetchSchedulingMetrics = new SchedulingStopwatch();
    private final FetchExecutionRangeAccumulator fetchExecutionRangeAccumulator;
    protected FeedRangeEpkImpl feedRange;

    /**
     * A page of results together with the feed range it was produced for.
     *
     * <p>Construction has a side effect: if the page carries a query-metrics
     * header, the parsed metrics are stored into the page via
     * {@link #populatePartitionedQueryMetrics()}.
     */
    public class DocumentProducerFeedResponse {
        FeedResponse<T> pageResult;
        FeedRangeEpkImpl sourceFeedRange;

        /**
         * Wraps a page and tags it with the enclosing producer's current feed range.
         *
         * @param feedResponse the page to wrap
         */
        DocumentProducerFeedResponse(FeedResponse<T> feedResponse) {
            this.pageResult = feedResponse;
            this.sourceFeedRange = DocumentProducer.this.feedRange;
            populatePartitionedQueryMetrics();
        }

        /**
         * Wraps a page and tags it with an explicitly supplied feed range.
         *
         * @param feedResponse the page to wrap
         * @param feedRange    source feed range; it is cast to {@link FeedRangeEpkImpl},
         *                     so any other implementation fails with a {@link ClassCastException}
         */
        public DocumentProducerFeedResponse(FeedResponse<T> feedResponse, FeedRange feedRange) {
            this.pageResult = feedResponse;
            this.sourceFeedRange = (FeedRangeEpkImpl) feedRange;
            populatePartitionedQueryMetrics();
        }

        /**
         * Parses the query-metrics response header, combines it with the
         * client-side metrics gathered by the enclosing producer and stores the
         * result in the page. Does nothing when the header is absent or empty.
         *
         * <p>The metrics key and scheduling entry use the enclosing producer's
         * {@code feedRange}, not {@link #sourceFeedRange}.
         */
        void populatePartitionedQueryMetrics() {
            String queryMetricsHeader = this.pageResult.getResponseHeaders().get(QUERY_METRICS_HEADER);
            if (StringUtils.isEmpty(queryMetricsHeader)) {
                return;
            }

            String producerRange = DocumentProducer.this.feedRange.getRange().toString();
            String metricsKey = producerRange + ",pkrId:"
                + this.pageResult.getResponseHeaders().get(PARTITION_KEY_RANGE_ID_HEADER);

            // The request charge is appended to the service-provided string so
            // that it is parsed along with the other delimited metrics.
            String delimitedMetrics = queryMetricsHeader + String.format(
                Locale.ROOT, ";%s=%.2f", QueryMetricsConstants.RequestCharge, this.pageResult.getRequestCharge());

            ClientSideMetrics clientSideMetrics = new ClientSideMetrics(
                DocumentProducer.this.retries,
                this.pageResult.getRequestCharge(),
                DocumentProducer.this.fetchExecutionRangeAccumulator.getExecutionRanges(),
                Collections.singletonList(new ImmutablePair<>(
                    DocumentProducer.this.feedRange.getRange().toString(),
                    DocumentProducer.this.fetchSchedulingMetrics.getElapsedTime())));

            BridgeInternal.putQueryMetricsIntoMap(
                this.pageResult,
                metricsKey,
                BridgeInternal.createQueryMetricsFromDelimitedStringAndClientSideMetrics(
                    delimitedMetrics,
                    clientSideMetrics,
                    this.pageResult.getActivityId(),
                    this.pageResult.getResponseHeaders().getOrDefault(INDEX_UTILIZATION_HEADER, null)));
        }
    }

    /**
     * Creates a producer for one feed range.
     *
     * @param client                       query client, used here to reach the partition key range cache
     * @param collectionRid                resource id of the collection being queried
     * @param cosmosQueryRequestOptions    caller's options; copied when non-null, otherwise fresh options are created
     * @param createRequestFunc            builds a request from (feed range, continuation token, page size)
     * @param executeRequestFunc           executes a single request attempt
     * @param targetRange                  partition key range this producer targets
     * @param collectionLink               link of the collection being queried
     * @param createRetryPolicyFunc        factory for the retry policy applied to each request; may be {@code null}
     * @param resourceType                 class of the items in each page
     * @param correlatedActivityId         activity id correlating the requests of one query
     * @param pageSize                     page size handed to the paginator
     * @param initialContinuationToken     continuation token to start from; may be {@code null}
     * @param top                          value handed to the paginator as {@code top}
     * @param feedRange                    feed range to query; must not be {@code null}
     * @param operationContextTextProvider supplies context text appended to log messages
     */
    public DocumentProducer(
        IDocumentQueryClient client,
        String collectionRid,
        CosmosQueryRequestOptions cosmosQueryRequestOptions,
        TriFunction<FeedRangeEpkImpl, String, Integer, RxDocumentServiceRequest> createRequestFunc,
        Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeRequestFunc,
        PartitionKeyRange targetRange,
        String collectionLink,
        Callable<DocumentClientRetryPolicy> createRetryPolicyFunc,
        Class<T> resourceType,
        UUID correlatedActivityId,
        int pageSize,
        String initialContinuationToken,
        int top,
        FeedRangeEpkImpl feedRange,
        Supplier<String> operationContextTextProvider) {

        this.client = client;
        this.collectionRid = collectionRid;
        this.createRequestFunc = createRequestFunc;
        this.fetchSchedulingMetrics.ready();
        this.fetchExecutionRangeAccumulator = new FetchExecutionRangeAccumulator(feedRange.getRange().toString());
        this.operationContextTextProvider = operationContextTextProvider;

        this.executeRequestFuncWithRetries = request -> {
            // Start at -1 so that the first attempt leaves the counter at 0.
            this.retries = -1;
            this.fetchSchedulingMetrics.start();
            this.fetchExecutionRangeAccumulator.beginFetchRange();

            DocumentClientRetryPolicy retryPolicy = null;
            if (createRetryPolicyFunc != null) {
                try {
                    retryPolicy = createRetryPolicyFunc.call();
                } catch (Exception e) {
                    // Surface factory failures through the reactive pipeline
                    // instead of throwing from the lambda.
                    return Mono.error(e);
                }
            }

            // Lambdas can only capture effectively-final locals.
            final DocumentClientRetryPolicy capturedRetryPolicy = retryPolicy;
            return ObservableHelper.inlineIfPossibleAsObs(() -> {
                if (capturedRetryPolicy != null) {
                    capturedRetryPolicy.onBeforeSendRequest(request);
                }
                this.retries++;
                return executeRequestFunc.apply(request);
            }, retryPolicy);
        };

        this.correlatedActivityId = correlatedActivityId;
        this.cosmosQueryRequestOptions = cosmosQueryRequestOptions != null
            ? ModelBridgeInternal.createQueryRequestOptions(cosmosQueryRequestOptions)
            : new CosmosQueryRequestOptions();
        ModelBridgeInternal.setQueryRequestOptionsContinuationToken(
            this.cosmosQueryRequestOptions, initialContinuationToken);
        this.lastResponseContinuationToken = initialContinuationToken;
        this.resourceType = resourceType;
        this.targetRange = targetRange;
        this.collectionLink = collectionLink;
        this.createRetryPolicyFunc = createRetryPolicyFunc;
        this.pageSize = pageSize;
        this.top = top;
        this.feedRange = feedRange;
    }

    /**
     * Starts paging through the query results of this producer's feed range.
     *
     * <p>For every page received, the continuation token is remembered and the
     * fetch-range and scheduling metrics are closed before the page is wrapped.
     * Partition-split failures are handled by {@link #splitProof(Flux)}.
     *
     * @return the stream of wrapped pages
     */
    public Flux<DocumentProducer<T>.DocumentProducerFeedResponse> produceAsync() {
        Flux<FeedResponse<T>> pages = Paginator.getPaginatedQueryResultAsObservable(
            ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(this.cosmosQueryRequestOptions),
            (continuationToken, maxItemCount) ->
                this.createRequestFunc.apply(this.feedRange, continuationToken, maxItemCount),
            this.executeRequestFuncWithRetries,
            this.top,
            this.pageSize,
            Paginator.getPreFetchCount(this.cosmosQueryRequestOptions, this.top, this.pageSize),
            ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper
                .getCosmosQueryRequestOptionsAccessor()
                .getOperationContext(this.cosmosQueryRequestOptions))
            .map(page -> {
                // NOTE(review): `m513getContinuationToken` looks like a decompiler-renamed
                // accessor for the page's continuation token - confirm the real method
                // name against FeedResponse before relying on this call.
                this.lastResponseContinuationToken = page.m513getContinuationToken();
                this.fetchExecutionRangeAccumulator.endFetchRange(
                    page.getActivityId(), page.getResults().size(), this.retries);
                this.fetchSchedulingMetrics.stop();
                return page;
            });

        return splitProof(pages.map(page -> new DocumentProducerFeedResponse(page)));
    }

    /**
     * Adds partition-split recovery to a stream of pages.
     *
     * <p>When the stream fails with a {@link CosmosException} that
     * {@link #isSplit(CosmosException)} recognises, the ranges overlapping this
     * producer's feed range are looked up and the stream continues with the
     * pages of one child producer per replacement range. Any other failure is
     * logged and propagated unchanged.
     *
     * @param sourcePages the stream to protect
     * @return the protected stream
     */
    private Flux<DocumentProducer<T>.DocumentProducerFeedResponse> splitProof(
        Flux<DocumentProducer<T>.DocumentProducerFeedResponse> sourcePages) {

        return sourcePages.onErrorResume(error -> {
            CosmosException cosmosException = Utils.as(error, CosmosException.class);
            if (cosmosException == null || !isSplit(cosmosException)) {
                logger.error("Unexpected failure, Context: {}", this.operationContextTextProvider.get(), error);
                return Flux.error(error);
            }

            logger.info(
                "DocumentProducer handling a partition split in [{}], detail:[{}], Context: {}",
                this.feedRange,
                cosmosException,
                this.operationContextTextProvider.get());

            Flux<DocumentProducer<T>> replacementProducers =
                getReplacementRanges(this.feedRange.getRange()).flux().flatMap(replacementRanges -> {
                    // Serialising every range to JSON is only worth it when the
                    // message will actually be written.
                    if (logger.isInfoEnabled()) {
                        logger.info(
                            "Cross Partition Query Execution detected partition [{}] split into [{}] partitions, last continuation token is [{}]. - Context: {}",
                            this.feedRange,
                            replacementRanges.v.stream()
                                .map(ModelBridgeInternal::toJsonFromJsonSerializable)
                                .collect(Collectors.joining(", ")),
                            this.lastResponseContinuationToken,
                            this.operationContextTextProvider.get());
                    }
                    return Flux.fromIterable(createReplacingDocumentProducersOnSplit(replacementRanges.v));
                });

            return produceOnSplit(replacementProducers);
        });
    }

    /**
     * Drains the given replacement producers with a flat-map concurrency of 1,
     * i.e. at most one producer is subscribed at a time.
     *
     * @param replacementProducers producers created for the ranges that replaced a split range
     * @return the pages of all replacement producers
     */
    protected Flux<DocumentProducer<T>.DocumentProducerFeedResponse> produceOnSplit(
        Flux<DocumentProducer<T>> replacementProducers) {
        return replacementProducers.flatMap(DocumentProducer::produceAsync, 1);
    }

    /**
     * Creates one child producer per replacement range, each starting from the
     * continuation token of the last page this producer received.
     *
     * @param partitionKeyRanges ranges that replaced this producer's range
     * @return the child producers, in the order of {@code partitionKeyRanges}
     */
    private List<DocumentProducer<T>> createReplacingDocumentProducersOnSplit(
        List<PartitionKeyRange> partitionKeyRanges) {
        List<DocumentProducer<T>> replacements = new ArrayList<>(partitionKeyRanges.size());
        for (PartitionKeyRange replacementRange : partitionKeyRanges) {
            replacements.add(
                createChildDocumentProducerOnSplit(replacementRange, this.lastResponseContinuationToken));
        }
        return replacements;
    }

    /**
     * Creates the producer that takes over one replacement range after a split.
     *
     * <p>The child receives this producer's {@code executeRequestFuncWithRetries}
     * as its request executor and no retry-policy factory of its own.
     *
     * @param partitionKeyRange        replacement range the child will query
     * @param initialContinuationToken continuation token the child starts from
     * @return the child producer
     */
    protected DocumentProducer<T> createChildDocumentProducerOnSplit(
        PartitionKeyRange partitionKeyRange, String initialContinuationToken) {
        return new DocumentProducer<>(
            this.client,
            this.collectionRid,
            this.cosmosQueryRequestOptions,
            this.createRequestFunc,
            this.executeRequestFuncWithRetries,
            partitionKeyRange,
            this.collectionLink,
            null,
            this.resourceType,
            this.correlatedActivityId,
            this.pageSize,
            initialContinuationToken,
            this.top,
            new FeedRangeEpkImpl(partitionKeyRange.toRange()),
            this.operationContextTextProvider);
    }

    /**
     * Looks up the partition key ranges that overlap the given range, passing
     * {@code true} for the cache's fourth argument (NOTE(review): presumably a
     * force-refresh flag - confirm against the partition key range cache).
     *
     * @param range the range whose replacements are wanted
     * @return a holder around the overlapping ranges
     */
    private Mono<Utils.ValueHolder<List<PartitionKeyRange>>> getReplacementRanges(Range<String> range) {
        return this.client.getPartitionKeyRangeCache().tryGetOverlappingRangesAsync(
            null,
            this.collectionRid,
            range,
            true,
            ModelBridgeInternal.getPropertiesFromQueryRequestOptions(this.cosmosQueryRequestOptions));
    }

    /**
     * Tells whether the given failure is a partition split.
     *
     * @param cosmosException the failure to inspect
     * @return the result of {@link Exceptions#isPartitionSplit(CosmosException)}
     */
    private boolean isSplit(CosmosException cosmosException) {
        return Exceptions.isPartitionSplit(cosmosException);
    }
}
