/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingResultPartition;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
class BoundedBlockingSubpartitionWriteReadTest {
    /** Directory handed to the {@link FileChannelManagerImpl} that is created in {@link #setUp()}. */
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    /** Shared by all tests of this class; created in {@link #setUp()}, closed in {@link #shutdown()}. */
    private static FileChannelManager fileChannelManager;
    /** Temporary directory injected through {@code @TempDir}; the partition data file is created in it. */
    @TempDir
    private File tmpFolder;
    /** Buffer size in bytes (0x100000 = 1,048,576); the same value appears as a literal in this class. */
    private static final int BUFFER_SIZE = 0x100000;
    /** Codec name passed to every {@link BufferDecompressor} created by this test. */
    private static final String COMPRESSION_CODEC = "LZ4";
    /**
     * Decompressor used by the tests that read on the calling thread; assigned in the static
     * initializer. Each {@code LongReader} thread creates its own instance instead of using this one.
     */
    private static final BufferDecompressor decompressor;
    /** First test parameter: the subpartition type used to create the subpartition under test. */
    private final BoundedBlockingSubpartitionType type;
    /** Second test parameter: forwarded to partition creation and to the read-side verification. */
    private final boolean compressionEnabled;
    /** Forwarded to subpartition creation; the constructor always sets it to {@link #compressionEnabled}. */
    private final boolean sslEnabled;

    /**
     * Supplies the test parameters: every {@link BoundedBlockingSubpartitionType} value, each
     * paired first with {@code true} and then with {@code false} for the compression flag.
     *
     * @return one {@code {type, compressionEnabled}} array per combination, grouped by type
     */
    @Parameters(name="type = {0}, compressionEnabled = {1}")
    private static Collection<Object[]> parameters() {
        return Arrays.stream(BoundedBlockingSubpartitionType.values())
                .flatMap(subpartitionType -> Arrays.stream(new Object[][] {
                    {subpartitionType, true},
                    {subpartitionType, false}
                }))
                .collect(Collectors.toList());
    }

    /**
     * Stores one parameter combination (presumably supplied by the parameterized test extension
     * from {@code parameters()} — confirm against the extension's contract).
     *
     * @param type subpartition type used to create the subpartition under test
     * @param compressionEnabled whether the partition is created with compression enabled
     */
    BoundedBlockingSubpartitionWriteReadTest(BoundedBlockingSubpartitionType type, boolean compressionEnabled) {
        this.type = type;
        this.compressionEnabled = compressionEnabled;
        // There is no separate SSL test parameter: the SSL flag simply follows the compression flag.
        this.sslEnabled = compressionEnabled;
    }

    /** Creates the {@link FileChannelManager} shared by all tests, backed by {@code tempDir}. */
    @BeforeAll
    static void setUp() {
        final String[] directories = {tempDir};
        fileChannelManager = new FileChannelManagerImpl(directories, "testing");
    }

    /**
     * Closes the {@link FileChannelManager} created in {@link #setUp()}.
     *
     * @throws Exception if closing the file channel manager fails
     */
    @AfterAll
    static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    /**
     * Fills a subpartition with 15,000,000 consecutive longs and verifies that a single read view
     * returns all of them in order, then releases the view and the subpartition.
     */
    @TestTemplate
    void testWriteAndReadData() throws Exception {
        final int numLongs = 15_000_000;

        // setup
        final BoundedBlockingSubpartition filledSubpartition = this.createAndFillPartition(numLongs);

        // read back and verify
        final ResultSubpartitionView view = filledSubpartition.createReadView(ignored -> {});
        final int numBuffers = filledSubpartition.getBuffersInBacklogUnsafe();
        readLongs(view, numLongs, numBuffers, this.compressionEnabled, decompressor);

        // cleanup
        view.releaseAllResources();
        filledSubpartition.release();
    }

    /**
     * Fills a subpartition with 10,000,000 consecutive longs and reads it completely ten times in a
     * row, creating and releasing a fresh read view for every pass.
     */
    @TestTemplate
    void testRead10ConsumersSequential() throws Exception {
        final int numLongs = 10_000_000;
        final int numConsumers = 10;

        final BoundedBlockingSubpartition filledSubpartition = this.createAndFillPartition(numLongs);

        int consumersDone = 0;
        while (consumersDone < numConsumers) {
            final ResultSubpartitionView view = filledSubpartition.createReadView(ignored -> {});
            readLongs(
                    view,
                    numLongs,
                    filledSubpartition.getBuffersInBacklogUnsafe(),
                    this.compressionEnabled,
                    decompressor);
            // Each view is released before the next consumer is created.
            view.releaseAllResources();
            consumersDone++;
        }

        filledSubpartition.release();
    }

