package com.azure.cosmos.implementation.throughputControl.controller.group;

import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.AsyncCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.throughputControl.LinkedCancellationToken;
import com.azure.cosmos.implementation.throughputControl.LinkedCancellationTokenSource;
import com.azure.cosmos.implementation.throughputControl.config.ThroughputControlGroupInternal;
import com.azure.cosmos.implementation.throughputControl.controller.IThroughputController;
import com.azure.cosmos.implementation.throughputControl.controller.request.GlobalThroughputRequestController;
import com.azure.cosmos.implementation.throughputControl.controller.request.IThroughputRequestController;
import com.azure.cosmos.implementation.throughputControl.controller.request.PkRangesThroughputRequestController;
import com.azure.cosmos.implementation.throughputControl.exceptions.ThroughputControlInitializationException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/throughputControl/controller/group/ThroughputGroupControllerBase.class */
/**
 * Base class for throughput control group controllers.
 *
 * <p>It keeps the effective group throughput (derived from the group's target throughput and/or
 * target throughput threshold), lazily creates the request controller matching the connection mode
 * through an {@link AsyncCache}, and delegates request processing to that controller. Subclasses
 * supply the throughput allocated to this client and receive the usage reported on every renew cycle.
 */
public abstract class ThroughputGroupControllerBase implements IThroughputController {
    private static final Logger logger = LoggerFactory.getLogger(ThroughputGroupControllerBase.class);

    /** Delay applied before each throughput usage renewal in {@link #throughputUsageCycleRenewTask}. */
    private static final Duration DEFAULT_THROUGHPUT_USAGE_RESET_DURATION = Duration.ofSeconds(1);

    private final ConnectionMode connectionMode;
    private final ThroughputControlGroupInternal group;
    /** {@code null} when the group defines no target throughput threshold. */
    private final AtomicInteger maxContainerThroughput;
    private final RxPartitionKeyRangeCache partitionKeyRangeCache;
    /** Holds the single request controller of this group, keyed by the group name. */
    private final AsyncCache<String, IThroughputRequestController> requestControllerAsyncCache;
    private final String targetContainerRid;
    protected final AtomicReference<Double> groupThroughput;
    protected final LinkedCancellationTokenSource cancellationTokenSource;

    /**
     * Creates the controller and computes the initial group throughput.
     *
     * @param connectionMode connection mode used later to pick the request controller implementation
     * @param throughputControlGroupInternal the group configuration; must not be {@code null}
     * @param num max container throughput; must not be {@code null} when the group defines a target
     *     throughput threshold, ignored otherwise
     * @param rxPartitionKeyRangeCache partition key range cache; must not be {@code null}
     * @param str target container rid; must not be {@code null} or empty
     * @param linkedCancellationToken token handed to the {@link LinkedCancellationTokenSource}
     *     created for this controller
     */
    public ThroughputGroupControllerBase(ConnectionMode connectionMode, ThroughputControlGroupInternal throughputControlGroupInternal, Integer num, RxPartitionKeyRangeCache rxPartitionKeyRangeCache, String str, LinkedCancellationToken linkedCancellationToken) {
        Preconditions.checkNotNull(throughputControlGroupInternal, "Throughput control group can not be null");
        Preconditions.checkNotNull(rxPartitionKeyRangeCache, "Partition key range cache can not be null or empty");
        Preconditions.checkArgument(StringUtils.isNotEmpty(str), "Target container rid cannot be null nor empty");
        this.connectionMode = connectionMode;
        this.group = throughputControlGroupInternal;
        if (this.group.getTargetThroughputThreshold() != null) {
            Preconditions.checkNotNull(num, "Max container throughput can not be null when target throughput threshold defined");
            this.maxContainerThroughput = new AtomicInteger(num.intValue());
        } else {
            this.maxContainerThroughput = null;
        }
        this.groupThroughput = new AtomicReference<>();
        // NOTE: calculateGroupThroughput() is overridable and runs before the remaining fields
        // are assigned; it only reads 'group' and 'maxContainerThroughput', which are set above.
        calculateGroupThroughput();
        this.partitionKeyRangeCache = rxPartitionKeyRangeCache;
        this.requestControllerAsyncCache = new AsyncCache<>();
        this.targetContainerRid = str;
        this.cancellationTokenSource = new LinkedCancellationTokenSource(linkedCancellationToken);
    }

    /**
     * @return the throughput allocated to this client; passed to the request controller on creation
     *     and on every usage cycle renewal
     */
    public abstract double getClientAllocatedThroughput();

    /**
     * Receives the value returned by the request controller when a throughput usage cycle is renewed.
     *
     * @param d value returned by {@code renewThroughputUsageCycle}
     */
    public abstract void recordThroughputUsage(double d);

