/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferWithChannel;
import org.apache.flink.runtime.io.network.partition.DataBuffer;
import org.apache.flink.runtime.io.network.partition.HashBasedDataBuffer;
import org.apache.flink.runtime.io.network.partition.SortBasedDataBuffer;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class DataBufferTest {
    private final boolean useHashBuffer;

    @Parameterized.Parameters(name="UseHashBuffer = {0}")
    public static Object[] parameters() {
        return new Object[]{true, false};
    }

    public DataBufferTest(boolean useHashBuffer) {
        this.useHashBuffer = useHashBuffer;
    }

    @Test
    public void testWriteAndReadDataBuffer() throws Exception {
        int numSubpartitions = 10;
        int bufferSize = 1024;
        int bufferPoolSize = 512;
        Random random = new Random(1111L);
        Queue[] dataWritten = new Queue[numSubpartitions];
        Queue[] buffersRead = new Queue[numSubpartitions];
        for (int i = 0; i < numSubpartitions; ++i) {
            dataWritten[i] = new ArrayDeque();
            buffersRead[i] = new ArrayDeque();
        }
        int[] numBytesWritten = new int[numSubpartitions];
        int[] numBytesRead = new int[numSubpartitions];
        Arrays.fill(numBytesWritten, 0);
        Arrays.fill(numBytesRead, 0);
        int totalBytesWritten = 0;
        DataBuffer dataBuffer = this.createDataBuffer(bufferPoolSize, bufferSize, numSubpartitions, DataBufferTest.getRandomSubpartitionOrder(numSubpartitions));
        int numDataBuffers = 5;
        while (numDataBuffers > 0) {
            BufferWithChannel buffer;
            int recordSize = random.nextInt(bufferSize * 4 - 1) + 1;
            byte[] bytes = new byte[recordSize];
            random.nextBytes(bytes);
            ByteBuffer record = ByteBuffer.wrap(bytes);
            int subpartition = random.nextInt(numSubpartitions);
            boolean isBuffer = random.nextBoolean();
            Buffer.DataType dataType = isBuffer ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;
            boolean isFull = dataBuffer.append(record, subpartition, dataType);
            record.flip();
            if (record.hasRemaining()) {
                dataWritten[subpartition].add(new DataAndType(record, dataType));
                int n = subpartition;
                numBytesWritten[n] = numBytesWritten[n] + record.remaining();
                totalBytesWritten += record.remaining();
            }
            while (isFull && dataBuffer.hasRemaining() && (buffer = this.copyIntoSegment(bufferSize, dataBuffer)) != null) {
                this.addBufferRead(buffer, buffersRead, numBytesRead);
            }
            if (!isFull) continue;
            --numDataBuffers;
            dataBuffer.reset();
        }
        if (dataBuffer.hasRemaining()) {
            Assert.assertTrue((boolean)(dataBuffer instanceof HashBasedDataBuffer));
            dataBuffer.reset();
            dataBuffer.finish();
            while (dataBuffer.hasRemaining()) {
                this.addBufferRead(this.copyIntoSegment(bufferSize, dataBuffer), buffersRead, numBytesRead);
            }
        }
        Assert.assertEquals((long)totalBytesWritten, (long)dataBuffer.numTotalBytes());
        DataBufferTest.checkWriteReadResult(numSubpartitions, numBytesWritten, numBytesRead, dataWritten, buffersRead);
    }

    private BufferWithChannel copyIntoSegment(int bufferSize, DataBuffer dataBuffer) {
        if (this.useHashBuffer) {
            BufferWithChannel buffer = dataBuffer.getNextBuffer(null);
            if (buffer == null || !buffer.getBuffer().isBuffer()) {
                return buffer;
            }
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
            int numBytes = buffer.getBuffer().readableBytes();
            segment.put(0, buffer.getBuffer().getNioBufferReadable(), numBytes);
            buffer.getBuffer().recycleBuffer();
            return new BufferWithChannel((Buffer)new NetworkBuffer(segment, MemorySegment::free, Buffer.DataType.DATA_BUFFER, numBytes), buffer.getChannelIndex());
        }
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
        return dataBuffer.getNextBuffer(segment);
    }

    private void addBufferRead(BufferWithChannel buffer, Queue<Buffer>[] buffersRead, int[] numBytesRead) {
        int channel = buffer.getChannelIndex();
        buffersRead[channel].add(buffer.getBuffer());
        int n = channel;
        numBytesRead[n] = numBytesRead[n] + buffer.getBuffer().readableBytes();
    }

    /*
     * WARNING - void declaration
     */
    public static void checkWriteReadResult(int numSubpartitions, int[] numBytesWritten, int[] numBytesRead, Queue<DataAndType>[] dataWritten, Queue<Buffer>[] buffersRead) {
        for (int subpartitionIndex = 0; subpartitionIndex < numSubpartitions; ++subpartitionIndex) {
            void var10_10;
            Assert.assertEquals((long)numBytesWritten[subpartitionIndex], (long)numBytesRead[subpartitionIndex]);
            ArrayList<DataAndType> eventsWritten = new ArrayList<DataAndType>();
            ArrayList<Buffer> eventsRead = new ArrayList<Buffer>();
            ByteBuffer subpartitionDataWritten = ByteBuffer.allocate(numBytesWritten[subpartitionIndex]);
            for (DataAndType dataAndType : dataWritten[subpartitionIndex]) {
                subpartitionDataWritten.put(dataAndType.data);
                dataAndType.data.rewind();
                if (!dataAndType.dataType.isEvent()) continue;
                eventsWritten.add(dataAndType);
            }
            ByteBuffer subpartitionDataRead = ByteBuffer.allocate(numBytesRead[subpartitionIndex]);
            for (Buffer buffer : buffersRead[subpartitionIndex]) {
                subpartitionDataRead.put(buffer.getNioBufferReadable());
                if (buffer.isBuffer()) continue;
                eventsRead.add(buffer);
            }
            subpartitionDataWritten.flip();
            subpartitionDataRead.flip();
            Assert.assertEquals((Object)subpartitionDataWritten, (Object)subpartitionDataRead);
            Assert.assertEquals((long)eventsWritten.size(), (long)eventsRead.size());
            boolean bl = false;
            while (var10_10 < eventsWritten.size()) {
                Assert.assertEquals((Object)((DataAndType)eventsWritten.get((int)var10_10)).dataType, (Object)((Buffer)eventsRead.get((int)var10_10)).getDataType());
                Assert.assertEquals((Object)((DataAndType)eventsWritten.get((int)var10_10)).data, (Object)((Buffer)eventsRead.get((int)var10_10)).getNioBufferReadable());
                ++var10_10;
            }
        }
    }

    @Test
    public void testWriteReadWithEmptyChannel() throws Exception {
        int bufferPoolSize = 10;
        int bufferSize = 1024;
        int numSubpartitions = 5;
        ByteBuffer[] subpartitionRecords = new ByteBuffer[]{ByteBuffer.allocate(128), null, ByteBuffer.allocate(1536), null, ByteBuffer.allocate(1024)};
        DataBuffer dataBuffer = this.createDataBuffer(bufferPoolSize, bufferSize, numSubpartitions);
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            ByteBuffer record = subpartitionRecords[subpartition];
            if (record == null) continue;
            dataBuffer.append(record, subpartition, Buffer.DataType.DATA_BUFFER);
            record.rewind();
        }
        dataBuffer.finish();
        this.checkReadResult(dataBuffer, subpartitionRecords[0], 0, bufferSize);
        ByteBuffer expected1 = subpartitionRecords[2].duplicate();
        expected1.limit(bufferSize);
        this.checkReadResult(dataBuffer, expected1.slice(), 2, bufferSize);
        ByteBuffer expected2 = subpartitionRecords[2].duplicate();
        expected2.position(bufferSize);
        this.checkReadResult(dataBuffer, expected2.slice(), 2, bufferSize);
        this.checkReadResult(dataBuffer, subpartitionRecords[4], 4, bufferSize);
    }

    private void checkReadResult(DataBuffer dataBuffer, ByteBuffer expectedBuffer, int expectedChannel, int bufferSize) {
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
        BufferWithChannel bufferWithChannel = dataBuffer.getNextBuffer(segment);
        Assert.assertEquals((long)expectedChannel, (long)bufferWithChannel.getChannelIndex());
        Assert.assertEquals((Object)expectedBuffer, (Object)bufferWithChannel.getBuffer().getNioBufferReadable());
    }

    @Test(expected=IllegalArgumentException.class)
    public void testWriteEmptyData() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        ByteBuffer record = ByteBuffer.allocate(1);
        record.position(1);
        dataBuffer.append(record, 0, Buffer.DataType.DATA_BUFFER);
    }

    @Test(expected=IllegalStateException.class)
    public void testWriteFinishedDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.finish();
        dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
    }

    @Test(expected=IllegalStateException.class)
    public void testWriteReleasedDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.release();
        dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
    }

    @Test
    public void testWriteMoreDataThanCapacity() throws Exception {
        int bufferPoolSize = 10;
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(bufferPoolSize, bufferSize, 1);
        for (int i = 1; i < bufferPoolSize; ++i) {
            this.appendAndCheckResult(dataBuffer, bufferSize, false, bufferSize * i, i, true);
        }
        int numRecords = bufferPoolSize - 1;
        long numBytes = this.useHashBuffer ? (long)(bufferSize * bufferPoolSize) : (long)(bufferSize * numRecords);
        this.appendAndCheckResult(dataBuffer, bufferSize + 1, true, numBytes, numRecords, true);
    }

    @Test
    public void testWriteLargeRecord() throws Exception {
        int bufferPoolSize = 10;
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(bufferPoolSize, bufferSize, 1);
        long numBytes = this.useHashBuffer ? (long)(bufferPoolSize * bufferSize) : 0L;
        this.appendAndCheckResult(dataBuffer, bufferPoolSize * bufferSize + 1, true, numBytes, 0L, this.useHashBuffer);
    }

    private void appendAndCheckResult(DataBuffer dataBuffer, int recordSize, boolean isFull, long numBytes, long numRecords, boolean hasRemaining) throws IOException {
        ByteBuffer largeRecord = ByteBuffer.allocate(recordSize);
        Assert.assertEquals((Object)isFull, (Object)dataBuffer.append(largeRecord, 0, Buffer.DataType.DATA_BUFFER));
        Assert.assertEquals((long)numBytes, (long)dataBuffer.numTotalBytes());
        Assert.assertEquals((long)numRecords, (long)dataBuffer.numTotalRecords());
        Assert.assertEquals((Object)hasRemaining, (Object)dataBuffer.hasRemaining());
    }

    @Test(expected=IllegalStateException.class)
    public void testReadUnfinishedDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
        Assert.assertTrue((boolean)dataBuffer.hasRemaining());
        dataBuffer.getNextBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize));
    }

    @Test(expected=IllegalStateException.class)
    public void testReadReleasedDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
        dataBuffer.finish();
        Assert.assertTrue((boolean)dataBuffer.hasRemaining());
        dataBuffer.release();
        Assert.assertTrue((boolean)dataBuffer.hasRemaining());
        dataBuffer.getNextBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize));
    }

    @Test
    public void testReadEmptyDataBuffer() throws Exception {
        int bufferSize = 1024;
        DataBuffer dataBuffer = this.createDataBuffer(1, bufferSize, 1);
        dataBuffer.finish();
        Assert.assertFalse((boolean)dataBuffer.hasRemaining());
        Assert.assertNull((Object)dataBuffer.getNextBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize)));
    }

    @Test
    public void testReleaseDataBuffer() throws Exception {
        int bufferPoolSize = 10;
        int bufferSize = 1024;
        int recordSize = (bufferPoolSize - 1) * bufferSize;
        NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, bufferSize);
        BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, bufferPoolSize);
        SortBasedDataBuffer dataBuffer = new SortBasedDataBuffer(bufferPool, 1, bufferSize, bufferPoolSize, null);
        dataBuffer.append(ByteBuffer.allocate(recordSize), 0, Buffer.DataType.DATA_BUFFER);
        Assert.assertEquals((long)bufferPoolSize, (long)bufferPool.bestEffortGetNumOfUsedBuffers());
        Assert.assertTrue((boolean)dataBuffer.hasRemaining());
        Assert.assertEquals((long)1L, (long)dataBuffer.numTotalRecords());
        Assert.assertEquals((long)recordSize, (long)dataBuffer.numTotalBytes());
        dataBuffer.release();
        Assert.assertEquals((long)0L, (long)bufferPool.bestEffortGetNumOfUsedBuffers());
        Assert.assertTrue((boolean)dataBuffer.hasRemaining());
        Assert.assertEquals((long)1L, (long)dataBuffer.numTotalRecords());
        Assert.assertEquals((long)recordSize, (long)dataBuffer.numTotalBytes());
    }

    private DataBuffer createDataBuffer(int bufferPoolSize, int bufferSize, int numSubpartitions) throws IOException {
        return this.createDataBuffer(bufferPoolSize, bufferSize, numSubpartitions, null);
    }

    private DataBuffer createDataBuffer(int bufferPoolSize, int bufferSize, int numSubpartitions, int[] customReadOrder) throws IOException {
        NetworkBufferPool globalPool = new NetworkBufferPool(bufferPoolSize, bufferSize);
        BufferPool bufferPool = globalPool.createBufferPool(bufferPoolSize, bufferPoolSize);
        if (this.useHashBuffer) {
            return new HashBasedDataBuffer(bufferPool, numSubpartitions, bufferPoolSize, customReadOrder);
        }
        return new SortBasedDataBuffer(bufferPool, numSubpartitions, bufferSize, bufferPoolSize, customReadOrder);
    }

    public static int[] getRandomSubpartitionOrder(int numSubpartitions) {
        Random random = new Random(1111L);
        int[] subpartitionReadOrder = new int[numSubpartitions];
        int shift = random.nextInt(numSubpartitions);
        for (int i = 0; i < numSubpartitions; ++i) {
            subpartitionReadOrder[i] = (i + shift) % numSubpartitions;
        }
        return subpartitionReadOrder;
    }

    public static class DataAndType {
        private final ByteBuffer data;
        private final Buffer.DataType dataType;

        DataAndType(ByteBuffer data, Buffer.DataType dataType) {
            this.data = data;
            this.dataType = dataType;
        }
    }
}

