package com.azure.cosmos.util;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.IterableStream;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.FeedResponseDiagnostics;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.SerializationDiagnosticsContext;
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ReportPayload;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.Temporal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.HdrHistogram.ConcurrentDoubleHistogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:com/azure/cosmos/util/CosmosPagedFlux.class */
public final class CosmosPagedFlux<T> extends ContinuablePagedFlux<String, T, FeedResponse<T>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CosmosPagedFlux.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;
    private final Consumer<FeedResponse<T>> feedResponseConsumer;
    private ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor cosmosDiagnosticsAccessor;
    private final int defaultPageSize;

    /* renamed from: com.azure.cosmos.util.CosmosPagedFlux$2, reason: invalid class name */
    /* loaded from: input_file:com/azure/cosmos/util/CosmosPagedFlux$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$reactor$core$publisher$SignalType = new int[SignalType.values().length];

        static {
            try {
                $SwitchMap$reactor$core$publisher$SignalType[SignalType.ON_COMPLETE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$reactor$core$publisher$SignalType[SignalType.ON_ERROR.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$reactor$core$publisher$SignalType[SignalType.ON_NEXT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> function) {
        this(function, null, -1);
    }

    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> function, Consumer<FeedResponse<T>> consumer) {
        this(function, consumer, -1);
    }

    CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> function, Consumer<FeedResponse<T>> consumer, int i) {
        this.optionsFluxFunction = function;
        this.feedResponseConsumer = consumer;
        this.cosmosDiagnosticsAccessor = ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor();
        this.defaultPageSize = i;
    }

    public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> consumer) {
        return this.feedResponseConsumer != null ? new CosmosPagedFlux<>(this.optionsFluxFunction, this.feedResponseConsumer.andThen(consumer)) : new CosmosPagedFlux<>(this.optionsFluxFunction, consumer);
    }

    public Flux<FeedResponse<T>> byPage() {
        CosmosPagedFluxOptions createCosmosPagedFluxOptions = createCosmosPagedFluxOptions();
        return FluxUtil.fluxContext(context -> {
            return byPage(createCosmosPagedFluxOptions, context);
        });
    }

    public Flux<FeedResponse<T>> byPage(String str) {
        CosmosPagedFluxOptions createCosmosPagedFluxOptions = createCosmosPagedFluxOptions();
        createCosmosPagedFluxOptions.setRequestContinuation(str);
        return FluxUtil.fluxContext(context -> {
            return byPage(createCosmosPagedFluxOptions, context);
        });
    }

    public Flux<FeedResponse<T>> byPage(int i) {
        CosmosPagedFluxOptions createCosmosPagedFluxOptions = createCosmosPagedFluxOptions();
        createCosmosPagedFluxOptions.setMaxItemCount(Integer.valueOf(i));
        return FluxUtil.fluxContext(context -> {
            return byPage(createCosmosPagedFluxOptions, context);
        });
    }

    public Flux<FeedResponse<T>> byPage(String str, int i) {
        CosmosPagedFluxOptions createCosmosPagedFluxOptions = createCosmosPagedFluxOptions();
        createCosmosPagedFluxOptions.setRequestContinuation(str);
        createCosmosPagedFluxOptions.setMaxItemCount(Integer.valueOf(i));
        return FluxUtil.fluxContext(context -> {
            return byPage(createCosmosPagedFluxOptions, context);
        });
    }

    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        byPage().flatMap(feedResponse -> {
            IterableStream<T> elements = feedResponse.getElements();
            return elements == null ? Flux.empty() : Flux.fromIterable(elements);
        }).subscribe(coreSubscriber);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CosmosPagedFlux<T> withDefaultPageSize(int i) {
        return new CosmosPagedFlux<>(this.optionsFluxFunction, this.feedResponseConsumer, i);
    }

    private CosmosPagedFluxOptions createCosmosPagedFluxOptions() {
        CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
        if (this.defaultPageSize > 0) {
            cosmosPagedFluxOptions.setMaxItemCount(Integer.valueOf(this.defaultPageSize));
        }
        return cosmosPagedFluxOptions;
    }

    private <T> Flux<T> wrapWithTracingIfEnabled(CosmosPagedFluxOptions cosmosPagedFluxOptions, Flux<T> flux, Context context) {
        return !isTracerEnabled(cosmosPagedFluxOptions) ? flux : cosmosPagedFluxOptions.getTracerProvider().runUnderSpanInContext(flux);
    }

    private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions cosmosPagedFluxOptions, Context context) {
        AtomicReference atomicReference = new AtomicReference();
        Flux<FeedResponse<T>> doOnEach = wrapWithTracingIfEnabled(cosmosPagedFluxOptions, this.optionsFluxFunction.apply(cosmosPagedFluxOptions), context).doOnSubscribe(subscription -> {
            atomicReference.set(Instant.now());
        }).doOnEach(signal -> {
            switch (AnonymousClass2.$SwitchMap$reactor$core$publisher$SignalType[signal.getType().ordinal()]) {
                case 1:
                    if (isTracerEnabled(cosmosPagedFluxOptions)) {
                        cosmosPagedFluxOptions.getTracerProvider().endSpan(signal, 200);
                        return;
                    }
                    return;
                case 2:
                    Object throwable = signal.getThrowable();
                    if (cosmosPagedFluxOptions.getCosmosAsyncClient() != null && Configs.isClientTelemetryEnabled(BridgeInternal.isClientTelemetryEnabled(cosmosPagedFluxOptions.getCosmosAsyncClient())) && (throwable instanceof CosmosException)) {
                        CosmosException cosmosException = (CosmosException) throwable;
                        if (this.cosmosDiagnosticsAccessor.isDiagnosticsCapturedInPagedFlux(cosmosException.getDiagnostics()).compareAndSet(false, true)) {
                            fillClientTelemetry(cosmosPagedFluxOptions.getCosmosAsyncClient(), 0, cosmosPagedFluxOptions.getContainerId(), cosmosPagedFluxOptions.getDatabaseId(), cosmosPagedFluxOptions.getOperationType(), cosmosPagedFluxOptions.getResourceType(), BridgeInternal.getContextClient(cosmosPagedFluxOptions.getCosmosAsyncClient()).getConsistencyLevel(), (float) cosmosException.getRequestCharge(), Duration.between((Temporal) atomicReference.get(), Instant.now()));
                        }
                    }
                    if (isTracerEnabled(cosmosPagedFluxOptions)) {
                        cosmosPagedFluxOptions.getTracerProvider().endSpan(signal, 0);
                    }
                    atomicReference.set(Instant.now());
                    return;
                case 3:
                    FeedResponse<T> feedResponse = (FeedResponse) signal.get();
                    if (isTracerEnabled(cosmosPagedFluxOptions) && this.cosmosDiagnosticsAccessor.isDiagnosticsCapturedInPagedFlux(feedResponse.getCosmosDiagnostics()).compareAndSet(false, true)) {
                        try {
                            Duration thresholdForDiagnosticsOnTracer = cosmosPagedFluxOptions.getThresholdForDiagnosticsOnTracer();
                            if (thresholdForDiagnosticsOnTracer == null) {
                                thresholdForDiagnosticsOnTracer = cosmosPagedFluxOptions.getTracerProvider().QUERY_THRESHOLD_FOR_DIAGNOSTICS;
                            }
                            if (Duration.between((Temporal) atomicReference.get(), Instant.now()).compareTo(thresholdForDiagnosticsOnTracer) > 0) {
                                addDiagnosticsOnTracerEvent(cosmosPagedFluxOptions.getTracerProvider(), feedResponse.getCosmosDiagnostics(), TracerProvider.getContextFromReactorOrNull(signal.getContextView()));
                            }
                        } catch (JsonProcessingException e) {
                            LOGGER.warn("Error while serializing diagnostics for tracer", e.getMessage());
                        }
                    }
                    if (this.feedResponseConsumer != null) {
                        this.feedResponseConsumer.accept(feedResponse);
                    }
                    if (cosmosPagedFluxOptions.getCosmosAsyncClient() != null && Configs.isClientTelemetryEnabled(BridgeInternal.isClientTelemetryEnabled(cosmosPagedFluxOptions.getCosmosAsyncClient())) && this.cosmosDiagnosticsAccessor.isDiagnosticsCapturedInPagedFlux(feedResponse.getCosmosDiagnostics()).compareAndSet(false, true)) {
                        fillClientTelemetry(cosmosPagedFluxOptions.getCosmosAsyncClient(), 200, cosmosPagedFluxOptions.getContainerId(), cosmosPagedFluxOptions.getDatabaseId(), cosmosPagedFluxOptions.getOperationType(), cosmosPagedFluxOptions.getResourceType(), BridgeInternal.getContextClient(cosmosPagedFluxOptions.getCosmosAsyncClient()).getConsistencyLevel(), (float) feedResponse.getRequestCharge(), Duration.between((Temporal) atomicReference.get(), Instant.now()));
                        atomicReference.set(Instant.now());
                        return;
                    }
                    return;
                default:
                    return;
            }
        });
        return isTracerEnabled(cosmosPagedFluxOptions) ? doOnEach.contextWrite(TracerProvider.setContextInReactor(cosmosPagedFluxOptions.getTracerProvider().startSpan(cosmosPagedFluxOptions.getTracerSpanName(), cosmosPagedFluxOptions.getDatabaseId(), cosmosPagedFluxOptions.getServiceEndpoint(), context))) : doOnEach;
    }

    private void fillClientTelemetry(CosmosAsyncClient cosmosAsyncClient, int i, String str, String str2, OperationType operationType, ResourceType resourceType, ConsistencyLevel consistencyLevel, float f, Duration duration) {
        ClientTelemetry clientTelemetry = BridgeInternal.getContextClient(cosmosAsyncClient).getClientTelemetry();
        ReportPayload createReportPayload = createReportPayload(cosmosAsyncClient, i, str, str2, operationType, resourceType, consistencyLevel, ClientTelemetry.REQUEST_LATENCY_NAME, "MilliSecond");
        ConcurrentDoubleHistogram concurrentDoubleHistogram = clientTelemetry.getClientTelemetryInfo().getOperationInfoMap().get(createReportPayload);
        if (concurrentDoubleHistogram != null) {
            ClientTelemetry.recordValue(concurrentDoubleHistogram, duration.toMillis());
        } else {
            ConcurrentDoubleHistogram concurrentDoubleHistogram2 = i == 200 ? new ConcurrentDoubleHistogram(300000L, 4) : new ConcurrentDoubleHistogram(300000L, 2);
            concurrentDoubleHistogram2.setAutoResize(true);
            ClientTelemetry.recordValue(concurrentDoubleHistogram2, duration.toMillis());
            clientTelemetry.getClientTelemetryInfo().getOperationInfoMap().put(createReportPayload, concurrentDoubleHistogram2);
        }
        ReportPayload createReportPayload2 = createReportPayload(cosmosAsyncClient, i, str, str2, operationType, resourceType, consistencyLevel, ClientTelemetry.REQUEST_CHARGE_NAME, ClientTelemetry.REQUEST_CHARGE_UNIT);
        ConcurrentDoubleHistogram concurrentDoubleHistogram3 = clientTelemetry.getClientTelemetryInfo().getOperationInfoMap().get(createReportPayload2);
        if (concurrentDoubleHistogram3 != null) {
            ClientTelemetry.recordValue(concurrentDoubleHistogram3, f);
            return;
        }
        ConcurrentDoubleHistogram concurrentDoubleHistogram4 = new ConcurrentDoubleHistogram(10000L, 2);
        concurrentDoubleHistogram4.setAutoResize(true);
        ClientTelemetry.recordValue(concurrentDoubleHistogram4, f);
        clientTelemetry.getClientTelemetryInfo().getOperationInfoMap().put(createReportPayload2, concurrentDoubleHistogram4);
    }

    private ReportPayload createReportPayload(CosmosAsyncClient cosmosAsyncClient, int i, String str, String str2, OperationType operationType, ResourceType resourceType, ConsistencyLevel consistencyLevel, String str3, String str4) {
        ReportPayload reportPayload = new ReportPayload(str3, str4);
        reportPayload.setConsistency(consistencyLevel == null ? BridgeInternal.getContextClient(cosmosAsyncClient).getConsistencyLevel() : consistencyLevel);
        reportPayload.setDatabaseName(str2);
        reportPayload.setContainerName(str);
        reportPayload.setOperation(operationType);
        reportPayload.setResource(resourceType);
        reportPayload.setStatusCode(Integer.valueOf(i));
        return reportPayload;
    }

    private void addDiagnosticsOnTracerEvent(TracerProvider tracerProvider, CosmosDiagnostics cosmosDiagnostics, Context context) throws JsonProcessingException {
        if (cosmosDiagnostics == null || context == null) {
            return;
        }
        HashMap hashMap = new HashMap();
        QueryInfo.QueryPlanDiagnosticsContext queryPlanDiagnosticsContext = this.cosmosDiagnosticsAccessor.getFeedResponseDiagnostics(cosmosDiagnostics) != null ? this.cosmosDiagnosticsAccessor.getFeedResponseDiagnostics(cosmosDiagnostics).getQueryPlanDiagnosticsContext() : null;
        if (queryPlanDiagnosticsContext != null) {
            hashMap.put("JSON", mapper.writeValueAsString(queryPlanDiagnosticsContext));
            tracerProvider.addEvent("Query Plan Statistics", hashMap, OffsetDateTime.ofInstant(queryPlanDiagnosticsContext.getStartTimeUTC(), ZoneOffset.UTC), context);
        }
        FeedResponseDiagnostics feedResponseDiagnostics = this.cosmosDiagnosticsAccessor.getFeedResponseDiagnostics(cosmosDiagnostics);
        if (feedResponseDiagnostics != null && feedResponseDiagnostics.getQueryMetricsMap() != null && feedResponseDiagnostics.getQueryMetricsMap().size() > 0) {
            for (Map.Entry<String, QueryMetrics> entry : feedResponseDiagnostics.getQueryMetricsMap().entrySet()) {
                HashMap hashMap2 = new HashMap();
                hashMap2.put("Query Metrics", entry.getValue().toString());
                tracerProvider.addEvent("Query Metrics for PKRange " + entry.getKey(), hashMap2, OffsetDateTime.now(), context);
            }
        }
        int i = 1;
        for (ClientSideRequestStatistics clientSideRequestStatistics : BridgeInternal.getClientSideRequestStatisticsList(cosmosDiagnostics)) {
            HashMap hashMap3 = new HashMap();
            int i2 = 1;
            Iterator<ClientSideRequestStatistics.StoreResponseStatistics> it = clientSideRequestStatistics.getResponseStatisticsList().iterator();
            while (it.hasNext()) {
                int i3 = i2;
                i2++;
                hashMap3.put("StoreResponse" + i3, mapper.writeValueAsString(it.next()));
            }
            int i4 = 1;
            Iterator<ClientSideRequestStatistics.StoreResponseStatistics> it2 = ClientSideRequestStatistics.getCappedSupplementalResponseStatisticsList(clientSideRequestStatistics.getSupplementalResponseStatisticsList()).iterator();
            while (it2.hasNext()) {
                int i5 = i4;
                i4++;
                hashMap3.put("Supplemental StoreResponse" + i5, mapper.writeValueAsString(it2.next()));
            }
            if (clientSideRequestStatistics.getRetryContext().getRetryStartTime() != null) {
                hashMap3.put("Retry Context", mapper.writeValueAsString(clientSideRequestStatistics.getRetryContext()));
            }
            int i6 = 1;
            Iterator<ClientSideRequestStatistics.AddressResolutionStatistics> it3 = clientSideRequestStatistics.getAddressResolutionStatistics().values().iterator();
            while (it3.hasNext()) {
                int i7 = i6;
                i6++;
                hashMap3.put("AddressResolutionStatistics" + i7, mapper.writeValueAsString(it3.next()));
            }
            if (clientSideRequestStatistics.getSerializationDiagnosticsContext().serializationDiagnosticsList != null) {
                int i8 = 1;
                for (SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics : clientSideRequestStatistics.getSerializationDiagnosticsContext().serializationDiagnosticsList) {
                    hashMap3 = new HashMap();
                    int i9 = i8;
                    i8++;
                    hashMap3.put("SerializationDiagnostics" + i9, mapper.writeValueAsString(serializationDiagnostics));
                }
            }
            if (clientSideRequestStatistics.getGatewayStatistics() != null) {
                hashMap3.put("GatewayStatistics", mapper.writeValueAsString(clientSideRequestStatistics.getGatewayStatistics()));
            }
            hashMap3.put("RegionContacted", mapper.writeValueAsString(clientSideRequestStatistics.getContactedRegionNames()));
            hashMap3.put("SystemInformation", mapper.writeValueAsString(ClientSideRequestStatistics.fetchSystemInformation()));
            hashMap3.put("ClientCfgs", mapper.writeValueAsString(clientSideRequestStatistics.getDiagnosticsClientConfig()));
            if (clientSideRequestStatistics.getResponseStatisticsList() != null && clientSideRequestStatistics.getResponseStatisticsList().size() > 0 && clientSideRequestStatistics.getResponseStatisticsList().get(0).getStoreResult() != null) {
                tracerProvider.addEvent("Diagnostics for PKRange " + clientSideRequestStatistics.getResponseStatisticsList().get(0).getStoreResult().partitionKeyRangeId, hashMap3, OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC), context);
            } else if (clientSideRequestStatistics.getGatewayStatistics() != null) {
                tracerProvider.addEvent("Diagnostics for PKRange " + clientSideRequestStatistics.getGatewayStatistics().getPartitionKeyRangeId(), hashMap3, OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC), context);
            } else {
                int i10 = i;
                i++;
                tracerProvider.addEvent("Diagnostics " + i10, hashMap3, OffsetDateTime.ofInstant(clientSideRequestStatistics.getRequestStartTimeUTC(), ZoneOffset.UTC), context);
            }
        }
    }

    private boolean isTracerEnabled(CosmosPagedFluxOptions cosmosPagedFluxOptions) {
        return cosmosPagedFluxOptions.getTracerProvider() != null && cosmosPagedFluxOptions.getTracerProvider().isEnabled();
    }

    static {
        ImplementationBridgeHelpers.CosmosPageFluxHelper.setCosmosPageFluxAccessor(new ImplementationBridgeHelpers.CosmosPageFluxHelper.CosmosPageFluxAccessor() { // from class: com.azure.cosmos.util.CosmosPagedFlux.1
            @Override // com.azure.cosmos.implementation.ImplementationBridgeHelpers.CosmosPageFluxHelper.CosmosPageFluxAccessor
            public <T> CosmosPagedFlux<T> getCosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> function) {
                return new CosmosPagedFlux<>(function);
            }
        });
    }
}