    /**
     * Recomputes {@link #groupThroughput} as the minimum of {@code Double.MAX_VALUE},
     * {@code maxContainerThroughput * targetThroughputThreshold} (when a threshold is defined) and
     * the target throughput (when defined).
     */
    protected void calculateGroupThroughput() {
        double d = Double.MAX_VALUE;
        if (this.group.getTargetThroughputThreshold() != null) {
            d = Math.min(Double.MAX_VALUE, this.maxContainerThroughput.get() * this.group.getTargetThroughputThreshold().doubleValue());
        }
        if (this.group.getTargetThroughput() != null) {
            d = Math.min(d, this.group.getTargetThroughput().intValue());
        }
        this.groupThroughput.set(Double.valueOf(d));
    }

    /**
     * Builds the repeating task that renews the throughput usage cycle.
     *
     * <p>Each round waits {@link #DEFAULT_THROUGHPUT_USAGE_RESET_DURATION}, resolves the request
     * controller unless cancellation was requested, renews its usage cycle and forwards the result
     * to {@link #recordThroughputUsage(double)}. Errors are logged and swallowed so the task keeps
     * repeating until the token reports cancellation.
     *
     * @param linkedCancellationToken token checked before each renewal and before each repetition;
     *     must not be {@code null}
     * @return a flux that emits no items
     */
    public Flux<Void> throughputUsageCycleRenewTask(LinkedCancellationToken linkedCancellationToken) {
        Preconditions.checkNotNull(linkedCancellationToken, "Cancellation token can not be null");
        return Mono.delay(DEFAULT_THROUGHPUT_USAGE_RESET_DURATION, CosmosSchedulers.COSMOS_PARALLEL)
            .flatMap(tick -> linkedCancellationToken.isCancellationRequested() ? Mono.empty() : resolveRequestController())
            .doOnSuccess(requestController -> {
                // doOnSuccess also fires with null when the upstream completed empty (cancelled).
                if (requestController != null) {
                    recordThroughputUsage(requestController.renewThroughputUsageCycle(getClientAllocatedThroughput()));
                }
            })
            .onErrorResume(error -> {
                logger.warn("Reset throughput usage failed with reason ", error);
                return Mono.empty();
            })
            .then()
            .repeat(() -> !linkedCancellationToken.isCancellationRequested());
    }

    /**
     * Creates the request controller matching the connection mode and starts its initialization.
     *
     * @return the mono returned by the new controller's {@code init()}
     * @throws IllegalArgumentException if the connection mode is neither DIRECT nor GATEWAY
     */
    private Mono<IThroughputRequestController> createAndInitializeRequestController() {
        IThroughputRequestController requestController;
        if (this.connectionMode == ConnectionMode.DIRECT) {
            requestController = new PkRangesThroughputRequestController(this.partitionKeyRangeCache, this.targetContainerRid, getClientAllocatedThroughput());
        } else if (this.connectionMode == ConnectionMode.GATEWAY) {
            requestController = new GlobalThroughputRequestController(getClientAllocatedThroughput());
        } else {
            throw new IllegalArgumentException(String.format("Connection mode %s is not supported", this.connectionMode));
        }
        return requestController.init();
    }

    /**
     * @return whether the underlying group is flagged as the default group
     */
    public boolean isDefault() {
        return this.group.isDefault();
    }

    /**
     * Stores a refreshed max container throughput and recomputes the group throughput when the
     * value changed.
     *
     * <p>When the group defines no target throughput threshold the max container throughput is not
     * tracked (the field is {@code null}) and does not take part in the calculation, so the call is
     * a no-op instead of failing with a {@link NullPointerException}.
     *
     * @param i the refreshed max container throughput
     */
    public void onContainerMaxThroughputRefresh(int i) {
        if (this.maxContainerThroughput == null) {
            return;
        }
        if (this.maxContainerThroughput.getAndSet(i) != i) {
            calculateGroupThroughput();
        }
    }

    /**
     * Routes the request through the resolved request controller. If that controller cannot handle
     * the request, falls back to {@link #updateControllerAndRetry}.
     *
     * @param rxDocumentServiceRequest the request being processed
     * @param mono the original request mono
     * @return the mono produced by the request controller, or by the fallback path
     */
    @Override // com.azure.cosmos.implementation.throughputControl.controller.IThroughputController
    public <T> Mono<T> processRequest(RxDocumentServiceRequest rxDocumentServiceRequest, Mono<T> mono) {
        return resolveRequestController().flatMap(requestController -> {
            if (requestController.canHandleRequest(rxDocumentServiceRequest)) {
                return requestController.processRequest(rxDocumentServiceRequest, mono).doOnError(this::handleException);
            }
            return updateControllerAndRetry(rxDocumentServiceRequest, mono);
        });
    }

