/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileReader;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler;
import org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
    /** Intended size, in bytes, of a single buffer in the test fixture. */
    private static final int bufferSize = 1024;
    /**
     * Shared payload. {@code before()} refills it with random bytes and hands it to
     * {@code PartitionTestUtils.createPartitionedFile}; {@code testCreateSubpartitionReader}
     * compares every buffer it reads back against this array.
     */
    private static final byte[] dataBytes = new byte[1024];
    /** Intended total capacity, in bytes, of the read buffer pool (presumably; confirm against the pool constructor). */
    private static final int totalBytes = 1024;
    /** Intended number of threads in the executor backing the scheduler. */
    private static final int numThreads = 4;
    /** Intended number of subpartitions in the generated partitioned file. */
    private static final int numSubpartitions = 10;
    /** Intended number of buffers written per subpartition. */
    private static final int numBuffersPerSubpartition = 10;
    /** Partitioned file generated in {@code before()} and deleted in {@code after()}. */
    private PartitionedFile partitionedFile;
    /** Reader over {@link #partitionedFile}, built in {@code before()} from the two channels below. */
    private PartitionedFileReader fileReader;
    /** Read-only channel on the data file of {@link #partitionedFile}; closed in {@code after()}. */
    private FileChannel dataFileChannel;
    /** Read-only channel on the index file of {@link #partitionedFile}; closed in {@code after()}. */
    private FileChannel indexFileChannel;
    /** Buffer pool handed to the scheduler under test; destroyed in {@code after()}. */
    private BatchShuffleReadBufferPool bufferPool;
    /** Fixed-size thread pool handed to the scheduler under test; shut down in {@code after()}. */
    private ExecutorService executor;
    /** The scheduler under test, recreated for every test in {@code before()}. */
    private SortMergeResultPartitionReadScheduler readScheduler;
    /** Supplies the temporary file on which the partitioned file is based. */
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** Applies a 60-second timeout to every test in this class. */
    @Rule
    public Timeout timeout = new Timeout(60L, TimeUnit.SECONDS);

    /**
     * Builds a fresh fixture for each test: random payload bytes, a partitioned file written from
     * that payload, read-only channels on its data and index files, a file reader for subpartition
     * index 0, a read buffer pool, a fixed thread pool and the scheduler under test.
     *
     * <p>The sizing arguments come from the class constants rather than repeated literals, so
     * changing a constant actually changes the fixture.
     *
     * @throws Exception if the partitioned file or any of the channels cannot be created
     */
    @Before
    public void before() throws Exception {
        // New random content for every test run.
        new Random().nextBytes(dataBytes);

        // NOTE(review): argument order assumed to be (numSubpartitions, numBuffersPerSubpartition,
        // bufferSize). Both counts are 10, so the values passed are unchanged either way.
        partitionedFile =
                PartitionTestUtils.createPartitionedFile(
                        temporaryFolder.newFile().getAbsolutePath(),
                        numSubpartitions,
                        numBuffersPerSubpartition,
                        bufferSize,
                        dataBytes);
        dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
        indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
        fileReader =
                new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);

        bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
        executor = Executors.newFixedThreadPool(numThreads);
        // The test instance itself is passed as the scheduler's fourth constructor argument.
        readScheduler =
                new SortMergeResultPartitionReadScheduler(
                        numSubpartitions, bufferPool, executor, this);
    }

    /**
     * Tears down the fixture created by {@code before()}.
     *
     * <p>Each step runs in its own {@code finally} so a failure while closing one resource cannot
     * stop the later ones from being released; in particular the executor is always shut down.
     * Every field is null-checked because JUnit 4 still runs {@code @After} methods when
     * {@code @Before} failed part-way, and a {@link NullPointerException} here would only add
     * noise to the real failure.
     *
     * @throws Exception if closing a channel or destroying the pool fails
     */
    @After
    public void after() throws Exception {
        try {
            if (dataFileChannel != null) {
                dataFileChannel.close();
            }
        } finally {
            try {
                if (indexFileChannel != null) {
                    indexFileChannel.close();
                }
            } finally {
                try {
                    if (partitionedFile != null) {
                        partitionedFile.deleteQuietly();
                    }
                    if (bufferPool != null) {
                        bufferPool.destroy();
                    }
                } finally {
                    if (executor != null) {
                        executor.shutdown();
                    }
                }
            }
        }
    }

    /**
     * Creates a reader for subpartition 0 through the scheduler, checks that the scheduler reports
     * itself as running with both file channels open, and then polls the reader until ten buffers
     * have arrived. Each buffer must contain exactly the bytes in {@code dataBytes} and is
     * recycled after the comparison.
     */
    @Test
    public void testCreateSubpartitionReader() throws Exception {
        SortMergeSubpartitionReader reader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        Assert.assertTrue(readScheduler.isRunning());
        Assert.assertTrue(readScheduler.getDataFileChannel().isOpen());
        Assert.assertTrue(readScheduler.getIndexFileChannel().isOpen());

        int received = 0;
        do {
            // A null result means nothing is available yet; simply poll again.
            ResultSubpartition.BufferAndBacklog next = reader.getNextBuffer();
            if (next != null) {
                Buffer data = next.buffer();
                Assert.assertEquals(ByteBuffer.wrap(dataBytes), data.getNioBufferReadable());
                data.recycleBuffer();
                received++;
            }
        } while (received < 10);
    }

    /**
     * Releases a freshly created subpartition reader, waits until the scheduler stops running and
     * then verifies that the scheduler has given up all of its resources.
     */
    @Test
    public void testOnSubpartitionReaderError() throws Exception {
        SortMergeSubpartitionReader reader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        reader.releaseAllResources();

        waitUntilReadFinish();
        assertAllResourcesReleased();
    }

    /**
     * Releases the scheduler one second after a reader was created and checks the reader's state:
     * it must carry a failure cause, report itself as released, hold no queued buffers and still
     * report availability. After the scheduler's release future completes, all scheduler
     * resources must be gone.
     */
    @Test
    public void testReleaseWhileReading() throws Exception {
        SortMergeSubpartitionReader reader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        // Presumably gives the scheduler time to start reading before it is released.
        TimeUnit.SECONDS.sleep(1L);
        readScheduler.release();

        Assert.assertNotNull(reader.getFailureCause());
        Assert.assertTrue(reader.isReleased());
        Assert.assertEquals(0L, (long) reader.unsynchronizedGetNumberOfQueuedBuffers());
        Assert.assertTrue(reader.getAvailabilityAndBacklog(0).isAvailable());

        readScheduler.getReleaseFuture().get();
        assertAllResourcesReleased();
    }

    /**
     * Expects an {@link IllegalStateException} when a reader is requested from a scheduler that
     * has already been released. The resource check runs in {@code finally} so it is evaluated
     * even though the creation call is expected to throw.
     */
    @Test(expected = IllegalStateException.class)
    public void testCreateSubpartitionReaderAfterReleased() throws Exception {
        bufferPool.initialize();
        readScheduler.release();

        try {
            readScheduler.createSubpartitionReader(
                    new NoOpBufferAvailablityListener(), 0, partitionedFile);
        } finally {
            assertAllResourcesReleased();
        }
    }

    /**
     * Closes the scheduler's data file channel right after a reader was created, then drains and
     * recycles buffers until the reader reports itself as released. Afterwards the reader must
     * carry a failure cause and report availability, and the scheduler must have released all
     * resources.
     */
    @Test
    public void testOnDataReadError() throws Exception {
        SortMergeSubpartitionReader reader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        readScheduler.getDataFileChannel().close();

        while (!reader.isReleased()) {
            ResultSubpartition.BufferAndBacklog drained = reader.getNextBuffer();
            if (drained != null) {
                drained.buffer().recycleBuffer();
            }
        }

        waitUntilReadFinish();
        Assert.assertNotNull(reader.getFailureCause());
        Assert.assertTrue(reader.getAvailabilityAndBacklog(0).isAvailable());
        assertAllResourcesReleased();
    }

    /**
     * Destroys the buffer pool right after a reader was created and waits for the scheduler to
     * stop. The reader must then be released with a failure cause while still reporting
     * availability, and the scheduler must have released all resources.
     */
    @Test
    public void testOnReadBufferRequestError() throws Exception {
        SortMergeSubpartitionReader reader =
                readScheduler.createSubpartitionReader(
                        new NoOpBufferAvailablityListener(), 0, partitionedFile);

        bufferPool.destroy();
        waitUntilReadFinish();

        Assert.assertTrue(reader.isReleased());
        Assert.assertNotNull(reader.getFailureCause());
        Assert.assertTrue(reader.getAvailabilityAndBacklog(0).isAvailable());
        assertAllResourcesReleased();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs read, release, read on a background thread while the test thread holds the monitor of
     * this test instance (the object {@code before()} passes to the scheduler as its fourth
     * constructor argument). The test thread polls every 100 ms until the reader reports itself
     * as released and then joins the worker; the test fails via the timeout if that never
     * happens.
     */
    @Test(timeout = 60000L)
    public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
        SortMergeSubpartitionReader reader =
                new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);

        Runnable readThenRelease =
                () -> {
                    ArrayDeque<MemorySegment> segments = new ArrayDeque<>();
                    segments.add(MemorySegmentFactory.allocateUnpooledSegment(1024));
                    try {
                        Assert.assertTrue(fileReader.hasRemaining());
                        reader.readBuffers(segments, readScheduler);
                        reader.releaseAllResources();
                        reader.readBuffers(segments, readScheduler);
                    } catch (Exception ignored) {
                        // Deliberately ignored: this test only checks that the main thread
                        // below is not blocked forever, not the outcome of the reads.
                    }
                };
        Thread worker = new Thread(readThenRelease);

        synchronized (this) {
            worker.start();
            do {
                Thread.sleep(100L);
            } while (!reader.isReleased());
        }
        worker.join();
    }

    /**
     * Verifies that a buffer request fails with a timeout when the pool cannot serve it.
     *
     * <p>The test first takes buffers out of the shared pool, then asks a scheduler configured
     * with a 3-second request timeout to allocate buffers. The allocation must return no buffers,
     * must take longer than the timeout, and must leave a "Buffer request timeout" failure on the
     * reader.
     *
     * <p>The borrowed buffers and the locally created scheduler are released in {@code finally},
     * so a failing assertion does not leak them. The scheduler is a local with its own name so it
     * no longer shadows the {@code readScheduler} field.
     */
    @Test
    public void testRequestBufferTimeoutAndFailed() throws Exception {
        Duration bufferRequestTimeout = Duration.ofSeconds(3L);

        // Presumably drains the pool built in before(), so the scheduler's request below
        // cannot be satisfied.
        List<MemorySegment> borrowedBuffers = bufferPool.requestBuffers();
        SortMergeResultPartitionReadScheduler timeoutScheduler =
                new SortMergeResultPartitionReadScheduler(
                        numSubpartitions, bufferPool, executor, this, bufferRequestTimeout);
        try {
            SortMergeSubpartitionReader subpartitionReader =
                    timeoutScheduler.createSubpartitionReader(
                            new NoOpBufferAvailablityListener(), 0, partitionedFile);
            PriorityQueue<SortMergeSubpartitionReader> allReaders = new PriorityQueue<>();
            allReaders.add(subpartitionReader);

            long startTimestamp = System.nanoTime();
            Queue<MemorySegment> allocatedBuffers = timeoutScheduler.allocateBuffers(allReaders);
            long requestDuration = System.nanoTime() - startTimestamp;

            Assert.assertEquals(0, allocatedBuffers.size());
            Assert.assertTrue(requestDuration > bufferRequestTimeout.toNanos());
            assertExpectedTimeoutException(subpartitionReader.getFailureCause());
        } finally {
            try {
                bufferPool.recycle(borrowedBuffers);
            } finally {
                timeoutScheduler.release();
            }
        }
    }

    /**
     * Verifies that a buffer request succeeds when the pool keeps reporting buffer activity.
     *
     * <p>The fake pool hands one of its withheld buffers back every time its last-operation
     * timestamp is queried. With a 3-second request timeout the allocation must return three
     * buffers, take longer than twice the timeout, and leave no failure on the reader.
     *
     * <p>The fake pool and the locally created scheduler are torn down in {@code finally}, so a
     * failing assertion does not leak them. Both are locals with their own names so they no
     * longer shadow the {@code bufferPool} and {@code readScheduler} fields.
     */
    @Test
    public void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
        Duration bufferRequestTimeout = Duration.ofSeconds(3L);
        FakeBatchShuffleReadBufferPool fakeBufferPool =
                new FakeBatchShuffleReadBufferPool(3L * bufferSize, bufferSize);
        SortMergeResultPartitionReadScheduler refreshScheduler =
                new SortMergeResultPartitionReadScheduler(
                        numSubpartitions, fakeBufferPool, executor, this, bufferRequestTimeout);
        try {
            SortMergeSubpartitionReader subpartitionReader =
                    new SortMergeSubpartitionReader(
                            new NoOpBufferAvailablityListener(), fileReader);
            PriorityQueue<SortMergeSubpartitionReader> allReaders = new PriorityQueue<>();
            allReaders.add(subpartitionReader);

            long startTimestamp = System.nanoTime();
            Queue<MemorySegment> allocatedBuffers = refreshScheduler.allocateBuffers(allReaders);
            long requestDuration = System.nanoTime() - startTimestamp;

            Assert.assertEquals(3, allocatedBuffers.size());
            Assert.assertTrue(requestDuration > bufferRequestTimeout.toNanos() * 2L);
            Assert.assertNull(subpartitionReader.getFailureCause());

            fakeBufferPool.recycle(allocatedBuffers);
        } finally {
            try {
                fakeBufferPool.destroy();
            } finally {
                refreshScheduler.release();
            }
        }
    }

    /**
     * Opens a read-only channel on the given file.
     *
     * @param path file to open
     * @return a channel opened with {@link StandardOpenOption#READ}
     * @throws IOException if the file cannot be opened
     */
    private static FileChannel openFileChannel(Path path) throws IOException {
        FileChannel readOnlyChannel = FileChannel.open(path, StandardOpenOption.READ);
        return readOnlyChannel;
    }

    /**
     * Asserts that the given throwable is non-null and that
     * {@code ExceptionUtils.findThrowableWithMessage} finds the message
     * "Buffer request timeout" for it.
     *
     * @param throwable failure cause to inspect
     */
    private static void assertExpectedTimeoutException(Throwable throwable) {
        Assert.assertNotNull(throwable);

        boolean timeoutReported =
                ExceptionUtils.findThrowableWithMessage(throwable, "Buffer request timeout")
                        .isPresent();
        Assert.assertTrue(timeoutReported);
    }

    /**
     * Asserts that the scheduler under test holds no resources: both file channel getters return
     * null, it is not running and it has no pending readers. Unless the buffer pool has been
     * destroyed, every buffer of the pool must also be available again.
     */
    private void assertAllResourcesReleased() {
        Assert.assertNull(readScheduler.getDataFileChannel());
        Assert.assertNull(readScheduler.getIndexFileChannel());
        Assert.assertFalse(readScheduler.isRunning());
        Assert.assertEquals(0L, (long) readScheduler.getNumPendingReaders());

        if (bufferPool.isDestroyed()) {
            // Buffer counts are not checked on a destroyed pool.
            return;
        }
        Assert.assertEquals(
                (long) bufferPool.getNumTotalBuffers(), (long) bufferPool.getAvailableBuffers());
    }

    /**
     * Blocks until the scheduler under test no longer reports itself as running, checking every
     * 100 ms. There is no internal deadline; the class-level timeout rule bounds the wait.
     */
    private void waitUntilReadFinish() throws Exception {
        for (;;) {
            if (!readScheduler.isRunning()) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    private static class FakeBatchShuffleReadBufferPool
    extends BatchShuffleReadBufferPool {
        private final Queue<MemorySegment> requestedBuffers = new LinkedList<MemorySegment>(this.requestBuffers());

        FakeBatchShuffleReadBufferPool(long totalBytes, int bufferSize) throws Exception {
            super(totalBytes, bufferSize);
        }

        public long getLastBufferOperationTimestamp() {
            this.recycle(this.requestedBuffers.poll());
            return super.getLastBufferOperationTimestamp();
        }

        public void destroy() {
            this.recycle(this.requestedBuffers);
            this.requestedBuffers.clear();
            super.destroy();
        }
    }
}

