/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.fs;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.paimon.fs.FileIOUtils;
import org.apache.paimon.fs.FileRange;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.VectoredReadable;
import org.apache.paimon.utils.BlockingExecutor;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Preconditions;

public class VectoredReadUtils {
    /**
     * Reads all requested ranges from {@code readable} and completes each range's data future.
     *
     * <p>The ranges are validated and sorted by offset, then neighbouring ranges are merged into
     * combined ranges using {@code readable.minSeekForVectorReads()} as the merge threshold. If
     * everything collapses into a single combined range and {@code readable} is a
     * {@link SeekableInputStream}, the ranges are read one after another on the calling thread.
     * Otherwise reads are submitted through a {@link BlockingExecutor} built on
     * {@code FileIOUtils.IO_THREAD_POOL}: a combined range holding a single request is read
     * directly, while a combined range holding several requests is split into batches whose
     * bytes are copied back into the individual requests once every batch has finished.
     *
     * <p>A failure in any batch is propagated to every request of the affected combined range, so
     * callers waiting on {@link FileRange#getData()} never block forever.
     *
     * @param readable source supporting positioned reads
     * @param ranges ranges to read; an empty list is a no-op
     * @throws IOException if validation fails with an {@link EOFException} or the sequential
     *     fallback read fails
     */
    public static void readVectored(VectoredReadable readable, List<? extends FileRange> ranges) throws IOException {
        if (ranges.isEmpty()) {
            return;
        }
        List<? extends FileRange> sortRanges = VectoredReadUtils.validateAndSortRanges(ranges);
        List<CombinedRange> combinedRanges = VectoredReadUtils.mergeSortedRanges(sortRanges, readable.minSeekForVectorReads());
        int parallelism = readable.parallelismForVectorReads();
        if (combinedRanges.size() == 1 && readable instanceof SeekableInputStream) {
            VectoredReadUtils.fallbackToReadSequence((SeekableInputStream) readable, sortRanges);
            return;
        }
        BlockingExecutor executor = new BlockingExecutor(FileIOUtils.IO_THREAD_POOL, parallelism);
        long batchSize = readable.batchSizeForVectorReads();
        for (CombinedRange combinedRange : combinedRanges) {
            if (combinedRange.underlying.size() == 1) {
                // Nothing to stitch together: read straight into the caller's range.
                FileRange fileRange = combinedRange.underlying.get(0);
                executor.submit(() -> VectoredReadUtils.readSingleRange(readable, fileRange));
                continue;
            }
            List<FileRange> splitBatches = combinedRange.splitBatches(batchSize, parallelism);
            for (FileRange batch : splitBatches) {
                executor.submit(() -> VectoredReadUtils.readSingleRange(readable, batch));
            }
            List<CompletableFuture<byte[]>> futures = splitBatches.stream().map(FileRange::getData).collect(Collectors.toList());
            // whenCompleteAsync (not thenAcceptAsync) so the callback also runs when a batch failed;
            // otherwise the underlying ranges would never be completed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .whenCompleteAsync(
                            (unused, error) -> VectoredReadUtils.finishCombinedRange(combinedRange, futures, error),
                            (Executor) FileIOUtils.IO_THREAD_POOL);
        }
    }

    /**
     * Completes the requests of a combined range once all of its batches are done: copies the
     * bytes on success, or fails every request that is still pending with the root cause.
     */
    private static void finishCombinedRange(CombinedRange combinedRange, List<CompletableFuture<byte[]>> futures, Throwable error) {
        Throwable failure = error;
        if (failure == null) {
            try {
                VectoredReadUtils.copyToFileRanges(combinedRange, futures);
                return;
            } catch (Throwable t) {
                // This runs inside an async callback whose own future is discarded, so anything
                // thrown here must be routed to the callers' futures or it is lost.
                failure = t;
            }
        } else if (failure instanceof java.util.concurrent.CompletionException && failure.getCause() != null) {
            // allOf wraps the original exception; unwrap so callers see the same cause as on the
            // single-range path.
            failure = failure.getCause();
        }
        for (FileRange fileRange : combinedRange.underlying) {
            // No-op for requests that were already completed successfully.
            fileRange.getData().completeExceptionally(failure);
        }
    }

    /**
     * Reads the given ranges one after another from a single seekable stream, in list order.
     *
     * <p>For each range the stream is positioned at the range's offset, exactly
     * {@code getLength()} bytes are read, and the range's data future is completed with them.
     *
     * @param in stream to seek and read from
     * @param ranges ranges to serve sequentially
     * @throws IOException if seeking or reading fails; ranges not yet served stay uncompleted
     */
    private static void fallbackToReadSequence(SeekableInputStream in, List<? extends FileRange> ranges) throws IOException {
        for (FileRange range : ranges) {
            int size = range.getLength();
            byte[] payload = new byte[size];
            long position = range.getOffset();
            in.seek(position);
            IOUtils.readFully((InputStream) in, payload);
            range.getData().complete(payload);
        }
    }

    /**
     * Reads one range with a positioned read and publishes the outcome on the range's future.
     *
     * <p>A zero-length range completes immediately with an empty array. Any {@link Exception}
     * raised while reading is delivered through {@code completeExceptionally} instead of being
     * thrown to the caller.
     *
     * @param readable source to read from
     * @param range range describing the offset and length, and carrying the result future
     */
    private static void readSingleRange(VectoredReadable readable, FileRange range) {
        final int size = range.getLength();
        if (size == 0) {
            range.getData().complete(new byte[0]);
            return;
        }
        try {
            final long start = range.getOffset();
            final byte[] payload = new byte[size];
            readable.preadFully(start, payload, 0, size);
            range.getData().complete(payload);
        } catch (Exception failure) {
            range.getData().completeExceptionally(failure);
        }
    }

    /**
     * Distributes the bytes of the finished batch reads to the requests of a combined range.
     *
     * <p>The batch results are joined in list order and treated as one logical byte sequence that
     * starts at the combined range's offset. Each underlying request receives a fresh array with
     * its slice and its data future is completed with that array.
     *
     * @param combinedRange combined range whose underlying requests are completed
     * @param futures batch results, in file order
     */
    private static void copyToFileRanges(CombinedRange combinedRange, List<CompletableFuture<byte[]>> futures) {
        List<byte[]> chunks = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        final long base = combinedRange.offset;
        for (FileRange target : combinedRange.underlying) {
            int size = target.getLength();
            byte[] payload = new byte[size];
            // Position of this request relative to the start of the combined range.
            int relativeStart = (int) (target.getOffset() - base);
            VectoredReadUtils.copyMultiBytesToBytes(chunks, relativeStart, payload, size);
            target.getData().complete(payload);
        }
    }

    /**
     * Copies {@code numBytes} bytes out of a list of segments, viewed as one concatenated byte
     * sequence, into the start of {@code bytes}.
     *
     * <p>If the segments hold fewer than {@code offset + numBytes} bytes, only the available bytes
     * are copied and the rest of {@code bytes} is left untouched.
     *
     * @param segments source arrays, in order
     * @param offset index of the first byte to copy within the concatenated sequence
     * @param bytes destination array, filled from index 0
     * @param numBytes number of bytes to copy
     */
    private static void copyMultiBytesToBytes(List<byte[]> segments, int offset, byte[] bytes, int numBytes) {
        int skip = offset;
        int written = 0;
        for (byte[] segment : segments) {
            int available = segment.length - skip;
            if (available <= 0) {
                // The start position lies beyond this segment; carry the remainder forward.
                skip = -available;
                continue;
            }
            int count = Math.min(available, numBytes - written);
            System.arraycopy(segment, skip, bytes, written, count);
            written += count;
            skip = 0;
            if (written == numBytes) {
                return;
            }
        }
    }

    /**
     * Validates the requested ranges and returns them ordered by offset.
     *
     * <p>A single-element list is validated and returned as is. Longer lists are copied, sorted
     * by offset, and checked so that no range starts before the previous one ends.
     *
     * @param input ranges to check; must be non-null and non-empty
     * @return the input itself for a single range, otherwise a sorted copy
     * @throws EOFException if a range has a negative offset
     */
    private static List<? extends FileRange> validateAndSortRanges(List<? extends FileRange> input) throws EOFException {
        Objects.requireNonNull(input, "Null input list");
        Preconditions.checkArgument(!input.isEmpty(), "Empty input list");
        if (input.size() == 1) {
            VectoredReadUtils.validateRangeRequest(input.get(0));
            return input;
        }
        List<? extends FileRange> ordered = VectoredReadUtils.sortRanges(input);
        for (int i = 0; i < ordered.size(); i++) {
            FileRange current = ordered.get(i);
            VectoredReadUtils.validateRangeRequest(current);
            if (i > 0) {
                FileRange previous = ordered.get(i - 1);
                Preconditions.checkArgument(current.getOffset() >= previous.getOffset() + (long) previous.getLength(), "Overlapping ranges %s and %s", previous, current);
            }
        }
        return ordered;
    }

    /**
     * Checks a single range request: it must be non-null, have a non-negative length and a
     * non-negative offset.
     *
     * @param range range to check
     * @throws EOFException if the offset is negative
     */
    private static void validateRangeRequest(FileRange range) throws EOFException {
        Objects.requireNonNull(range, "range is null");
        Preconditions.checkArgument(range.getLength() >= 0, "length is negative in %s", range);
        if (range.getOffset() >= 0L) {
            return;
        }
        throw new EOFException("position is negative in range " + range);
    }

    /**
     * Returns a new list holding the given ranges ordered by ascending offset; the input list is
     * not modified.
     *
     * @param input ranges to sort
     * @return sorted copy of {@code input}
     */
    private static List<? extends FileRange> sortRanges(List<? extends FileRange> input) {
        Comparator<FileRange> byOffset = Comparator.comparingLong(FileRange::getOffset);
        List<FileRange> ordered = new ArrayList<>(input);
        ordered.sort(byOffset);
        return ordered;
    }

    /**
     * Groups offset-sorted ranges into combined ranges.
     *
     * <p>Each range is offered to the most recently opened combined range; if that range declines
     * the merge (or none exists yet), a new combined range is started with it.
     *
     * @param sortedRanges ranges ordered by ascending offset
     * @param minimumSeek gap threshold passed on to {@code CombinedRange.merge}
     * @return combined ranges in file order
     */
    private static List<CombinedRange> mergeSortedRanges(List<? extends FileRange> sortedRanges, int minimumSeek) {
        List<CombinedRange> merged = new ArrayList<>(sortedRanges.size());
        CombinedRange open = null;
        for (FileRange range : sortedRanges) {
            long start = range.getOffset();
            long end = start + (long) range.getLength();
            boolean absorbed = open != null && open.merge(start, end, range, minimumSeek);
            if (!absorbed) {
                open = new CombinedRange(start, end, range);
                merged.add(open);
            }
        }
        return merged;
    }

    /**
     * A contiguous span of the file that covers one or more requested ranges, so that nearby
     * requests can be served by shared reads.
     */
    private static class CombinedRange {
        /** Requested ranges covered by this span, in the order they were added. */
        private final List<FileRange> underlying = new ArrayList<>();
        /** File offset at which the span starts. */
        private final long offset;
        /** Number of bytes from {@link #offset} to the end of the span, gaps included. */
        private int length;
        /** Sum of the lengths of the underlying ranges, gaps excluded. */
        private long dataSize;

        /**
         * Creates a span covering a single requested range.
         *
         * @param offset start of the span
         * @param end exclusive end of the span
         * @param original the requested range the span is built from
         */
        public CombinedRange(long offset, long end, FileRange original) {
            this.offset = offset;
            this.length = (int) (end - offset);
            this.append(original);
        }

        /** Records a requested range as part of this span and accounts for its payload size. */
        private void append(FileRange range) {
            this.underlying.add(range);
            this.dataSize += (long) range.getLength();
        }

        /**
         * Tries to extend this span so that it also covers {@code other}.
         *
         * <p>The merge is refused when the gap between the current end and {@code otherOffset} is
         * at least {@code minSeek}, or when the extended span would no longer fit into an
         * {@code int} length (batches and buffers are sized with {@code int}).
         *
         * @param otherOffset start of the range to add
         * @param otherEnd exclusive end of the range to add
         * @param other the requested range to add
         * @param minSeek gap size from which a separate read is preferred
         * @return {@code true} if the range was absorbed, {@code false} if the span is unchanged
         */
        public boolean merge(long otherOffset, long otherEnd, FileRange other, int minSeek) {
            long end = this.offset + (long) this.length;
            if (otherOffset - end >= (long) minSeek) {
                return false;
            }
            long newLength = Math.max(end, otherEnd) - this.offset;
            if (newLength > (long) Integer.MAX_VALUE) {
                // Narrowing to int would silently corrupt the span length; keep the ranges apart.
                return false;
            }
            this.length = (int) newLength;
            this.append(other);
            return true;
        }

        /**
         * Splits this span into consecutive batches to be read independently.
         *
         * <p>The regular batch size is {@code max(batchSize, length / parallelism + 1)}. Regular
         * batches are emitted while at least {@code max(regular size, 2 * batchSize)} bytes
         * remain; whatever is left afterwards forms one final batch, which is omitted if empty.
         *
         * @param batchSize preferred minimum size of a batch
         * @param parallelism divisor used to derive the per-batch share of the span
         * @return batches in file order, together covering the whole span
         */
        private List<FileRange> splitBatches(long batchSize, int parallelism) {
            // Computed in long so that "length / parallelism + 1" cannot overflow int.
            long expectedSize = Math.max(batchSize, (long) this.length / (long) parallelism + 1L);
            long minRemain = Math.max(expectedSize, batchSize * 2L);
            long end = this.offset + (long) this.length;
            List<FileRange> splitBatches = new ArrayList<>();
            long position = this.offset;
            while (end - position >= minRemain) {
                // expectedSize <= minRemain <= remaining bytes <= length, so it fits into an int.
                splitBatches.add(FileRange.createFileRange(position, (int) expectedSize));
                position += expectedSize;
            }
            int tail = (int) (end - position);
            if (tail > 0) {
                splitBatches.add(FileRange.createFileRange(position, tail));
            }
            return splitBatches;
        }

        @Override
        public String toString() {
            return String.format("CombinedRange: range count=%d, data size=%,d", this.underlying.size(), this.dataSize);
        }
    }
}