    /**
     * Fallback used when the current request controller cannot handle the request: if the
     * partition key range cache knows the request's range, the request controller is refreshed and
     * the request is retried once against the re-resolved controller. In every other case the
     * original mono is returned as is.
     */
    private <T> Mono<T> updateControllerAndRetry(RxDocumentServiceRequest rxDocumentServiceRequest, Mono<T> mono) {
        return shouldUpdateRequestController(rxDocumentServiceRequest).flatMap(shouldUpdate -> {
            if (!shouldUpdate.booleanValue()) {
                return mono;
            }
            refreshRequestController();
            return resolveRequestController().flatMap(requestController -> {
                if (requestController.canHandleRequest(rxDocumentServiceRequest)) {
                    return requestController.processRequest(rxDocumentServiceRequest, mono).doOnError(this::handleException);
                }
                logger.warn("Can not find request controller to handle request {} with pkRangeId {}", rxDocumentServiceRequest.getActivityId(), getResolvedPartitionKeyRangeId(rxDocumentServiceRequest));
                return mono;
            });
        });
    }

    /**
     * @return the id of the request's resolved partition key range, or an empty string when the
     *     request context or the resolved range is missing
     */
    private String getResolvedPartitionKeyRangeId(RxDocumentServiceRequest rxDocumentServiceRequest) {
        if (rxDocumentServiceRequest.requestContext == null || rxDocumentServiceRequest.requestContext.resolvedPartitionKeyRange == null) {
            return "";
        }
        return rxDocumentServiceRequest.requestContext.resolvedPartitionKeyRange.getId();
    }

    /**
     * Decides whether the request controller should be refreshed for this request.
     *
     * @return a mono emitting {@code true} when the partition key range cache returns a non-null
     *     range for the request's resolved partition key range id; {@code false} when it does not,
     *     or when the request carries no request context / resolved partition key range
     */
    private Mono<Boolean> shouldUpdateRequestController(RxDocumentServiceRequest rxDocumentServiceRequest) {
        // Same null handling as getResolvedPartitionKeyRangeId: without a resolved range there is
        // nothing to look up, so do not trigger a refresh.
        if (rxDocumentServiceRequest.requestContext == null || rxDocumentServiceRequest.requestContext.resolvedPartitionKeyRange == null) {
            return Mono.just(Boolean.FALSE);
        }
        return this.partitionKeyRangeCache
            .tryGetRangeByPartitionKeyRangeId(
                null,
                rxDocumentServiceRequest.requestContext.resolvedCollectionRid,
                rxDocumentServiceRequest.requestContext.resolvedPartitionKeyRange.getId(),
                null)
            // The holder's value is a reference; it must be checked against null, not 0.
            .map(valueHolder -> valueHolder.v != null);
    }

    /**
     * Returns the request controller cached under the group name, creating and initializing it on
     * demand. Any error is wrapped in a {@link ThroughputControlInitializationException}.
     *
     * <p>Decompiler note: access modifier was changed from {@code protected}.
     */
    public Mono<IThroughputRequestController> resolveRequestController() {
        return this.requestControllerAsyncCache
            .getAsync(this.group.getGroupName(), null, this::createAndInitializeRequestController)
            .onErrorResume(error -> Mono.error(new ThroughputControlInitializationException(error)));
    }

    /** Asks the cache to refresh the entry for this group with a newly created request controller. */
    private void refreshRequestController() {
        this.requestControllerAsyncCache.refresh(this.group.getGroupName(), this::createAndInitializeRequestController);
    }

    /**
     * Refreshes the request controller when the failure is a partition split or a
     * partition-completing-splitting error.
     *
     * @param th the error raised while processing a request; must not be {@code null}
     */
    private void handleException(Throwable th) {
        Preconditions.checkNotNull(th, "Throwable can not be null");
        // NOTE(review): the Utils.as result is passed on unchecked; presumably it is null for
        // non-Cosmos errors and the helper checks below tolerate that — confirm.
        CosmosException cosmosException = (CosmosException) Utils.as(Exceptions.unwrap(th), CosmosException.class);
        if (com.azure.cosmos.implementation.Exceptions.isPartitionSplit(cosmosException) || com.azure.cosmos.implementation.Exceptions.isPartitionCompletingSplittingException(cosmosException)) {
            refreshRequestController();
        }
    }

    /**
     * @return {@code true} when this is the default group or when the request's throughput control
     *     group name equals this group's name
     */
    @Override // com.azure.cosmos.implementation.throughputControl.controller.IThroughputController
    public boolean canHandleRequest(RxDocumentServiceRequest rxDocumentServiceRequest) {
        return isDefault() || StringUtils.equals(this.group.getGroupName(), rxDocumentServiceRequest.getThroughputControlGroupName());
    }
}