    /**
     * Fills a subpartition with 15,000,000 consecutive longs and reads it from ten
     * {@code LongReader} threads at the same time, each through its own read view.
     */
    @TestTemplate
    void testRead10ConsumersConcurrent() throws Exception {
        final int numLongs = 15_000_000;
        final int numConsumers = 10;

        final BoundedBlockingSubpartition filledSubpartition = this.createAndFillPartition(numLongs);

        final LongReader[] consumers = createSubpartitionLongReaders(
                filledSubpartition,
                numConsumers,
                numLongs,
                filledSubpartition.getBuffersInBacklogUnsafe(),
                this.compressionEnabled);

        // Start every reader before waiting on any of them so that they run concurrently.
        for (LongReader consumer : consumers) {
            consumer.start();
        }
        // sync() is called on each reader in start order before the subpartition is released.
        for (LongReader consumer : consumers) {
            consumer.sync();
        }

        filledSubpartition.release();
    }

    /**
     * Drains {@code reader} and asserts that it yields the longs {@code 0 .. numLongs - 1} in order.
     *
     * <p>Reading stops at the first {@code null} result or at the first element that is not a data
     * buffer. For every data buffer, data must be reported as available and the reported backlog
     * must count down by one per buffer, starting at {@code numBuffers - 1}. After the loop the
     * countdown must have reached {@code -1}.
     *
     * @param reader view to read from
     * @param numLongs total number of longs expected
     * @param numBuffers starting point of the backlog countdown
     * @param compressionEnabled whether buffers flagged as compressed are decompressed before reading
     * @param decompressor decompressor applied to compressed buffers
     * @throws Exception if reading from the view or decompressing a buffer fails
     */
    private static void readLongs(ResultSubpartitionView reader, long numLongs, int numBuffers, boolean compressionEnabled, BufferDecompressor decompressor) throws Exception {
        long expectedValue = 0L;
        int expectedBacklog = numBuffers - 1;

        ResultSubpartition.BufferAndBacklog current = reader.getNextBuffer();
        while (current != null && current.buffer().isBuffer()) {
            Assertions.assertThat(current.isDataAvailable()).isTrue();
            Assertions.assertThat(current.buffersInBacklog()).isEqualTo(expectedBacklog);

            final Buffer rawBuffer = current.buffer();
            ByteBuffer payload = rawBuffer.getNioBufferReadable();
            if (compressionEnabled && rawBuffer.isCompressed()) {
                final Buffer decompressed = decompressor.decompressToIntermediateBuffer(rawBuffer);
                payload = decompressed.getNioBufferReadable();
                // NOTE(review): the intermediate buffer is recycled before its NIO view is read;
                // presumably its contents stay valid until the next decompression — confirm.
                decompressed.recycleBuffer();
            }

            // Every long in the payload must continue the global sequence.
            while (payload.hasRemaining()) {
                Assertions.assertThat(payload.getLong()).isEqualTo(expectedValue);
                expectedValue++;
            }

            rawBuffer.recycleBuffer();
            expectedBacklog--;
            current = reader.getNextBuffer();
        }

        Assertions.assertThat(expectedValue).isEqualTo(numLongs);
        Assertions.assertThat(expectedBacklog).isEqualTo(-1);
    }

    /**
     * Writes the longs {@code 0 .. nums - 1} to {@code partition} in big-endian order.
     *
     * <p>A single 0x100000-byte memory segment is filled repeatedly. After each fill, the written
     * part is added to the partition as a data buffer and the partition is flushed.
     *
     * @param partition subpartition to write to
     * @param nums number of longs to write; nothing is written if it is not positive
     * @throws IOException if adding to or flushing the partition fails
     */
    private static void writeLongs(BoundedBlockingSubpartition partition, long nums) throws IOException {
        final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(0x100000);
        // Highest offset at which a complete long still fits into the segment.
        final int lastLongOffset = segment.size() - Long.BYTES;

        long nextValue = 0L;
        long remaining = nums;
        while (remaining > 0L) {
            int bytesWritten = 0;
            while (remaining > 0L && bytesWritten <= lastLongOffset) {
                segment.putLongBigEndian(bytesWritten, nextValue);
                nextValue++;
                remaining--;
                bytesWritten += Long.BYTES;
            }

            final Buffer filled = new NetworkBuffer(segment, ignored -> {}, Buffer.DataType.DATA_BUFFER);
            partition.add(new BufferConsumer(filled, bytesWritten));
            // The same segment is overwritten on the next pass, so flush after every buffer
            // (presumably this makes the partition consume the data first — confirm).
            partition.flush();
        }
    }

    /**
     * Creates a subpartition, writes {@code numLongs} consecutive longs followed by an end-of-data
     * event into it, and finishes it.
     *
     * @param numLongs number of longs to write
     * @return the finished subpartition
     * @throws IOException if creating, writing to, or finishing the subpartition fails
     */
    private BoundedBlockingSubpartition createAndFillPartition(long numLongs) throws IOException {
        final BoundedBlockingSubpartition filled = this.createSubpartition();

        writeLongs(filled, numLongs);
        this.writeEndOfData(filled);
        filled.finish();

        return filled;
    }

