/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.throughput;

import java.time.Duration;
import java.util.OptionalInt;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.throughput.BufferSizeEMA;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Recomputes the buffer size for a single input gate from the currently observed throughput and
 * the number of buffers in use, and decides whether the new size differs enough from the last
 * announced one to be worth announcing.
 *
 * <p>The raw size is produced by {@link BufferSizeEMA}; this class only derives the desired total
 * amount of in-flight data, tracks the last announced size and filters out insignificant changes.
 *
 * <p>NOTE(review): the class keeps mutable state without any synchronization; presumably it is
 * only used from a single thread - confirm against the callers.
 */
public class BufferDebloater {
    private static final Logger LOG = LoggerFactory.getLogger(BufferDebloater.class);
    private static final long MILLIS_IN_SECOND = 1000L;

    private final String owningTaskName;
    private final int gateIndex;
    /** Target time for consuming all in-flight data; combined with throughput as milliseconds. */
    private final long targetTotalTime;
    private final int maxBufferSize;
    private final int minBufferSize;
    /** Relative change (fraction of the last size) below which a new size is not announced. */
    private final double bufferDebloatThresholdFactor;
    private final BufferSizeEMA bufferSizeEMA;

    private Duration lastEstimatedTimeToConsumeBuffers = Duration.ZERO;
    private int lastBufferSize;

    /**
     * Creates a debloater whose last announced buffer size starts at {@code maxBufferSize}.
     *
     * @param owningTaskName name of the owning task, used only in log messages
     * @param gateIndex index of the gate, used only in log messages
     * @param targetTotalTime target time for consuming the in-flight data, in milliseconds
     * @param maxBufferSize upper bound of the buffer size; also the initial last buffer size
     * @param minBufferSize lower bound of the buffer size
     * @param bufferDebloatThresholdPercentages minimal relative change, in percent of the last
     *     announced size, that a new size must reach to be announced
     * @param numberOfSamples passed through to {@link BufferSizeEMA}
     */
    public BufferDebloater(
            String owningTaskName,
            int gateIndex,
            long targetTotalTime,
            int maxBufferSize,
            int minBufferSize,
            int bufferDebloatThresholdPercentages,
            long numberOfSamples) {
        this.owningTaskName = owningTaskName;
        this.gateIndex = gateIndex;
        this.targetTotalTime = targetTotalTime;
        this.maxBufferSize = maxBufferSize;
        this.minBufferSize = minBufferSize;
        this.bufferDebloatThresholdFactor = bufferDebloatThresholdPercentages / 100.0;
        this.lastBufferSize = maxBufferSize;
        this.bufferSizeEMA = new BufferSizeEMA(maxBufferSize, minBufferSize, numberOfSamples);

        LOG.debug(
                "{}: Buffer debloater init settings: gateIndex={}, targetTotalTime={}, maxBufferSize={}, minBufferSize={}, bufferDebloatThresholdPercentages={}, numberOfSamples={}",
                owningTaskName,
                gateIndex,
                targetTotalTime,
                maxBufferSize,
                minBufferSize,
                bufferDebloatThresholdPercentages,
                numberOfSamples);
    }

    /**
     * Computes a new buffer size for the given throughput and buffer usage and updates the
     * estimated time needed to consume the buffers.
     *
     * @param currentThroughput observed throughput; the formulas scale it by 1000 ms, i.e. it is
     *     treated as an amount per second
     * @param buffersInUse number of buffers currently in use; values below 1 are treated as 1
     * @return the new buffer size if it should be announced, or an empty optional if the change
     *     is skipped (see {@link #skipUpdate(int)})
     */
    public OptionalInt recalculateBufferSize(long currentThroughput, int buffersInUse) {
        // Never divide the desired total by zero (or a negative number of) buffers.
        int actualBuffersInUse = Math.max(1, buffersInUse);
        long desiredTotalBufferSizeInBytes =
                currentThroughput * targetTotalTime / MILLIS_IN_SECOND;

        int newSize =
                bufferSizeEMA.calculateBufferSize(desiredTotalBufferSizeInBytes, actualBuffersInUse);

        // Widen to long BEFORE multiplying: newSize * actualBuffersInUse can exceed
        // Integer.MAX_VALUE (e.g. 32 KiB buffers with 65536 buffers in use), and an int product
        // would silently wrap around and yield a wrong or negative estimate.
        long totalInFlightBytes = (long) newSize * actualBuffersInUse;
        // A throughput of zero or less is treated as 1 to avoid division by zero.
        lastEstimatedTimeToConsumeBuffers =
                Duration.ofMillis(
                        totalInFlightBytes * MILLIS_IN_SECOND / Math.max(1L, currentThroughput));

        boolean skipUpdate = skipUpdate(newSize);

        LOG.debug(
                "{}: Buffer size recalculation: gateIndex={}, currentSize={}, newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, estimatedTimeToConsumeBuffers={}, announceNewSize={}",
                owningTaskName,
                gateIndex,
                lastBufferSize,
                newSize,
                currentThroughput,
                desiredTotalBufferSizeInBytes,
                buffersInUse,
                lastEstimatedTimeToConsumeBuffers,
                !skipUpdate);

        if (skipUpdate) {
            return OptionalInt.empty();
        }

        lastBufferSize = newSize;
        return OptionalInt.of(newSize);
    }

    /**
     * Tells whether announcing {@code newSize} should be skipped.
     *
     * <p>An unchanged size is always skipped. A changed size that is at or beyond the configured
     * minimum or maximum is never skipped. Any other size is skipped when its absolute difference
     * to the last announced size is smaller than the threshold fraction of that last size
     * (truncated to an int).
     *
     * @param newSize the freshly calculated buffer size
     * @return {@code true} if the new size should not be announced
     */
    @VisibleForTesting
    boolean skipUpdate(int newSize) {
        if (newSize == lastBufferSize) {
            return true;
        }

        // Sizes at or beyond the bounds are always announced once they differ from the last size,
        // regardless of how small the change is.
        if (newSize <= minBufferSize || newSize >= maxBufferSize) {
            return false;
        }

        int delta = (int) (lastBufferSize * bufferDebloatThresholdFactor);
        return Math.abs(newSize - lastBufferSize) < delta;
    }

    /** Returns the buffer size that was announced last (initially the maximum buffer size). */
    public int getLastBufferSize() {
        return lastBufferSize;
    }

    /** Returns the estimate computed by the most recent recalculation (initially zero). */
    public Duration getLastEstimatedTimeToConsumeBuffers() {
        return lastEstimatedTimeToConsumeBuffers;
    }
}

