package com.azure.cosmos.implementation.throughputControl.controller.group.global;

import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.guava25.collect.EvictingQueue;
import com.azure.cosmos.implementation.throughputControl.LinkedCancellationToken;
import com.azure.cosmos.implementation.throughputControl.config.GlobalThroughputControlGroup;
import com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerBase;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/cosmos/implementation/throughputControl/controller/group/global/GlobalThroughputControlGroupController.class */
public class GlobalThroughputControlGroupController extends ThroughputGroupControllerBase {
    private static final Logger logger = LoggerFactory.getLogger(GlobalThroughputControlGroupController.class);
    private static final double INITIAL_CLIENT_THROUGHPUT_RU_SHARE = 1.0d;
    private static final double INITIAL_THROUGHPUT_USAGE = 1.0d;
    private static final int DEFAULT_THROUGHPUT_USAGE_QUEUE_SIZE = 300;
    private static final double MIN_LOAD_FACTOR = 0.1d;
    private final Duration controlItemRenewInterval;
    private final ThroughputControlContainerManager containerManager;
    private final EvictingQueue<ThroughputUsageSnapshot> throughputUsageSnapshotQueue;
    private final Object throughputUsageSnapshotQueueLock;
    private AtomicReference<Double> clientThroughputShare;

    public GlobalThroughputControlGroupController(ConnectionMode connectionMode, GlobalThroughputControlGroup globalThroughputControlGroup, Integer num, RxPartitionKeyRangeCache rxPartitionKeyRangeCache, String str, LinkedCancellationToken linkedCancellationToken) {
        super(connectionMode, globalThroughputControlGroup, num, rxPartitionKeyRangeCache, str, linkedCancellationToken);
        this.controlItemRenewInterval = globalThroughputControlGroup.getControlItemRenewInterval();
        this.containerManager = new ThroughputControlContainerManager(globalThroughputControlGroup);
        this.throughputUsageSnapshotQueue = EvictingQueue.create(DEFAULT_THROUGHPUT_USAGE_QUEUE_SIZE);
        this.throughputUsageSnapshotQueue.add(new ThroughputUsageSnapshot(1.0d));
        this.throughputUsageSnapshotQueueLock = new Object();
        this.clientThroughputShare = new AtomicReference<>(Double.valueOf(1.0d));
    }

    @Override // com.azure.cosmos.implementation.throughputControl.controller.IThroughputController
    public <T> Mono<T> init() {
        return this.containerManager.validateControlContainer().flatMap(throughputControlContainerManager -> {
            return this.containerManager.getOrCreateConfigItem();
        }).flatMap(globalThroughputControlConfigItem -> {
            double calculateLoadFactor = calculateLoadFactor();
            return calculateClientThroughputShare(calculateLoadFactor).flatMap(globalThroughputControlGroupController -> {
                return this.containerManager.createGroupClientItem(calculateLoadFactor, getClientAllocatedThroughput());
            });
        }).flatMap(globalThroughputControlClientItem -> {
            return resolveRequestController();
        }).doOnSuccess(iThroughputRequestController -> {
            throughputUsageCycleRenewTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe();
            calculateClientThroughputShareTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe();
        }).thenReturn(this);
    }

    @Override // com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerBase
    public double getClientAllocatedThroughput() {
        return this.groupThroughput.get().doubleValue() * this.clientThroughputShare.get().doubleValue();
    }

    @Override // com.azure.cosmos.implementation.throughputControl.controller.group.ThroughputGroupControllerBase
    public void recordThroughputUsage(double d) {
        synchronized (this.throughputUsageSnapshotQueueLock) {
            this.throughputUsageSnapshotQueue.add(new ThroughputUsageSnapshot(d));
        }
    }

    private Mono<GlobalThroughputControlGroupController> calculateClientThroughputShare(double d) {
        return this.containerManager.queryLoadFactorsOfAllClients(d).doOnSuccess(d2 -> {
            this.clientThroughputShare.set(Double.valueOf(d / d2.doubleValue()));
        }).thenReturn(this);
    }

    private double calculateLoadFactor() {
        double max;
        synchronized (this.throughputUsageSnapshotQueueLock) {
            Instant time = this.throughputUsageSnapshotQueue.peek().getTime();
            double d = 0.0d;
            Iterator<ThroughputUsageSnapshot> it = this.throughputUsageSnapshotQueue.iterator();
            while (it.hasNext()) {
                d += it.next().calculateWeight(time);
            }
            double d2 = 0.0d;
            Iterator<ThroughputUsageSnapshot> it2 = this.throughputUsageSnapshotQueue.iterator();
            while (it2.hasNext()) {
                ThroughputUsageSnapshot next = it2.next();
                d2 += (next.getWeight() / d) * next.getThroughputUsage();
            }
            max = Math.max(MIN_LOAD_FACTOR, d2);
        }
        return max;
    }

    private Flux<Void> calculateClientThroughputShareTask(LinkedCancellationToken linkedCancellationToken) {
        return Mono.delay(this.controlItemRenewInterval).flatMap(l -> {
            if (linkedCancellationToken.isCancellationRequested()) {
                return Mono.empty();
            }
            double calculateLoadFactor = calculateLoadFactor();
            return calculateClientThroughputShare(calculateLoadFactor).flatMap(globalThroughputControlGroupController -> {
                return this.containerManager.replaceOrCreateGroupClientItem(calculateLoadFactor, getClientAllocatedThroughput());
            });
        }).onErrorResume(th -> {
            logger.warn("Calculate throughput task failed ", th);
            return Mono.empty();
        }).then().repeat(() -> {
            return !linkedCancellationToken.isCancellationRequested();
        });
    }
}
