/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferWithChannel;
import org.apache.flink.runtime.io.network.partition.DataBufferTest;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileReader;
import org.apache.flink.runtime.io.network.partition.PartitionedFileWriter;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class PartitionedFileWriteReadTest {
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testWriteAndReadPartitionedFile() throws Exception {
        int subpartition;
        int numSubpartitions = 10;
        int bufferSize = 1024;
        int numBuffers = 1000;
        int numRegions = 10;
        Random random = new Random(1111L);
        List[] buffersWritten = new List[numSubpartitions];
        List[] buffersRead = new List[numSubpartitions];
        List[] regionBuffers = new List[numSubpartitions];
        for (int subpartition2 = 0; subpartition2 < numSubpartitions; ++subpartition2) {
            buffersWritten[subpartition2] = new ArrayList();
            buffersRead[subpartition2] = new ArrayList();
            regionBuffers[subpartition2] = new ArrayList();
        }
        PartitionedFileWriter fileWriter = this.createPartitionedFileWriter(numSubpartitions);
        for (int region = 0; region < numRegions; ++region) {
            int index;
            int subpartition3;
            boolean isBroadcastRegion = random.nextBoolean();
            fileWriter.startNewRegion(isBroadcastRegion);
            for (int i = 0; i < numBuffers; ++i) {
                Buffer buffer = this.createBuffer(random, bufferSize);
                if (isBroadcastRegion) {
                    for (subpartition3 = 0; subpartition3 < numSubpartitions; ++subpartition3) {
                        buffersWritten[subpartition3].add(buffer);
                        regionBuffers[subpartition3].add(new BufferWithChannel(buffer, subpartition3));
                    }
                    continue;
                }
                subpartition3 = random.nextInt(numSubpartitions);
                buffersWritten[subpartition3].add(buffer);
                regionBuffers[subpartition3].add(new BufferWithChannel(buffer, subpartition3));
            }
            int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
            for (index = 0; index < numSubpartitions; ++index) {
                subpartition3 = writeOrder[index];
                fileWriter.writeBuffers(regionBuffers[subpartition3]);
                if (isBroadcastRegion) break;
            }
            for (index = 0; index < numSubpartitions; ++index) {
                regionBuffers[index].clear();
            }
        }
        PartitionedFile partitionedFile = fileWriter.finish();
        FileChannel dataFileChannel = this.openFileChannel(partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = this.openFileChannel(partitionedFile.getIndexFilePath());
        for (subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            PartitionedFileReader fileReader = new PartitionedFileReader(partitionedFile, subpartition, dataFileChannel, indexFileChannel);
            while (fileReader.hasRemaining()) {
                MemorySegment readBuffer = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
                Buffer buffer = fileReader.readCurrentRegion(readBuffer, buf -> {});
                buffersRead[subpartition].add(buffer);
            }
        }
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{dataFileChannel, indexFileChannel});
        for (subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            Assert.assertEquals((long)buffersWritten[subpartition].size(), (long)buffersRead[subpartition].size());
            for (int i = 0; i < buffersWritten[subpartition].size(); ++i) {
                this.assertBufferEquals((Buffer)buffersWritten[subpartition].get(i), (Buffer)buffersRead[subpartition].get(i));
            }
        }
    }

    @Test
    public void testWriteAndReadWithEmptySubpartition() throws Exception {
        int numRegions = 10;
        int numSubpartitions = 5;
        int bufferSize = 1024;
        Random random = new Random(1111L);
        ArrayDeque[] subpartitionBuffers = new ArrayDeque[numSubpartitions];
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            subpartitionBuffers[subpartition] = new ArrayDeque();
        }
        PartitionedFileWriter fileWriter = this.createPartitionedFileWriter(numSubpartitions);
        for (int region = 0; region < numRegions; ++region) {
            fileWriter.startNewRegion(false);
            for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
                if (!random.nextBoolean()) continue;
                Buffer buffer = this.createBuffer(random, bufferSize);
                subpartitionBuffers[subpartition].add(buffer);
                fileWriter.writeBuffers(this.getBufferWithChannels(buffer, subpartition));
            }
        }
        PartitionedFile partitionedFile = fileWriter.finish();
        FileChannel dataFileChannel = this.openFileChannel(partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = this.openFileChannel(partitionedFile.getIndexFilePath());
        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
            PartitionedFileReader fileReader = new PartitionedFileReader(partitionedFile, subpartition, dataFileChannel, indexFileChannel);
            while (fileReader.hasRemaining()) {
                MemorySegment readBuffer = MemorySegmentFactory.allocateUnpooledSegment((int)bufferSize);
                Buffer buffer = (Buffer)Preconditions.checkNotNull((Object)fileReader.readCurrentRegion(readBuffer, buf -> {}));
                this.assertBufferEquals((Buffer)Preconditions.checkNotNull(subpartitionBuffers[subpartition].poll()), buffer);
            }
            Assert.assertTrue((boolean)subpartitionBuffers[subpartition].isEmpty());
        }
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{dataFileChannel, indexFileChannel});
    }

    private void assertBufferEquals(Buffer expected, Buffer actual) {
        Assert.assertEquals((Object)expected.getDataType(), (Object)actual.getDataType());
        Assert.assertEquals((Object)expected.getNioBufferReadable(), (Object)actual.getNioBufferReadable());
    }

    private Buffer createBuffer(Random random, int bufferSize) {
        boolean isBuffer = random.nextBoolean();
        Buffer.DataType dataType = isBuffer ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;
        int dataSize = random.nextInt(bufferSize) + 1;
        byte[] data = new byte[dataSize];
        return new NetworkBuffer(MemorySegmentFactory.wrap((byte[])data), buf -> {}, dataType, dataSize);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(expected=IllegalStateException.class)
    public void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception {
        PartitionedFileWriter partitionedFileWriter = this.createPartitionedFileWriter(2);
        try {
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)1024);
            NetworkBuffer buffer1 = new NetworkBuffer(segment, buf -> {});
            partitionedFileWriter.writeBuffers(this.getBufferWithChannels((Buffer)buffer1, 1));
            NetworkBuffer buffer2 = new NetworkBuffer(segment, buf -> {});
            partitionedFileWriter.writeBuffers(this.getBufferWithChannels((Buffer)buffer2, 0));
            NetworkBuffer buffer3 = new NetworkBuffer(segment, buf -> {});
            partitionedFileWriter.writeBuffers(this.getBufferWithChannels((Buffer)buffer3, 1));
        }
        finally {
            partitionedFileWriter.finish();
        }
    }

    @Test(expected=IllegalStateException.class)
    public void testWriteFinishedPartitionedFile() throws Exception {
        PartitionedFileWriter partitionedFileWriter = this.createAndFinishPartitionedFileWriter();
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment((int)1024);
        NetworkBuffer buffer = new NetworkBuffer(segment, buf -> {});
        partitionedFileWriter.writeBuffers(this.getBufferWithChannels((Buffer)buffer, 0));
    }

    @Test(expected=IllegalStateException.class)
    public void testFinishPartitionedFileWriterTwice() throws Exception {
        PartitionedFileWriter partitionedFileWriter = this.createAndFinishPartitionedFileWriter();
        partitionedFileWriter.finish();
    }

    @Test
    public void testReadEmptyPartitionedFile() throws Exception {
        PartitionedFile partitionedFile = this.createPartitionedFile();
        FileChannel dataFileChannel = this.openFileChannel(partitionedFile.getDataFilePath());
        FileChannel indexFileChannel = this.openFileChannel(partitionedFile.getIndexFilePath());
        PartitionedFileReader partitionedFileReader = new PartitionedFileReader(partitionedFile, 1, dataFileChannel, indexFileChannel);
        MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment((int)1024);
        Assert.assertNull((Object)partitionedFileReader.readCurrentRegion(target, FreeingBufferRecycler.INSTANCE));
        IOUtils.closeAllQuietly((AutoCloseable[])new AutoCloseable[]{dataFileChannel, indexFileChannel});
    }

    private FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    private List<BufferWithChannel> getBufferWithChannels(Buffer buffer, int channelIndex) {
        return Collections.singletonList(new BufferWithChannel(buffer, channelIndex));
    }

    private PartitionedFile createPartitionedFile() throws IOException {
        PartitionedFileWriter partitionedFileWriter = this.createPartitionedFileWriter(2);
        return partitionedFileWriter.finish();
    }

    private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions) throws IOException {
        String basePath = this.temporaryFolder.newFile().getPath();
        return new PartitionedFileWriter(numSubpartitions, 640, basePath);
    }

    private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
        PartitionedFileWriter partitionedFileWriter = this.createPartitionedFileWriter(1);
        partitionedFileWriter.finish();
        return partitionedFileWriter;
    }
}

