/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.buffer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalBufferPool
implements BufferPool {
    private static final Logger LOG = LoggerFactory.getLogger(LocalBufferPool.class);
    private static final int UNKNOWN_CHANNEL = -1;
    private final NetworkBufferPool networkBufferPool;
    private final int numberOfRequiredMemorySegments;
    private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque();
    private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque();
    private final int maxNumberOfMemorySegments;
    @GuardedBy(value="availableMemorySegments")
    private int currentPoolSize;
    @GuardedBy(value="availableMemorySegments")
    private int numberOfRequestedMemorySegments;
    private final int maxBuffersPerChannel;
    @GuardedBy(value="availableMemorySegments")
    private final int[] subpartitionBuffersCount;
    private final BufferRecycler[] subpartitionBufferRecyclers;
    @GuardedBy(value="availableMemorySegments")
    private int unavailableSubpartitionsCount = 0;
    private int maxOverdraftBuffersPerGate;
    @GuardedBy(value="availableMemorySegments")
    private boolean isDestroyed;
    @GuardedBy(value="availableMemorySegments")
    private final AvailabilityProvider.AvailabilityHelper availabilityHelper = new AvailabilityProvider.AvailabilityHelper();
    @GuardedBy(value="availableMemorySegments")
    private boolean requestingNotificationOfGlobalPoolAvailable;

    LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
        this(networkBufferPool, numberOfRequiredMemorySegments, Integer.MAX_VALUE, 0, Integer.MAX_VALUE, 0);
    }

    LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments, int maxNumberOfMemorySegments) {
        this(networkBufferPool, numberOfRequiredMemorySegments, maxNumberOfMemorySegments, 0, Integer.MAX_VALUE, 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments, int maxNumberOfMemorySegments, int numberOfSubpartitions, int maxBuffersPerChannel, int maxOverdraftBuffersPerGate) {
        Preconditions.checkArgument(numberOfRequiredMemorySegments > 0, "Required number of memory segments (%s) should be larger than 0.", numberOfRequiredMemorySegments);
        Preconditions.checkArgument(maxNumberOfMemorySegments >= numberOfRequiredMemorySegments, "Maximum number of memory segments (%s) should not be smaller than minimum (%s).", maxNumberOfMemorySegments, numberOfRequiredMemorySegments);
        LOG.debug("Using a local buffer pool with {}-{} buffers", (Object)numberOfRequiredMemorySegments, (Object)maxNumberOfMemorySegments);
        this.networkBufferPool = networkBufferPool;
        this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
        this.currentPoolSize = numberOfRequiredMemorySegments;
        this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
        if (numberOfSubpartitions > 0) {
            Preconditions.checkArgument(maxBuffersPerChannel > 0, "Maximum number of buffers for each channel (%s) should be larger than 0.", maxBuffersPerChannel);
            Preconditions.checkArgument(maxOverdraftBuffersPerGate >= 0, "Maximum number of overdraft buffers for each gate (%s) should not be less than 0.", maxOverdraftBuffersPerGate);
        }
        this.subpartitionBuffersCount = new int[numberOfSubpartitions];
        this.subpartitionBufferRecyclers = new BufferRecycler[numberOfSubpartitions];
        for (int i = 0; i < this.subpartitionBufferRecyclers.length; ++i) {
            this.subpartitionBufferRecyclers[i] = new SubpartitionBufferRecycler(i, this);
        }
        this.maxBuffersPerChannel = maxBuffersPerChannel;
        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            this.checkAndUpdateAvailability();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reserveSegments(int numberOfSegmentsToReserve) throws IOException {
        Preconditions.checkArgument(numberOfSegmentsToReserve <= this.numberOfRequiredMemorySegments, "Can not reserve more segments than number of required segments.");
        CompletableFuture<?> toNotify = null;
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            this.checkDestroyed();
            if (this.numberOfRequestedMemorySegments < numberOfSegmentsToReserve) {
                this.availableMemorySegments.addAll(this.networkBufferPool.requestPooledMemorySegmentsBlocking(numberOfSegmentsToReserve - this.numberOfRequestedMemorySegments));
                toNotify = this.availabilityHelper.getUnavailableToResetAvailable();
            }
        }
        this.mayNotifyAvailable(toNotify);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isDestroyed() {
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            return this.isDestroyed;
        }
    }

    @Override
    public int getNumberOfRequiredMemorySegments() {
        return this.numberOfRequiredMemorySegments;
    }

    @Override
    public int getMaxNumberOfMemorySegments() {
        return this.maxNumberOfMemorySegments;
    }

    public int getEstimatedNumberOfRequestedMemorySegments() {
        if (this.maxNumberOfMemorySegments < Integer.MAX_VALUE) {
            return this.maxNumberOfMemorySegments;
        }
        return this.getNumberOfRequiredMemorySegments() * 2;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public int getNumberOfRequestedMemorySegments() {
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            return this.numberOfRequestedMemorySegments;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getNumberOfAvailableMemorySegments() {
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            return this.availableMemorySegments.size();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getNumBuffers() {
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            return this.currentPoolSize;
        }
    }

    @Override
    public int bestEffortGetNumOfUsedBuffers() {
        return Math.max(0, this.numberOfRequestedMemorySegments - this.availableMemorySegments.size());
    }

    @Override
    public Buffer requestBuffer() {
        return this.toBuffer(this.requestMemorySegment());
    }

    @Override
    public BufferBuilder requestBufferBuilder() {
        return this.toBufferBuilder(this.requestMemorySegment(-1), -1);
    }

    @Override
    public BufferBuilder requestBufferBuilder(int targetChannel) {
        return this.toBufferBuilder(this.requestMemorySegment(targetChannel), targetChannel);
    }

    @Override
    public BufferBuilder requestBufferBuilderBlocking() throws InterruptedException {
        return this.toBufferBuilder(this.requestMemorySegmentBlocking(), -1);
    }

    @Override
    public MemorySegment requestMemorySegmentBlocking() throws InterruptedException {
        return this.requestMemorySegmentBlocking(-1);
    }

    @Override
    public BufferBuilder requestBufferBuilderBlocking(int targetChannel) throws InterruptedException {
        return this.toBufferBuilder(this.requestMemorySegmentBlocking(targetChannel), targetChannel);
    }

    private Buffer toBuffer(MemorySegment memorySegment) {
        if (memorySegment == null) {
            return null;
        }
        return new NetworkBuffer(memorySegment, this);
    }

    private BufferBuilder toBufferBuilder(MemorySegment memorySegment, int targetChannel) {
        if (memorySegment == null) {
            return null;
        }
        if (targetChannel == -1) {
            return new BufferBuilder(memorySegment, this);
        }
        return new BufferBuilder(memorySegment, this.subpartitionBufferRecyclers[targetChannel]);
    }

    private MemorySegment requestMemorySegmentBlocking(int targetChannel) throws InterruptedException {
        MemorySegment segment;
        while ((segment = this.requestMemorySegment(targetChannel)) == null) {
            try {
                this.getAvailableFuture().get();
            }
            catch (ExecutionException e) {
                LOG.error("The available future is completed exceptionally.", (Throwable)e);
                ExceptionUtils.rethrow(e);
            }
        }
        return segment;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private MemorySegment requestMemorySegment(int targetChannel) {
        MemorySegment segment = null;
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            this.checkDestroyed();
            if (!this.availableMemorySegments.isEmpty()) {
                segment = this.availableMemorySegments.poll();
            } else if (this.isRequestedSizeReached()) {
                segment = this.requestOverdraftMemorySegmentFromGlobal();
            }
            if (segment == null) {
                return null;
            }
            if (targetChannel != -1) {
                int n = targetChannel;
                this.subpartitionBuffersCount[n] = this.subpartitionBuffersCount[n] + 1;
                if (this.subpartitionBuffersCount[n] == this.maxBuffersPerChannel) {
                    ++this.unavailableSubpartitionsCount;
                }
            }
            this.checkAndUpdateAvailability();
        }
        return segment;
    }

    @GuardedBy(value="availableMemorySegments")
    private void checkDestroyed() {
        if (this.isDestroyed) {
            throw new CancelTaskException("Buffer pool has already been destroyed.");
        }
    }

    @Override
    public MemorySegment requestMemorySegment() {
        return this.requestMemorySegment(-1);
    }

    @GuardedBy(value="availableMemorySegments")
    private boolean requestMemorySegmentFromGlobal() {
        assert (Thread.holdsLock(this.availableMemorySegments));
        if (this.isRequestedSizeReached()) {
            return false;
        }
        MemorySegment segment = this.requestPooledMemorySegment();
        if (segment != null) {
            this.availableMemorySegments.add(segment);
            return true;
        }
        return false;
    }

    @GuardedBy(value="availableMemorySegments")
    private MemorySegment requestOverdraftMemorySegmentFromGlobal() {
        assert (Thread.holdsLock(this.availableMemorySegments));
        if (this.numberOfRequestedMemorySegments - this.currentPoolSize >= this.maxOverdraftBuffersPerGate) {
            return null;
        }
        return this.requestPooledMemorySegment();
    }

    @Nullable
    @GuardedBy(value="availableMemorySegments")
    private MemorySegment requestPooledMemorySegment() {
        Preconditions.checkState(!this.isDestroyed, "Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");
        MemorySegment segment = this.networkBufferPool.requestPooledMemorySegment();
        if (segment != null) {
            ++this.numberOfRequestedMemorySegments;
        }
        return segment;
    }

    @GuardedBy(value="availableMemorySegments")
    private void requestMemorySegmentFromGlobalWhenAvailable() {
        assert (Thread.holdsLock(this.availableMemorySegments));
        Preconditions.checkState(!this.requestingNotificationOfGlobalPoolAvailable, "local buffer pool is already in the state of requesting memory segment from global when it is available.");
        this.requestingNotificationOfGlobalPoolAvailable = true;
        FutureUtils.assertNoException(this.networkBufferPool.getAvailableFuture().thenRun(this::onGlobalPoolAvailable));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onGlobalPoolAvailable() {
        CompletableFuture<?> toNotify;
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            this.requestingNotificationOfGlobalPoolAvailable = false;
            if (this.isDestroyed || this.availabilityHelper.isApproximatelyAvailable()) {
                return;
            }
            toNotify = this.checkAndUpdateAvailability();
        }
        this.mayNotifyAvailable(toNotify);
    }

    @GuardedBy(value="availableMemorySegments")
    private boolean shouldBeAvailable() {
        assert (Thread.holdsLock(this.availableMemorySegments));
        return !this.availableMemorySegments.isEmpty() && this.unavailableSubpartitionsCount == 0;
    }

    @GuardedBy(value="availableMemorySegments")
    private CompletableFuture<?> checkAndUpdateAvailability() {
        assert (Thread.holdsLock(this.availableMemorySegments));
        CompletableFuture<?> toNotify = null;
        AvailabilityStatus availabilityStatus = this.checkAvailability();
        if (availabilityStatus.isAvailable()) {
            toNotify = this.availabilityHelper.getUnavailableToResetAvailable();
        } else {
            this.availabilityHelper.resetUnavailable();
        }
        if (availabilityStatus.isNeedRequestingNotificationOfGlobalPoolAvailable()) {
            this.requestMemorySegmentFromGlobalWhenAvailable();
        }
        this.checkConsistentAvailability();
        return toNotify;
    }

    @GuardedBy(value="availableMemorySegments")
    private AvailabilityStatus checkAvailability() {
        assert (Thread.holdsLock(this.availableMemorySegments));
        if (!this.availableMemorySegments.isEmpty()) {
            return AvailabilityStatus.from(this.shouldBeAvailable(), false);
        }
        if (this.isRequestedSizeReached()) {
            return AvailabilityStatus.UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION;
        }
        boolean needRequestingNotificationOfGlobalPoolAvailable = false;
        if (!this.requestMemorySegmentFromGlobal()) {
            needRequestingNotificationOfGlobalPoolAvailable = !this.requestingNotificationOfGlobalPoolAvailable;
        }
        return AvailabilityStatus.from(this.shouldBeAvailable(), needRequestingNotificationOfGlobalPoolAvailable);
    }

    @GuardedBy(value="availableMemorySegments")
    private void checkConsistentAvailability() {
        assert (Thread.holdsLock(this.availableMemorySegments));
        boolean shouldBeAvailable = this.shouldBeAvailable();
        Preconditions.checkState(this.availabilityHelper.isApproximatelyAvailable() == shouldBeAvailable, "Inconsistent availability: expected " + shouldBeAvailable);
    }

    @Override
    public void recycle(MemorySegment segment) {
        this.recycle(segment, -1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void recycle(MemorySegment segment, int channel) {
        BufferListener listener;
        CompletableFuture<?> toNotify = null;
        do {
            ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
            synchronized (arrayDeque) {
                if (channel != -1) {
                    int n = channel;
                    int n2 = this.subpartitionBuffersCount[n];
                    this.subpartitionBuffersCount[n] = n2 - 1;
                    if (n2 == this.maxBuffersPerChannel) {
                        --this.unavailableSubpartitionsCount;
                    }
                }
                if (this.isDestroyed || this.hasExcessBuffers()) {
                    this.returnMemorySegment(segment);
                    return;
                }
                listener = this.registeredListeners.poll();
                if (listener == null) {
                    this.availableMemorySegments.add(segment);
                    if (!this.availabilityHelper.isApproximatelyAvailable() && this.shouldBeAvailable()) {
                        toNotify = this.availabilityHelper.getUnavailableToResetAvailable();
                    }
                    break;
                }
                this.checkConsistentAvailability();
            }
        } while (!this.fireBufferAvailableNotification(listener, segment));
        this.mayNotifyAvailable(toNotify);
    }

    private boolean fireBufferAvailableNotification(BufferListener listener, MemorySegment segment) {
        return listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void lazyDestroy() {
        CompletableFuture<?> toNotify = null;
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            if (!this.isDestroyed) {
                BufferListener listener;
                MemorySegment segment;
                while ((segment = this.availableMemorySegments.poll()) != null) {
                    this.returnMemorySegment(segment);
                }
                while ((listener = this.registeredListeners.poll()) != null) {
                    listener.notifyBufferDestroyed();
                }
                if (!this.isAvailable()) {
                    toNotify = this.availabilityHelper.getAvailableFuture();
                }
                this.isDestroyed = true;
            }
        }
        this.mayNotifyAvailable(toNotify);
        this.networkBufferPool.destroyBufferPool(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean addBufferListener(BufferListener listener) {
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            if (!this.availableMemorySegments.isEmpty() || this.isDestroyed) {
                return false;
            }
            this.registeredListeners.add(listener);
            return true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setNumBuffers(int numBuffers) {
        CompletableFuture<?> toNotify;
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            Preconditions.checkArgument(numBuffers >= this.numberOfRequiredMemorySegments, "Buffer pool needs at least %s buffers, but tried to set to %s", this.numberOfRequiredMemorySegments, numBuffers);
            this.currentPoolSize = Math.min(numBuffers, this.maxNumberOfMemorySegments);
            this.returnExcessMemorySegments();
            if (this.isDestroyed) {
                return;
            }
            toNotify = this.checkAndUpdateAvailability();
        }
        this.mayNotifyAvailable(toNotify);
    }

    @Override
    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
    }

    @Override
    public int getMaxOverdraftBuffersPerGate() {
        return this.maxOverdraftBuffersPerGate;
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return this.availabilityHelper.getAvailableFuture();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String toString() {
        ArrayDeque<MemorySegment> arrayDeque = this.availableMemorySegments;
        synchronized (arrayDeque) {
            return String.format("[size: %d, required: %d, requested: %d, available: %d, max: %d, listeners: %d,subpartitions: %d, maxBuffersPerChannel: %d, destroyed: %s]", this.currentPoolSize, this.numberOfRequiredMemorySegments, this.numberOfRequestedMemorySegments, this.availableMemorySegments.size(), this.maxNumberOfMemorySegments, this.registeredListeners.size(), this.subpartitionBuffersCount.length, this.maxBuffersPerChannel, this.isDestroyed);
        }
    }

    private void mayNotifyAvailable(@Nullable CompletableFuture<?> toNotify) {
        if (toNotify != null) {
            toNotify.complete(null);
        }
    }

    @GuardedBy(value="availableMemorySegments")
    private void returnMemorySegment(MemorySegment segment) {
        assert (Thread.holdsLock(this.availableMemorySegments));
        --this.numberOfRequestedMemorySegments;
        this.networkBufferPool.recyclePooledMemorySegment(segment);
    }

    @GuardedBy(value="availableMemorySegments")
    private void returnExcessMemorySegments() {
        assert (Thread.holdsLock(this.availableMemorySegments));
        while (this.hasExcessBuffers()) {
            MemorySegment segment = this.availableMemorySegments.poll();
            if (segment == null) {
                return;
            }
            this.returnMemorySegment(segment);
        }
    }

    @GuardedBy(value="availableMemorySegments")
    private boolean hasExcessBuffers() {
        return this.numberOfRequestedMemorySegments > this.currentPoolSize;
    }

    @GuardedBy(value="availableMemorySegments")
    private boolean isRequestedSizeReached() {
        return this.numberOfRequestedMemorySegments >= this.currentPoolSize;
    }

    private static enum AvailabilityStatus {
        AVAILABLE(true, false),
        UNAVAILABLE_NEED_REQUESTING_NOTIFICATION(false, true),
        UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION(false, false);

        private final boolean available;
        private final boolean needRequestingNotificationOfGlobalPoolAvailable;

        private AvailabilityStatus(boolean available, boolean needRequestingNotificationOfGlobalPoolAvailable) {
            this.available = available;
            this.needRequestingNotificationOfGlobalPoolAvailable = needRequestingNotificationOfGlobalPoolAvailable;
        }

        public boolean isAvailable() {
            return this.available;
        }

        public boolean isNeedRequestingNotificationOfGlobalPoolAvailable() {
            return this.needRequestingNotificationOfGlobalPoolAvailable;
        }

        public static AvailabilityStatus from(boolean available, boolean needRequestingNotificationOfGlobalPoolAvailable) {
            if (available) {
                Preconditions.checkState(!needRequestingNotificationOfGlobalPoolAvailable, "available local buffer pool should not request from global.");
                return AVAILABLE;
            }
            if (needRequestingNotificationOfGlobalPoolAvailable) {
                return UNAVAILABLE_NEED_REQUESTING_NOTIFICATION;
            }
            return UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION;
        }
    }

    private static class SubpartitionBufferRecycler
    implements BufferRecycler {
        private final int channel;
        private final LocalBufferPool bufferPool;

        SubpartitionBufferRecycler(int channel, LocalBufferPool bufferPool) {
            this.channel = channel;
            this.bufferPool = bufferPool;
        }

        @Override
        public void recycle(MemorySegment memorySegment) {
            this.bufferPool.recycle(memorySegment, this.channel);
        }
    }
}

