package com.azure.cosmos.util;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.IterableStream;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.clientTelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clientTelemetry.ReportPayload;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.util.Beta;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.HdrHistogram.ConcurrentDoubleHistogram;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

/* loaded from: input_file:com/azure/cosmos/util/CosmosPagedFlux.class */
/**
 * A paged {@link Flux} of Cosmos {@link FeedResponse} pages whose continuation token is a {@code String}.
 *
 * <p>Pages are produced by a caller-supplied function that receives a {@link CosmosPagedFluxOptions}
 * (continuation token / max item count) and returns the page stream. Every page stream handed out by
 * {@code byPage(...)} is decorated with callbacks that start/end a tracing span (when a tracer provider
 * is configured and enabled) and record latency / request-charge client telemetry (when a client is
 * configured and client telemetry is enabled).
 *
 * @param <T> the type of the items contained in each page
 */
public final class CosmosPagedFlux<T> extends ContinuablePagedFlux<String, T, FeedResponse<T>> {
    /** Status code reported to the tracer and to telemetry for a page stream that is progressing/completing normally. */
    private static final int SUCCESS_STATUS_CODE = 200;
    /** Status code reported to the tracer and to telemetry when the page stream terminates with an error. */
    private static final int ERROR_STATUS_CODE = 0;

    /** First constructor argument of newly created latency histograms. */
    private static final long LATENCY_HISTOGRAM_RANGE = 300000000L;
    /** Second constructor argument (significant digits) of latency histograms for successful pages. */
    private static final int LATENCY_SUCCESS_PRECISION = 4;
    /** Second constructor argument (significant digits) of latency histograms for failures. */
    private static final int LATENCY_FAILURE_PRECISION = 2;
    /** First constructor argument of newly created request-charge histograms. */
    private static final long REQUEST_CHARGE_HISTOGRAM_RANGE = 10000L;
    /** Second constructor argument (significant digits) of request-charge histograms. */
    private static final int REQUEST_CHARGE_PRECISION = 2;

    private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;
    /** Optional per-page callback; {@code null} when none was registered through {@link #handle(Consumer)}. */
    private final Consumer<FeedResponse<T>> feedResponseConsumer;

