/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriterWithCallback;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.IORequest;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
import org.apache.flink.runtime.io.disk.iomanager.RequestQueue;
import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AsynchronousFileIOChannelTest {
    private static final Logger LOG = LoggerFactory.getLogger(AsynchronousFileIOChannelTest.class);
    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension(Executors::newCachedThreadPool);

    AsynchronousFileIOChannelTest() {
    }

    @Test
    void testAllRequestsProcessedListenerNotification() throws Exception {
        int numberOfRuns = 10;
        int numberOfRequests = 100;
        Random random = new Random();
        RequestQueue requestQueue = new RequestQueue();
        NoOpCallback<Buffer> ioChannelCallback = new NoOpCallback<Buffer>();
        TestNotificationListener listener = new TestNotificationListener();
        try (IOManagerAsync ioManager = new IOManagerAsync();){
            for (int run = 0; run < 10; ++run) {
                TestAsyncFileIOChannel ioChannel = new TestAsyncFileIOChannel(ioManager.createChannel(), (RequestQueue<WriteRequest>)requestQueue, ioChannelCallback, true);
                CountDownLatch sync = new CountDownLatch(3);
                Buffer buffer = BufferBuilderTestUtils.buildSomeBuffer();
                NoOpWriteRequest request = new NoOpWriteRequest();
                Callable<Void> addRequestsTask = () -> {
                    for (int i = 0; i < 10; ++i) {
                        LOG.debug("Starting run {}.", (Object)(i + 1));
                        for (int j = 0; j < 100; ++j) {
                            ioChannel.addRequest((IORequest)request);
                        }
                        LOG.debug("Added all ({}) requests of run {}.", (Object)100, (Object)(i + 1));
                        int sleep = random.nextInt(10);
                        LOG.debug("Sleeping for {} ms before next run.", (Object)sleep);
                        Thread.sleep(sleep);
                    }
                    LOG.debug("Done. Closing channel.");
                    ioChannel.close();
                    sync.countDown();
                    return null;
                };
                Callable<Void> processRequestsTask = () -> {
                    int total = 1000;
                    for (int i = 0; i < total; ++i) {
                        requestQueue.take();
                        ioChannel.handleProcessedBuffer(buffer, null);
                    }
                    LOG.debug("Processed all ({}) requests.", (Object)100);
                    sync.countDown();
                    return null;
                };
                Callable<Void> registerListenerTask = () -> {
                    while (true) {
                        int current = listener.getNumberOfNotifications();
                        if (ioChannel.registerAllRequestsProcessedListener(listener)) {
                            listener.waitForNotification(current);
                            continue;
                        }
                        if (ioChannel.isClosed()) break;
                    }
                    LOG.debug("Stopping listener. Channel closed.");
                    sync.countDown();
                    return null;
                };
                LinkedList<Callable<Void>> tasks = new LinkedList<Callable<Void>>();
                tasks.add(addRequestsTask);
                tasks.add(processRequestsTask);
                tasks.add(registerListenerTask);
                Collections.shuffle(tasks);
                for (Callable callable : tasks) {
                    EXECUTOR_EXTENSION.getExecutor().submit(callable);
                }
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)sync.await(2L, TimeUnit.MINUTES)).withFailMessage("Test failed due to a timeout. This indicates a deadlock due to the waythat listeners are registered/notified in the asynchronous file I/Ochannel.", new Object[0])).isTrue();
                listener.reset();
            }
        }
    }

    @Test
    void testClosedButAddRequestAndRegisterListenerRace() throws Exception {
        int numberOfRuns = 1024;
        RequestQueue requestQueue = new RequestQueue();
        NoOpCallback<Buffer> ioChannelCallback = new NoOpCallback<Buffer>();
        TestNotificationListener listener = new TestNotificationListener();
        try (IOManagerAsync ioManager = new IOManagerAsync();){
            for (int i = 0; i < 1024; ++i) {
                TestAsyncFileIOChannel ioChannel = new TestAsyncFileIOChannel(ioManager.createChannel(), (RequestQueue<WriteRequest>)requestQueue, ioChannelCallback, true);
                CountDownLatch sync = new CountDownLatch(2);
                NoOpWriteRequest request = new NoOpWriteRequest();
                ioChannel.close();
                Callable<Void> addRequestTask = () -> {
                    try {
                        ioChannel.addRequest((IORequest)request);
                    }
                    catch (Throwable throwable) {
                    }
                    finally {
                        sync.countDown();
                    }
                    return null;
                };
                Callable<Void> registerListenerTask = () -> {
                    try {
                        while (true) {
                            int current = listener.getNumberOfNotifications();
                            if (ioChannel.registerAllRequestsProcessedListener(listener)) {
                                listener.waitForNotification(current);
                                continue;
                            }
                            if (ioChannel.isClosed()) break;
                        }
                    }
                    finally {
                        sync.countDown();
                    }
                    return null;
                };
                ExecutorService executor = EXECUTOR_EXTENSION.getExecutor();
                executor.submit(addRequestTask);
                executor.submit(registerListenerTask);
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)sync.await(2L, TimeUnit.MINUTES)).withFailMessage("Test failed due to a timeout. This indicates a deadlock due to the waythat listeners are registered/notified in the asynchronous file I/Ochannel.", new Object[0])).isTrue();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testClosingWaits() throws Exception {
        try (IOManagerAsync ioMan = new IOManagerAsync();){
            int NUM_BLOCKS = 100;
            final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment((int)32768);
            final AtomicInteger callbackCounter = new AtomicInteger();
            final AtomicBoolean exceptionOccurred = new AtomicBoolean();
            RequestDoneCallback<MemorySegment> callback = new RequestDoneCallback<MemorySegment>(){

                public void requestSuccessful(MemorySegment buffer) {
                    callbackCounter.set(callbackCounter.get() + 1);
                    if (buffer != seg) {
                        exceptionOccurred.set(true);
                    }
                }

                public void requestFailed(MemorySegment buffer, IOException e) {
                    exceptionOccurred.set(true);
                }
            };
            BlockChannelWriterWithCallback writer = ioMan.createBlockChannelWriter(ioMan.createChannel(), (RequestDoneCallback)callback);
            try {
                for (int i = 0; i < 100; ++i) {
                    writer.writeBlock((Object)seg);
                }
                writer.close();
                Assertions.assertThat((AtomicInteger)callbackCounter).hasValue(100);
                Assertions.assertThat((AtomicBoolean)exceptionOccurred).isFalse();
            }
            finally {
                writer.closeAndDelete();
            }
        }
    }

    @Test
    void testExceptionForwardsToClose() throws Exception {
        try (IOManagerAsync ioMan = new IOManagerAsync();){
            this.testExceptionForwardsToClose(ioMan, 100, 1);
            this.testExceptionForwardsToClose(ioMan, 100, 50);
            this.testExceptionForwardsToClose(ioMan, 100, 100);
        }
    }

    private void testExceptionForwardsToClose(IOManagerAsync ioMan, int numBlocks, final int failingBlock) throws IOException {
        MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment((int)32768);
        FileIOChannel.ID channelId = ioMan.createChannel();
        AsynchronousBlockWriterWithCallback writer = new AsynchronousBlockWriterWithCallback(channelId, ioMan.getWriteRequestQueue(channelId), new NoOpCallback()){
            private int numBlocks;

            public void writeBlock(MemorySegment segment) throws IOException {
                ++this.numBlocks;
                if (this.numBlocks == failingBlock) {
                    this.requestsNotReturned.incrementAndGet();
                    this.requestQueue.add((Object)new FailingWriteRequest((AsynchronousFileIOChannel<MemorySegment, WriteRequest>)this, segment));
                } else {
                    super.writeBlock(segment);
                }
            }
        };
        Assertions.assertThatThrownBy(() -> AsynchronousFileIOChannelTest.lambda$testExceptionForwardsToClose$5(numBlocks, (BlockChannelWriterWithCallback)writer, seg)).isInstanceOf(IOException.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static /* synthetic */ void lambda$testExceptionForwardsToClose$5(int numBlocks, BlockChannelWriterWithCallback writer, MemorySegment seg) throws Throwable {
        try {
            for (int i = 0; i < numBlocks; ++i) {
                writer.writeBlock((Object)seg);
            }
            writer.close();
        }
        finally {
            writer.closeAndDelete();
        }
    }

    private static class TestAsyncFileIOChannel
    extends AsynchronousFileIOChannel<Buffer, WriteRequest> {
        protected TestAsyncFileIOChannel(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue, RequestDoneCallback<Buffer> callback, boolean writeEnabled) throws IOException {
            super(channelID, requestQueue, callback, writeEnabled);
        }

        int getNumberOfOutstandingRequests() {
            return this.requestsNotReturned.get();
        }
    }

    private static class FailingWriteRequest
    implements WriteRequest {
        private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
        private final MemorySegment segment;

        protected FailingWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) {
            this.channel = targetChannel;
            this.segment = segment;
        }

        public void write() throws IOException {
            throw new IOException();
        }

        public void requestDone(IOException ioex) {
            this.channel.handleProcessedBuffer((Object)this.segment, ioex);
        }
    }

    private static class NoOpWriteRequest
    implements WriteRequest {
        private NoOpWriteRequest() {
        }

        public void requestDone(IOException ioex) {
        }

        public void write() {
        }
    }

    private static class NoOpCallback<T>
    implements RequestDoneCallback<T> {
        private NoOpCallback() {
        }

        public void requestSuccessful(T buffer) {
        }

        public void requestFailed(T buffer, IOException e) {
        }
    }
}

