/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.partition.BoundedData;
import org.apache.flink.runtime.io.network.partition.PageSizeUtil;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
abstract class BoundedDataTestBase {
    private Path subpartitionDataPath;
    protected static final int BUFFER_SIZE = 0x100000;
    private static final NettyShuffleEnvironmentOptions.CompressionCodec COMPRESSION_CODEC = NettyShuffleEnvironmentOptions.CompressionCodec.LZ4;
    private static final BufferCompressor COMPRESSOR = new BufferCompressor(0x100000, COMPRESSION_CODEC);
    private static final BufferDecompressor DECOMPRESSOR = new BufferDecompressor(0x100000, COMPRESSION_CODEC);
    @Parameter
    private static boolean compressionEnabled;

    BoundedDataTestBase() {
    }

    @Parameters(name="compressionEnabled = {0}")
    private static List<Boolean> compressionEnabled() {
        return Arrays.asList(false, true);
    }

    @BeforeEach
    void before(@TempDir Path tempDir) {
        this.subpartitionDataPath = tempDir.resolve("subpartitiondata");
    }

    protected abstract boolean isRegionBased();

    protected abstract BoundedData createBoundedData(Path var1) throws IOException;

    protected abstract BoundedData createBoundedDataWithRegion(Path var1, int var2) throws IOException;

    protected BoundedData createBoundedData() throws IOException {
        return this.createBoundedData(this.subpartitionDataPath);
    }

    private BoundedData createBoundedDataWithRegion(int regionSize) throws IOException {
        return this.createBoundedDataWithRegion(this.subpartitionDataPath, regionSize);
    }

    @TestTemplate
    void testWriteAndReadData() throws Exception {
        try (BoundedData bd = this.createBoundedData();){
            this.testWriteAndReadData(bd);
        }
    }

    @TestTemplate
    void testWriteAndReadDataAcrossRegions() throws Exception {
        if (!this.isRegionBased()) {
            return;
        }
        try (BoundedData bd = this.createBoundedDataWithRegion(1276347);){
            this.testWriteAndReadData(bd);
        }
    }

    private void testWriteAndReadData(BoundedData bd) throws Exception {
        int numLongs = 10000000;
        int numBuffers = BoundedDataTestBase.writeLongs(bd, 10000000);
        bd.finishWrite();
        BoundedDataTestBase.readLongs(bd.createReader(), numBuffers, 10000000);
    }

    @TestTemplate
    void returnNullAfterEmpty() throws Exception {
        try (BoundedData bd = this.createBoundedData();){
            bd.finishWrite();
            BoundedData.Reader reader = bd.createReader();
            Assertions.assertThat((Object)reader.nextBuffer()).isNull();
            Assertions.assertThat((Object)reader.nextBuffer()).isNull();
            Assertions.assertThat((Object)reader.nextBuffer()).isNull();
        }
    }

    @TestTemplate
    void testDeleteFileOnClose() throws Exception {
        BoundedData bd = this.createBoundedData(this.subpartitionDataPath);
        Assertions.assertThat((Path)this.subpartitionDataPath).exists();
        bd.close();
        Assertions.assertThat((Path)this.subpartitionDataPath).doesNotExist();
    }

    @TestTemplate
    void testGetSizeSingleRegion() throws Exception {
        try (BoundedData bd = this.createBoundedData();){
            BoundedDataTestBase.testGetSize(bd, 60787, 76687);
        }
    }

    @TestTemplate
    void testGetSizeMultipleRegions() throws Exception {
        if (!this.isRegionBased()) {
            return;
        }
        int pageSize = PageSizeUtil.getSystemPageSizeOrConservativeMultiple();
        try (BoundedData bd = this.createBoundedDataWithRegion(pageSize);){
            BoundedDataTestBase.testGetSize(bd, pageSize / 3, pageSize - 8);
        }
    }

    private static void testGetSize(BoundedData bd, int bufferSize1, int bufferSize2) throws Exception {
        int expectedSize1 = bufferSize1 + 8;
        int expectedSizeFinal = bufferSize1 + bufferSize2 + 16;
        bd.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize1));
        Assertions.assertThat((long)bd.getSize()).isEqualTo((long)expectedSize1);
        bd.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize2));
        Assertions.assertThat((long)bd.getSize()).isEqualTo((long)expectedSizeFinal);
        bd.finishWrite();
        Assertions.assertThat((long)bd.getSize()).isEqualTo((long)expectedSizeFinal);
    }

    private static int writeLongs(BoundedData bd, int numLongs) throws IOException {
        int numLongsInBuffer = 131072;
        int numBuffers = 0;
        for (long nextValue = 0L; nextValue < (long)numLongs; nextValue += 131072L) {
            Buffer buffer = BufferBuilderTestUtils.buildBufferWithAscendingLongs(0x100000, 131072, nextValue);
            if (compressionEnabled) {
                Buffer compressedBuffer = COMPRESSOR.compressToIntermediateBuffer(buffer);
                bd.writeBuffer(compressedBuffer);
                if (compressedBuffer != buffer) {
                    compressedBuffer.recycleBuffer();
                }
            } else {
                bd.writeBuffer(buffer);
            }
            ++numBuffers;
            buffer.recycleBuffer();
        }
        return numBuffers;
    }

    private static void readLongs(BoundedData.Reader reader, int numBuffersExpected, int numLongs) throws IOException {
        Buffer b;
        long nextValue = 0L;
        int numBuffers = 0;
        while ((b = reader.nextBuffer()) != null) {
            int numLongsInBuffer;
            if (compressionEnabled && b.isCompressed()) {
                Buffer decompressedBuffer = DECOMPRESSOR.decompressToIntermediateBuffer(b);
                numLongsInBuffer = decompressedBuffer.getSize() / 8;
                BufferBuilderTestUtils.validateBufferWithAscendingLongs(decompressedBuffer, numLongsInBuffer, nextValue);
                decompressedBuffer.recycleBuffer();
            } else {
                numLongsInBuffer = b.getSize() / 8;
                BufferBuilderTestUtils.validateBufferWithAscendingLongs(b, numLongsInBuffer, nextValue);
            }
            nextValue += (long)numLongsInBuffer;
            ++numBuffers;
            b.recycleBuffer();
        }
        Assertions.assertThat((int)numBuffers).isEqualTo(numBuffersExpected);
        Assertions.assertThat((long)nextValue).isGreaterThanOrEqualTo((long)numLongs);
    }
}

