/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.buffer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class LocalBufferPoolTest
extends TestLogger {
    private static final int numBuffers = 1024;
    private static final int memorySegmentSize = 128;
    private NetworkBufferPool networkBufferPool;
    private BufferPool localBufferPool;
    private static final ExecutorService executor = Executors.newCachedThreadPool();
    @Rule
    public Timeout timeout = new Timeout(10L, TimeUnit.SECONDS);

    @Before
    public void setupLocalBufferPool() throws Exception {
        this.networkBufferPool = new NetworkBufferPool(1024, 128);
        this.localBufferPool = new LocalBufferPool(this.networkBufferPool, 1);
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
    }

    @After
    public void destroyAndVerifyAllBuffersReturned() {
        if (!this.localBufferPool.isDestroyed()) {
            this.localBufferPool.lazyDestroy();
        }
        String msg = "Did not return all buffers to memory segment pool after test.";
        Assert.assertEquals((String)msg, (long)1024L, (long)this.networkBufferPool.getNumberOfAvailableMemorySegments());
        this.networkBufferPool.destroyAllBufferPools();
        this.networkBufferPool.destroy();
    }

    @AfterClass
    public static void shutdownExecutor() {
        executor.shutdownNow();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReserveSegments() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(2, 128, Duration.ofSeconds(2L));
        try {
            BufferPool bufferPool1 = networkBufferPool.createBufferPool(1, 2);
            Assert.assertThrows(IllegalArgumentException.class, () -> bufferPool1.reserveSegments(2));
            ArrayList<Buffer> buffers = new ArrayList<Buffer>(2);
            buffers.add(bufferPool1.requestBuffer());
            buffers.add(bufferPool1.requestBuffer());
            Assert.assertEquals((long)2L, (long)buffers.size());
            BufferPool bufferPool2 = networkBufferPool.createBufferPool(1, 10);
            Assert.assertThrows(IOException.class, () -> bufferPool2.reserveSegments(1));
            Assert.assertFalse((boolean)bufferPool2.isAvailable());
            buffers.forEach(Buffer::recycleBuffer);
            bufferPool1.lazyDestroy();
            bufferPool2.lazyDestroy();
            BufferPool bufferPool3 = networkBufferPool.createBufferPool(2, 10);
            Assert.assertEquals((long)1L, (long)bufferPool3.getNumberOfAvailableMemorySegments());
            bufferPool3.reserveSegments(2);
            Assert.assertEquals((long)2L, (long)bufferPool3.getNumberOfAvailableMemorySegments());
            bufferPool3.lazyDestroy();
            Assert.assertThrows(IllegalStateException.class, () -> bufferPool3.reserveSegments(1));
        }
        finally {
            networkBufferPool.destroy();
        }
    }

    @Test
    public void testRequestMoreThanAvailable() {
        this.localBufferPool.setNumBuffers(1024);
        ArrayList<Buffer> requests = new ArrayList<Buffer>(1024);
        for (int i = 1; i <= 1024; ++i) {
            Buffer buffer2 = this.localBufferPool.requestBuffer();
            Assert.assertEquals((long)Math.min(i + 1, 1024), (long)this.getNumRequestedFromMemorySegmentPool());
            Assert.assertNotNull((Object)buffer2);
            requests.add(buffer2);
        }
        Buffer buffer = this.localBufferPool.requestBuffer();
        Assert.assertEquals((long)1024L, (long)this.getNumRequestedFromMemorySegmentPool());
        Assert.assertNull((Object)buffer);
        for (Buffer buffer2 : requests) {
            buffer2.recycleBuffer();
        }
    }

    @Test
    public void testRequestAfterDestroy() {
        this.localBufferPool.lazyDestroy();
        try {
            this.localBufferPool.requestBuffer();
            Assert.fail((String)"Call should have failed with an IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testSetNumAfterDestroyDoesNotProactivelyFetchSegments() {
        this.localBufferPool.setNumBuffers(2);
        Assert.assertEquals((long)2L, (long)this.localBufferPool.getNumBuffers());
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        this.localBufferPool.lazyDestroy();
        this.localBufferPool.setNumBuffers(3);
        Assert.assertEquals((long)3L, (long)this.localBufferPool.getNumBuffers());
        Assert.assertEquals((long)0L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
    }

    @Test
    public void testRecycleAfterDestroy() {
        this.localBufferPool.setNumBuffers(1024);
        ArrayList<Buffer> requests = new ArrayList<Buffer>(1024);
        for (int i = 0; i < 1024; ++i) {
            requests.add(this.localBufferPool.requestBuffer());
        }
        this.localBufferPool.lazyDestroy();
        Assert.assertEquals((long)1024L, (long)this.getNumRequestedFromMemorySegmentPool());
        for (Buffer buffer : requests) {
            buffer.recycleBuffer();
        }
    }

    @Test
    public void testRecycleExcessBuffersAfterRecycling() {
        int i;
        this.localBufferPool.setNumBuffers(1024);
        ArrayList<Buffer> requests = new ArrayList<Buffer>(1024);
        for (i = 1; i <= 1024; ++i) {
            requests.add(this.localBufferPool.requestBuffer());
        }
        Assert.assertEquals((long)1024L, (long)this.getNumRequestedFromMemorySegmentPool());
        this.localBufferPool.setNumBuffers(512);
        Assert.assertEquals((long)1024L, (long)this.getNumRequestedFromMemorySegmentPool());
        for (i = 1; i < 512; ++i) {
            ((Buffer)requests.remove(0)).recycleBuffer();
            Assert.assertEquals((long)(1024 - i), (long)this.getNumRequestedFromMemorySegmentPool());
        }
        for (Buffer buffer : requests) {
            buffer.recycleBuffer();
        }
    }

    @Test
    public void testRecycleExcessBuffersAfterChangingNumBuffers() {
        this.localBufferPool.setNumBuffers(1024);
        ArrayList<Buffer> requests = new ArrayList<Buffer>(1024);
        for (int i = 1; i <= 1024; ++i) {
            requests.add(this.localBufferPool.requestBuffer());
        }
        for (Buffer buffer : requests) {
            buffer.recycleBuffer();
        }
        Assert.assertEquals((long)1024L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        this.localBufferPool.setNumBuffers(512);
        Assert.assertEquals((long)512L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
    }

    @Test(expected=IllegalArgumentException.class)
    public void testSetLessThanRequiredNumBuffers() {
        this.localBufferPool.setNumBuffers(1);
        this.localBufferPool.setNumBuffers(0);
    }

    @Test
    public void testPendingRequestWithListenersAfterRecycle() {
        CountBufferListener listener1 = new CountBufferListener();
        CountBufferListener listener2 = new CountBufferListener();
        Buffer available = this.localBufferPool.requestBuffer();
        Assert.assertNull((Object)this.localBufferPool.requestBuffer());
        Assert.assertTrue((boolean)this.localBufferPool.addBufferListener((BufferListener)listener1));
        Assert.assertTrue((boolean)this.localBufferPool.addBufferListener((BufferListener)listener2));
        ((Buffer)Preconditions.checkNotNull((Object)available)).recycleBuffer();
        Assert.assertEquals((long)1L, (long)listener1.getCount());
        Assert.assertEquals((long)1L, (long)listener1.getCount());
        Assert.assertFalse((boolean)this.localBufferPool.addBufferListener((BufferListener)listener1));
        Assert.assertFalse((boolean)this.localBufferPool.addBufferListener((BufferListener)listener2));
    }

    @Test
    public void testCancelPendingRequestsAfterDestroy() {
        BufferListener listener = (BufferListener)Mockito.mock(BufferListener.class);
        this.localBufferPool.setNumBuffers(1);
        Buffer available = this.localBufferPool.requestBuffer();
        Buffer unavailable = this.localBufferPool.requestBuffer();
        Assert.assertNull((Object)unavailable);
        this.localBufferPool.addBufferListener(listener);
        this.localBufferPool.lazyDestroy();
        available.recycleBuffer();
        ((BufferListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).notifyBufferDestroyed();
    }

    @Test
    public void testConcurrentRequestRecycle() throws ExecutionException, InterruptedException {
        int i;
        int numConcurrentTasks = 128;
        int numBuffersToRequestPerTask = 1024;
        this.localBufferPool.setNumBuffers(numConcurrentTasks);
        Future[] taskResults = new Future[numConcurrentTasks];
        for (i = 0; i < numConcurrentTasks; ++i) {
            taskResults[i] = executor.submit(new BufferRequesterTask((BufferProvider)this.localBufferPool, numBuffersToRequestPerTask));
        }
        for (i = 0; i < numConcurrentTasks; ++i) {
            Assert.assertTrue((boolean)((Boolean)taskResults[i].get()));
        }
    }

    @Test
    public void testDestroyDuringBlockingRequest() throws Exception {
        boolean numberOfBuffers = true;
        this.localBufferPool.setNumBuffers(1);
        final CountDownLatch sync = new CountDownLatch(1);
        Callable<List<Buffer>> requester = new Callable<List<Buffer>>(){

            @Override
            public List<Buffer> call() throws Exception {
                ArrayList requested = Lists.newArrayList();
                for (int i = 0; i < 1; ++i) {
                    Buffer buffer = (Buffer)Preconditions.checkNotNull((Object)LocalBufferPoolTest.this.localBufferPool.requestBuffer());
                    requested.add(buffer);
                }
                sync.countDown();
                try {
                    LocalBufferPoolTest.this.localBufferPool.requestBufferBuilderBlocking();
                    Assert.fail((String)"Call should have failed with an IllegalStateException");
                }
                catch (IllegalStateException illegalStateException) {
                    // empty catch block
                }
                return requested;
            }
        };
        Future<List<Buffer>> f = executor.submit(requester);
        sync.await();
        this.localBufferPool.lazyDestroy();
        Thread.sleep(50L);
        List<Buffer> requestedBuffers = f.get(60L, TimeUnit.SECONDS);
        for (Buffer buffer : requestedBuffers) {
            buffer.recycleBuffer();
        }
    }

    @Test
    public void testBoundedBuffer() throws Exception {
        this.localBufferPool.lazyDestroy();
        this.localBufferPool = new LocalBufferPool(this.networkBufferPool, 1, 2);
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        Assert.assertEquals((long)2L, (long)this.localBufferPool.getMaxNumberOfMemorySegments());
        this.localBufferPool.setNumBuffers(1);
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        Buffer buffer1 = this.localBufferPool.requestBuffer();
        Assert.assertNotNull((Object)buffer1);
        Assert.assertEquals((long)0L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        Assert.assertNull((Object)this.localBufferPool.requestBuffer());
        Assert.assertEquals((long)0L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        buffer1.recycleBuffer();
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        this.localBufferPool.setNumBuffers(2);
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        buffer1 = this.localBufferPool.requestBuffer();
        Assert.assertNotNull((Object)buffer1);
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        Buffer buffer2 = this.localBufferPool.requestBuffer();
        Assert.assertNotNull((Object)buffer2);
        Assert.assertEquals((long)0L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        Assert.assertNull((Object)this.localBufferPool.requestBuffer());
        Assert.assertEquals((long)0L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        buffer1.recycleBuffer();
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        buffer2.recycleBuffer();
        Assert.assertEquals((long)2L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        this.localBufferPool.setNumBuffers(3);
        Assert.assertEquals((long)2L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        buffer1 = this.localBufferPool.requestBuffer();
        Assert.assertNotNull((Object)buffer1);
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        buffer2 = this.localBufferPool.requestBuffer();
        Assert.assertNotNull((Object)buffer2);
        Assert.assertEquals((long)0L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        Assert.assertNull((Object)this.localBufferPool.requestBuffer());
        Assert.assertEquals((long)0L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        buffer1.recycleBuffer();
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        buffer2.recycleBuffer();
        Assert.assertEquals((long)2L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        this.localBufferPool.setNumBuffers(1);
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        buffer1 = this.localBufferPool.requestBuffer();
        Assert.assertNotNull((Object)buffer1);
        Assert.assertEquals((long)0L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
        Assert.assertNull((Object)this.localBufferPool.requestBuffer());
        buffer1.recycleBuffer();
        Assert.assertEquals((long)1L, (long)this.localBufferPool.getNumberOfAvailableMemorySegments());
    }

    @Test
    public void testMaxBuffersPerChannelAndAvailability() throws Exception {
        this.localBufferPool.lazyDestroy();
        this.localBufferPool = new LocalBufferPool(this.networkBufferPool, 1, Integer.MAX_VALUE, 3, 2);
        this.localBufferPool.setNumBuffers(10);
        Assert.assertTrue((boolean)this.localBufferPool.getAvailableFuture().isDone());
        BufferBuilder bufferBuilder01 = this.localBufferPool.requestBufferBuilderBlocking(0);
        BufferBuilder bufferBuilder11 = this.localBufferPool.requestBufferBuilderBlocking(1);
        Assert.assertTrue((boolean)this.localBufferPool.getAvailableFuture().isDone());
        BufferBuilder bufferBuilder02 = this.localBufferPool.requestBufferBuilderBlocking(0);
        Assert.assertFalse((boolean)this.localBufferPool.getAvailableFuture().isDone());
        Assert.assertNull((Object)this.localBufferPool.requestBufferBuilder(0));
        BufferBuilder bufferBuilder21 = this.localBufferPool.requestBufferBuilderBlocking(2);
        BufferBuilder bufferBuilder22 = this.localBufferPool.requestBufferBuilderBlocking(2);
        Assert.assertFalse((boolean)this.localBufferPool.getAvailableFuture().isDone());
        bufferBuilder11.close();
        Assert.assertFalse((boolean)this.localBufferPool.getAvailableFuture().isDone());
        bufferBuilder21.close();
        Assert.assertFalse((boolean)this.localBufferPool.getAvailableFuture().isDone());
        bufferBuilder02.close();
        Assert.assertTrue((boolean)this.localBufferPool.getAvailableFuture().isDone());
        bufferBuilder01.close();
        Assert.assertTrue((boolean)this.localBufferPool.getAvailableFuture().isDone());
        bufferBuilder22.close();
        Assert.assertTrue((boolean)this.localBufferPool.getAvailableFuture().isDone());
    }

    @Test
    public void testIsAvailableOrNot() throws InterruptedException {
        Assert.assertTrue((boolean)this.localBufferPool.isAvailable());
        try (BufferBuilder bufferBuilder = (BufferBuilder)Preconditions.checkNotNull((Object)this.localBufferPool.requestBufferBuilderBlocking());){
            CompletableFuture availableFuture = this.localBufferPool.getAvailableFuture();
            Assert.assertFalse((boolean)availableFuture.isDone());
            int numLocalBuffers = 5;
            this.localBufferPool.setNumBuffers(5);
            Assert.assertTrue((boolean)availableFuture.isDone());
            Assert.assertTrue((boolean)this.localBufferPool.isAvailable());
            ArrayDeque<Object> buffers = new ArrayDeque<Object>(1024);
            for (int i = 0; i < 4; ++i) {
                Assert.assertTrue((boolean)this.localBufferPool.isAvailable());
                buffers.add(Preconditions.checkNotNull((Object)this.localBufferPool.requestBuffer()));
            }
            Assert.assertFalse((boolean)this.localBufferPool.isAvailable());
            ((Buffer)buffers.pop()).recycleBuffer();
            Assert.assertTrue((boolean)this.localBufferPool.isAvailable());
            for (Buffer buffer : buffers) {
                buffer.recycleBuffer();
            }
            Assert.assertTrue((boolean)this.localBufferPool.isAvailable());
            this.localBufferPool.setNumBuffers(2);
            Assert.assertTrue((boolean)this.localBufferPool.isAvailable());
            Buffer buffer2 = (Buffer)Preconditions.checkNotNull((Object)this.localBufferPool.requestBuffer());
            Assert.assertFalse((boolean)this.localBufferPool.isAvailable());
            buffer2.recycleBuffer();
            Assert.assertTrue((boolean)this.localBufferPool.isAvailable());
            this.localBufferPool.setNumBuffers(1);
            Assert.assertFalse((boolean)this.localBufferPool.getAvailableFuture().isDone());
        }
        Assert.assertTrue((boolean)this.localBufferPool.isAvailable());
        Assert.assertTrue((boolean)this.localBufferPool.getAvailableFuture().isDone());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConsistentAvailability() throws Exception {
        TestNetworkBufferPool globalPool = new TestNetworkBufferPool(1024, 128);
        try {
            LocalBufferPool localPool = new LocalBufferPool((NetworkBufferPool)globalPool, 1);
            MemorySegment segment = localPool.requestMemorySegmentBlocking();
            localPool.setNumBuffers(2);
            localPool.recycle(segment);
            localPool.lazyDestroy();
        }
        finally {
            globalPool.destroy();
        }
    }

    private int getNumRequestedFromMemorySegmentPool() {
        return this.networkBufferPool.getTotalNumberOfMemorySegments() - this.networkBufferPool.getNumberOfAvailableMemorySegments();
    }

    private static class TestNetworkBufferPool
    extends NetworkBufferPool {
        private int requestCounter;

        public TestNetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
            super(numberOfSegmentsToAllocate, segmentSize);
        }

        @Nullable
        public MemorySegment requestMemorySegment() {
            if (this.requestCounter++ == 1) {
                return null;
            }
            return super.requestMemorySegment();
        }
    }

    private static class BufferRequesterTask
    implements Callable<Boolean> {
        private final BufferProvider bufferProvider;
        private final int numBuffersToRequest;

        private BufferRequesterTask(BufferProvider bufferProvider, int numBuffersToRequest) {
            this.bufferProvider = bufferProvider;
            this.numBuffersToRequest = numBuffersToRequest;
        }

        @Override
        public Boolean call() throws Exception {
            try {
                for (int i = 0; i < this.numBuffersToRequest; ++i) {
                    Buffer buffer = (Buffer)Preconditions.checkNotNull((Object)this.bufferProvider.requestBuffer());
                    buffer.recycleBuffer();
                }
            }
            catch (Throwable t) {
                return false;
            }
            return true;
        }
    }

    private static class CountBufferListener
    implements BufferListener {
        private final AtomicInteger times = new AtomicInteger(0);

        private CountBufferListener() {
        }

        public boolean notifyBufferAvailable(Buffer buffer) {
            this.times.incrementAndGet();
            buffer.recycleBuffer();
            return true;
        }

        public void notifyBufferDestroyed() {
        }

        int getCount() {
            return this.times.get();
        }
    }
}