    /**
     * Appends a serialized {@link EndOfData} event with {@link StopMode#DRAIN} to the subpartition.
     *
     * @param subpartition subpartition that receives the event
     * @throws IOException if serializing or adding the event fails
     */
    private void writeEndOfData(BoundedBlockingSubpartition subpartition) throws IOException {
        final AbstractEvent endOfData = new EndOfData(StopMode.DRAIN);
        try (BufferConsumer serializedEvent = EventSerializer.toBufferConsumer(endOfData, false)) {
            // The subpartition gets a copy; the original consumer is closed by this try block.
            subpartition.add(serializedEvent.copy(), 0);
        }
    }

    /**
     * Creates an empty subpartition of the parameterized {@code type}, with index 0, inside a new
     * blocking result partition. Its data file is {@code partitiondata} in the injected temporary
     * folder.
     *
     * @return the newly created subpartition
     * @throws IOException if the subpartition cannot be created
     */
    private BoundedBlockingSubpartition createSubpartition() throws IOException {
        // 0x100000 bytes; the same value as the BUFFER_SIZE constant of this class.
        final int bufferSize = 0x100000;

        final ResultPartition parent = (BoundedBlockingResultPartition) PartitionTestUtils.createPartition(
                ResultPartitionType.BLOCKING, fileChannelManager, this.compressionEnabled, bufferSize);
        final File dataFile = new File(this.tmpFolder, "partitiondata");

        return this.type.create(0, parent, dataFile, bufferSize, this.sslEnabled);
    }

    /**
     * Creates {@code numReaders} reader threads, each with its own read view on
     * {@code subpartition}. The threads are not started here.
     *
     * @param subpartition subpartition to create the read views on
     * @param numReaders number of reader threads to create
     * @param numLongs number of longs each reader expects
     * @param numBuffers backlog size each reader starts its backlog countdown from
     * @param compressionEnabled whether the readers decompress compressed buffers
     * @return the unstarted reader threads
     * @throws IOException if creating a read view fails
     */
    private static LongReader[] createSubpartitionLongReaders(BoundedBlockingSubpartition subpartition, int numReaders, int numLongs, int numBuffers, boolean compressionEnabled) throws IOException {
        final LongReader[] longReaders = new LongReader[numReaders];

        for (int readerIndex = 0; readerIndex < longReaders.length; readerIndex++) {
            final ResultSubpartitionView view = subpartition.createReadView(ignored -> {});
            longReaders[readerIndex] = new LongReader(view, numLongs, numBuffers, compressionEnabled);
        }

        return longReaders;
    }

    // Initializes the shared decompressor with a 0x100000-byte buffer size (the same value as
    // BUFFER_SIZE) and the LZ4 codec named by COMPRESSION_CODEC.
    static {
        decompressor = new BufferDecompressor(0x100000, COMPRESSION_CODEC);
    }

    /**
     * Thread that reads one {@link ResultSubpartitionView} to the end, verifies its contents with
     * {@code readLongs}, and then releases the view.
     *
     * <p>Each instance owns its own {@link BufferDecompressor} rather than using the static one of
     * the enclosing test, so concurrently running readers do not share a decompressor.
     */
    private static final class LongReader
    extends CheckedThread {
        /** View this thread reads from and releases after it has been read completely. */
        private final ResultSubpartitionView reader;
        /** Number of longs the view is expected to yield. */
        private final long numLongs;
        /** Backlog size from which the backlog countdown in {@code readLongs} starts. */
        private final int numBuffers;
        /** Whether compressed buffers are decompressed before they are verified. */
        private final boolean compressionEnabled;
        /** Decompressor used only by this thread. */
        private final BufferDecompressor decompressor;

        /**
         * @param reader view to read; released by {@link #go()} after a successful read
         * @param numLongs number of longs the view is expected to yield
         * @param numBuffers backlog size from which the backlog countdown starts
         * @param compressionEnabled whether compressed buffers are decompressed before verification
         */
        LongReader(ResultSubpartitionView reader, long numLongs, int numBuffers, boolean compressionEnabled) {
            this.reader = reader;
            this.numLongs = numLongs;
            this.numBuffers = numBuffers;
            this.compressionEnabled = compressionEnabled;
            this.decompressor = new BufferDecompressor(0x100000, BoundedBlockingSubpartitionWriteReadTest.COMPRESSION_CODEC);
        }

        /**
         * Reads and verifies the whole view, then releases it.
         *
         * @throws Exception if reading, verification, or releasing the view fails
         */
        public void go() throws Exception {
            BoundedBlockingSubpartitionWriteReadTest.readLongs(this.reader, this.numLongs, this.numBuffers, this.compressionEnabled, this.decompressor);
            // Release the view once it has been read, exactly as testWriteAndReadData and
            // testRead10ConsumersSequential do for their views. Without this the concurrent test
            // would release the subpartition while all of its read views are still unreleased.
            this.reader.releaseAllResources();
        }
    }
}