    /**
     * Creates a paged flux without a per-page callback.
     *
     * <p>NOTE: the decompiler reported that this constructor's visibility was widened from
     * package-private; it is kept {@code public} so that existing callers keep compiling.
     *
     * @param function produces the page stream for a given set of paging options
     */
    public CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> function) {
        this(function, null);
    }

    /**
     * Creates a paged flux with a per-page callback.
     *
     * @param function produces the page stream for a given set of paging options
     * @param consumer invoked with every emitted page; may be {@code null}
     */
    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> function, Consumer<FeedResponse<T>> consumer) {
        this.optionsFluxFunction = function;
        this.feedResponseConsumer = consumer;
    }

    /**
     * Returns a new {@code CosmosPagedFlux} backed by the same page-producing function that invokes
     * {@code consumer} for every emitted page. This instance is left unchanged, and the returned
     * instance carries only the given consumer (it does not chain a previously registered one).
     *
     * @param consumer callback invoked with each {@link FeedResponse}
     * @return a new paged flux that notifies {@code consumer}
     */
    @Beta(value = Beta.SinceVersion.V4_6_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
    public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> consumer) {
        return new CosmosPagedFlux<>(this.optionsFluxFunction, consumer);
    }

    /**
     * Returns the page stream using default paging options.
     *
     * @return a flux of pages
     */
    public Flux<FeedResponse<T>> byPage() {
        return byPageWithContext(new CosmosPagedFluxOptions());
    }

    /**
     * Returns the page stream with the given value set as the request continuation.
     *
     * @param continuationToken value passed to {@code setRequestContinuation} on the paging options
     * @return a flux of pages
     */
    public Flux<FeedResponse<T>> byPage(String continuationToken) {
        CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
        pagedFluxOptions.setRequestContinuation(continuationToken);
        return byPageWithContext(pagedFluxOptions);
    }

    /**
     * Returns the page stream with the given value set as the max item count.
     *
     * @param preferredPageSize value passed to {@code setMaxItemCount} on the paging options
     * @return a flux of pages
     */
    public Flux<FeedResponse<T>> byPage(int preferredPageSize) {
        CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
        pagedFluxOptions.setMaxItemCount(Integer.valueOf(preferredPageSize));
        return byPageWithContext(pagedFluxOptions);
    }

    /**
     * Returns the page stream with both the request continuation and the max item count set.
     *
     * @param continuationToken value passed to {@code setRequestContinuation} on the paging options
     * @param preferredPageSize value passed to {@code setMaxItemCount} on the paging options
     * @return a flux of pages
     */
    public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageSize) {
        CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
        pagedFluxOptions.setRequestContinuation(continuationToken);
        pagedFluxOptions.setMaxItemCount(Integer.valueOf(preferredPageSize));
        return byPageWithContext(pagedFluxOptions);
    }

    /**
     * Subscribes to the individual items: each page from {@link #byPage()} is expanded into its
     * elements, and a page whose element stream is {@code null} contributes nothing.
     *
     * @param coreSubscriber the subscriber that receives the items
     */
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        byPage().flatMap(page -> {
            IterableStream<T> elements = page.getElements();
            if (elements == null) {
                return Flux.<T>empty();
            }
            return Flux.fromIterable(elements);
        }).subscribe(coreSubscriber);
    }

    /** Wraps the instrumented page stream so that it is built with the context supplied by {@code FluxUtil.fluxContext}. */
    private Flux<FeedResponse<T>> byPageWithContext(CosmosPagedFluxOptions pagedFluxOptions) {
        return FluxUtil.fluxContext(context -> byPage(pagedFluxOptions, context));
    }

    /**
     * Builds the page stream for {@code cosmosPagedFluxOptions} and attaches the tracing, telemetry and
     * per-page-consumer callbacks.
     *
     * @param cosmosPagedFluxOptions paging options handed to the page-producing function
     * @param context context passed to the tracer when the span is started
     * @return the instrumented page stream
     */
    private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions cosmosPagedFluxOptions, Context context) {
        final AtomicReference<Context> spanContext = new AtomicReference<>(Context.NONE);
        // Start of the interval currently being measured; reset after every page / error.
        final AtomicReference<Instant> intervalStart = new AtomicReference<>();
        return this.optionsFluxFunction.apply(cosmosPagedFluxOptions).doOnSubscribe(subscription -> {
            if (isTracingEnabled(cosmosPagedFluxOptions)) {
                spanContext.set(cosmosPagedFluxOptions.getTracerProvider().startSpan(
                    cosmosPagedFluxOptions.getTracerSpanName(),
                    cosmosPagedFluxOptions.getDatabaseId(),
                    cosmosPagedFluxOptions.getServiceEndpoint(),
                    context));
            }
            intervalStart.set(Instant.now());
        }).doOnComplete(() -> {
            if (isTracingEnabled(cosmosPagedFluxOptions)) {
                cosmosPagedFluxOptions.getTracerProvider().endSpan(spanContext.get(), Signal.complete(), SUCCESS_STATUS_CODE);
            }
        }).doOnError(error -> {
            if (isTracingEnabled(cosmosPagedFluxOptions)) {
                cosmosPagedFluxOptions.getTracerProvider().endSpan(spanContext.get(), Signal.error(error), ERROR_STATUS_CODE);
            }
            // Only CosmosException carries a request charge, so other failures are not recorded.
            // NOTE(review): failures are keyed with status code 0 rather than the exception's own
            // status code — confirm this is what the telemetry consumer expects.
            if (isTelemetryEnabled(cosmosPagedFluxOptions) && (error instanceof CosmosException)) {
                recordTelemetry(cosmosPagedFluxOptions, ERROR_STATUS_CODE,
                    (float) ((CosmosException) error).getRequestCharge(), intervalStart.get());
            }
            intervalStart.set(Instant.now());
        }).doOnNext(page -> {
            if (this.feedResponseConsumer != null) {
                this.feedResponseConsumer.accept(page);
            }
            if (!isTelemetryEnabled(cosmosPagedFluxOptions)) {
                return;
            }
            recordTelemetry(cosmosPagedFluxOptions, SUCCESS_STATUS_CODE, (float) page.getRequestCharge(), intervalStart.get());
            intervalStart.set(Instant.now());
        });
    }

    /** Returns whether a tracer provider is configured on the options and reports itself as enabled. */
    private static boolean isTracingEnabled(CosmosPagedFluxOptions pagedFluxOptions) {
        // The page-producing function is what populates the options; guard against one that never
        // sets a tracer provider instead of failing with a NullPointerException inside the callbacks.
        return pagedFluxOptions.getTracerProvider() != null && pagedFluxOptions.getTracerProvider().isEnabled();
    }

    /** Returns whether a client is configured on the options and client telemetry is enabled for it. */
    private static boolean isTelemetryEnabled(CosmosPagedFluxOptions pagedFluxOptions) {
        CosmosAsyncClient client = pagedFluxOptions.getCosmosAsyncClient();
        return client != null && Configs.isClientTelemetryEnabled(BridgeInternal.isClientTelemetryEnabled(client));
    }

    /** Records one latency / request-charge sample, taking the telemetry dimensions from the paging options. */
    private void recordTelemetry(CosmosPagedFluxOptions pagedFluxOptions, int statusCode, float requestCharge, Instant intervalStart) {
        CosmosAsyncClient client = pagedFluxOptions.getCosmosAsyncClient();
        fillClientTelemetry(
            client,
            statusCode,
            pagedFluxOptions.getContainerId(),
            pagedFluxOptions.getDatabaseId(),
            pagedFluxOptions.getOperationType(),
            pagedFluxOptions.getResourceType(),
            BridgeInternal.getContextClient(client).getConsistencyLevel(),
            requestCharge,
            Duration.between(intervalStart, Instant.now()));
    }

    /**
     * Records the latency (in microseconds) and the request charge of one page or failure into the
     * client's telemetry histograms, creating a histogram the first time a given combination of
     * dimensions is seen.
     *
     * @param cosmosAsyncClient client whose telemetry is updated
     * @param statusCode status code stored in the telemetry key; also selects the latency precision
     * @param containerId container name stored in the telemetry key
     * @param databaseId database name stored in the telemetry key
     * @param operationType operation stored in the telemetry key
     * @param resourceType resource stored in the telemetry key
     * @param consistencyLevel consistency stored in the telemetry key; the client's level is used when {@code null}
     * @param requestCharge request charge to record
     * @param latency elapsed time to record
     */
    private void fillClientTelemetry(CosmosAsyncClient cosmosAsyncClient, int statusCode, String containerId, String databaseId, OperationType operationType, ResourceType resourceType, ConsistencyLevel consistencyLevel, float requestCharge, Duration latency) {
        ClientTelemetry telemetry = BridgeInternal.getContextClient(cosmosAsyncClient).getClientTelemetry();

        ReportPayload latencyKey = createReportPayload(cosmosAsyncClient, statusCode, containerId, databaseId, operationType, resourceType, consistencyLevel, ClientTelemetry.REQUEST_LATENCY_NAME, ClientTelemetry.REQUEST_LATENCY_UNIT);
        int latencyPrecision = statusCode == SUCCESS_STATUS_CODE ? LATENCY_SUCCESS_PRECISION : LATENCY_FAILURE_PRECISION;
        ConcurrentDoubleHistogram latencyHistogram = histogramFor(telemetry, latencyKey, LATENCY_HISTOGRAM_RANGE, latencyPrecision);
        // Nanoseconds -> microseconds (integer division, as a long).
        ClientTelemetry.recordValue(latencyHistogram, latency.toNanos() / 1000);

        ReportPayload chargeKey = createReportPayload(cosmosAsyncClient, statusCode, containerId, databaseId, operationType, resourceType, consistencyLevel, ClientTelemetry.REQUEST_CHARGE_NAME, ClientTelemetry.REQUEST_CHARGE_UNIT);
        ConcurrentDoubleHistogram chargeHistogram = histogramFor(telemetry, chargeKey, REQUEST_CHARGE_HISTOGRAM_RANGE, REQUEST_CHARGE_PRECISION);
        ClientTelemetry.recordValue(chargeHistogram, requestCharge);
    }

    /**
     * Returns the histogram registered under {@code key}, creating and registering an auto-resizing one
     * when none exists yet. Lookup and registration happen in a single {@code computeIfAbsent} call so
     * that two racing first recordings cannot overwrite each other's histogram.
     */
    private static ConcurrentDoubleHistogram histogramFor(ClientTelemetry telemetry, ReportPayload key, long range, int precision) {
        return telemetry.getClientTelemetryInfo().getOperationInfoMap().computeIfAbsent(key, ignored -> {
            ConcurrentDoubleHistogram created = new ConcurrentDoubleHistogram(range, precision);
            created.setAutoResize(true);
            return created;
        });
    }

    /**
     * Builds the telemetry key for one metric.
     *
     * @param cosmosAsyncClient client used to look up the consistency level when {@code consistencyLevel} is {@code null}
     * @param statusCode status code stored in the key
     * @param containerId container name stored in the key
     * @param databaseId database name stored in the key
     * @param operationType operation stored in the key
     * @param resourceType resource stored in the key
     * @param consistencyLevel consistency stored in the key, or {@code null} to use the client's level
     * @param metricsName first argument of the {@link ReportPayload} constructor
     * @param unitName second argument of the {@link ReportPayload} constructor
     * @return the populated payload
     */
    private ReportPayload createReportPayload(CosmosAsyncClient cosmosAsyncClient, int statusCode, String containerId, String databaseId, OperationType operationType, ResourceType resourceType, ConsistencyLevel consistencyLevel, String metricsName, String unitName) {
        ReportPayload reportPayload = new ReportPayload(metricsName, unitName);
        reportPayload.setConsistency(consistencyLevel == null ? BridgeInternal.getContextClient(cosmosAsyncClient).getConsistencyLevel() : consistencyLevel);
        reportPayload.setDatabaseName(databaseId);
        reportPayload.setContainerName(containerId);
        reportPayload.setOperation(operationType);
        reportPayload.setResource(resourceType);
        reportPayload.setStatusCode(Integer.valueOf(statusCode));
        return reportPayload;
    }
}
