/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.mergetree;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.memory.CachelessSegmentPool;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.SortMergeReader;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.sort.SortBuffer;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.OffsetRow;

/**
 * Merges several sorted {@link KeyValue} readers into one reader of merged results.
 *
 * <p>Two strategies are available, chosen in {@link #mergeSort}:
 *
 * <ul>
 *   <li>all readers are opened and handed to {@link SortMergeReader};
 *   <li>when an {@link IOManager} is set and the number of readers exceeds the configured spill
 *       threshold, all records are first written into a {@link BinaryExternalSortBuffer} and the
 *       merge is performed over its sorted output.
 * </ul>
 */
public class MergeSorter {
    /** Number of meta fields (sequence number, value kind, level) stored between key and value. */
    private static final int META_FIELD_COUNT = 3;

    private final RowType keyType;
    /** Not final: may be replaced through {@link #setProjectedValueType(RowType)}. */
    private RowType valueType;
    private final CoreOptions.SortEngine sortEngine;
    private final int spillThreshold;
    private final int spillSortMaxNumFiles;
    private final String compression;
    private final MemorySize maxDiskSize;
    private final MemorySegmentPool memoryPool;
    /** Not final: may be replaced through {@link #setIOManager(IOManager)}. */
    @Nullable
    private IOManager ioManager;

    /**
     * Creates a sorter configured from the given options.
     *
     * @param options source of the sort engine, spill threshold, file handle limit, spill
     *     compression, spill buffer size, page size and spill disk size
     * @param keyType row type of the keys
     * @param valueType row type of the values
     * @param ioManager I/O manager used for spilling; when {@code null} the spill path is never
     *     taken
     */
    public MergeSorter(CoreOptions options, RowType keyType, RowType valueType, @Nullable IOManager ioManager) {
        this.sortEngine = options.sortEngine();
        this.spillThreshold = options.sortSpillThreshold();
        this.spillSortMaxNumFiles = options.localSortMaxNumFileHandles();
        this.compression = options.spillCompression();
        this.keyType = keyType;
        this.valueType = valueType;
        this.memoryPool = new CachelessSegmentPool(options.sortSpillBufferSize(), options.pageSize());
        this.ioManager = ioManager;
        this.maxDiskSize = options.writeBufferSpillDiskSize();
    }

    /** Returns the memory pool that backs the spill sort buffer. */
    public MemorySegmentPool memoryPool() {
        return memoryPool;
    }

    /** Replaces the I/O manager used by the spill path. */
    public void setIOManager(IOManager ioManager) {
        this.ioManager = ioManager;
    }

    /** Replaces the value row type used when packing and unpacking spilled records. */
    public void setProjectedValueType(RowType projectedType) {
        this.valueType = projectedType;
    }

    /**
     * Builds a reader that yields the merged result of all given readers.
     *
     * @param lazyReaders suppliers of the readers to merge
     * @param keyComparator comparator on record keys
     * @param userDefinedSeqComparator optional comparator on user-defined sequence fields
     * @param mergeFunction function that folds records sharing a key into one result
     * @return a reader over the merged results
     * @throws IOException if a reader cannot be opened or the spill sort fails
     */
    public <T> RecordReader<T> mergeSort(List<ConcatRecordReader.ReaderSupplier<KeyValue>> lazyReaders, Comparator<InternalRow> keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionWrapper<T> mergeFunction) throws IOException {
        if (ioManager != null && lazyReaders.size() > spillThreshold) {
            return spillMergeSort(lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction);
        }

        List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());
        for (ConcatRecordReader.ReaderSupplier<KeyValue> supplier : lazyReaders) {
            try {
                readers.add(supplier.get());
            } catch (IOException | RuntimeException e) {
                // One reader failed to open: close every reader opened so far so that none of
                // them leaks, whatever kind of failure interrupted the loop.
                readers.forEach(IOUtils::closeQuietly);
                throw e;
            }
        }
        return SortMergeReader.createSortMergeReader(readers, keyComparator, userDefinedSeqComparator, mergeFunction, sortEngine);
    }

    /**
     * Spill path: writes every record of every reader into an external sort buffer, then exposes
     * the merged, sorted content as a reader that returns a single batch.
     *
     * <p>Closing the returned reader clears the sort buffer. If anything fails before the reader
     * is returned, the sort buffer is cleared here because no one else holds a reference to it.
     */
    private <T> RecordReader<T> spillMergeSort(List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers, Comparator<InternalRow> keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionWrapper<T> mergeFunction) throws IOException {
        final ExternalSorterWithLevel sorter = new ExternalSorterWithLevel(userDefinedSeqComparator);
        final NoReusingMergeIterator<T> iterator;
        try {
            ConcatRecordReader.create(readers).forIOEachRemaining(sorter::put);
            sorter.flushMemory();
            iterator = sorter.newIterator(keyComparator, mergeFunction);
        } catch (IOException | RuntimeException e) {
            try {
                sorter.clear();
            } catch (RuntimeException cleanupFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }

        return new RecordReader<T>() {
            /** Whether the one and only batch has already been handed out. */
            private boolean read = false;

            @Override
            @Nullable
            public RecordReader.RecordIterator<T> readBatch() {
                if (read) {
                    return null;
                }
                read = true;
                return new RecordReader.RecordIterator<T>() {

                    @Override
                    public T next() throws IOException {
                        return iterator.next();
                    }

                    @Override
                    public void releaseBatch() {
                        // Nothing to release per batch; the buffer is cleared in close().
                    }
                };
            }

            @Override
            public void close() {
                sorter.clear();
            }
        };
    }

    /**
     * Iterates over the sorted, packed rows of the sort buffer and merges consecutive rows that
     * share the same key. A new {@link KeyValue} is created for every row read.
     */
    private class NoReusingMergeIterator<T> {
        private final MutableObjectIterator<BinaryRow> kvIter;
        private final Comparator<InternalRow> keyComparator;
        private final MergeFunctionWrapper<T> mergeFunc;
        /** First record of the next key group, read ahead while finishing the previous group. */
        private KeyValue left;
        /** Set once the underlying iterator has returned {@code null}. */
        private boolean isEnd;

        private NoReusingMergeIterator(MutableObjectIterator<BinaryRow> kvIter, Comparator<InternalRow> keyComparator, MergeFunctionWrapper<T> mergeFunction) {
            this.kvIter = kvIter;
            this.keyComparator = keyComparator;
            this.mergeFunc = mergeFunction;
            this.isEnd = false;
        }

        /**
         * Returns the next non-null merge result, or {@code null} when the input is exhausted.
         * Key groups whose merge result is {@code null} are skipped.
         */
        public T next() throws IOException {
            if (isEnd) {
                return null;
            }

            T result;
            do {
                mergeFunc.reset();
                InternalRow key = null;
                KeyValue keyValue;
                while ((keyValue = readOnce()) != null) {
                    if (key != null && keyComparator.compare(keyValue.key(), key) != 0) {
                        // keyValue belongs to the next group.
                        break;
                    }
                    key = keyValue.key();
                    mergeFunc.add(keyValue);
                }
                // Either null (end of input) or the first record of the next group.
                left = keyValue;
                if (key == null) {
                    return null;
                }
                result = mergeFunc.getResult();
            } while (result == null);
            return result;
        }

        /**
         * Returns the read-ahead record if there is one, otherwise unpacks the next row from the
         * underlying iterator. Returns {@code null} and marks the end when no row is left.
         */
        private KeyValue readOnce() throws IOException {
            if (left != null) {
                KeyValue pending = left;
                left = null;
                return pending;
            }

            BinaryRow row = kvIter.next();
            if (row == null) {
                isEnd = true;
                return null;
            }

            // Row layout: key fields | sequence number | value kind | level | value fields.
            int keyArity = keyType.getFieldCount();
            int valueArity = valueType.getFieldCount();
            return new KeyValue()
                    .replace(
                            new OffsetRow(keyArity, 0).replace(row),
                            row.getLong(keyArity),
                            RowKind.fromByteValue(row.getByte(keyArity + 1)),
                            new OffsetRow(valueArity, keyArity + META_FIELD_COUNT).replace(row))
                    .setLevel(row.getInt(keyArity + 2));
        }
    }

    /**
     * Wraps a {@link SortBuffer} whose rows are laid out as
     * {@code key fields | _SEQUENCE_NUMBER | _VALUE_KIND | _LEVEL | value fields}.
     *
     * <p>Rows are sorted by the key fields, then by the user-defined sequence fields (if any),
     * then by the sequence number.
     */
    private class ExternalSorterWithLevel {
        private final SortBuffer buffer;

        /**
         * @param userDefinedSeqComparator optional comparator whose compare fields (indices into
         *     the value row) are added to the sort order
         * @throws IllegalArgumentException if the memory pool has fewer than 3 free pages
         */
        public ExternalSorterWithLevel(@Nullable FieldsComparator userDefinedSeqComparator) {
            if (memoryPool.freePages() < 3) {
                throw new IllegalArgumentException("Write buffer requires a minimum of 3 page memory, please increase write buffer memory size.");
            }

            final int keyFieldCount = keyType.getFieldCount();

            // 1. key fields
            IntStream sortFields = IntStream.range(0, keyFieldCount);
            // 2. user-defined sequence fields, shifted past the key and meta fields
            if (userDefinedSeqComparator != null) {
                IntStream udsFields = IntStream.of(userDefinedSeqComparator.compareFields()).map(operand -> operand + keyFieldCount + META_FIELD_COUNT);
                sortFields = IntStream.concat(sortFields, udsFields);
            }
            // 3. sequence number, stored right after the key fields
            sortFields = IntStream.concat(sortFields, IntStream.of(keyFieldCount));

            List<DataField> fields = new ArrayList<>(keyType.getFields());
            fields.add(new DataField(0, "_SEQUENCE_NUMBER", new BigIntType(false)));
            fields.add(new DataField(1, "_VALUE_KIND", new TinyIntType(false)));
            fields.add(new DataField(2, "_LEVEL", new IntType(false)));
            fields.addAll(valueType.getFields());

            this.buffer = BinaryExternalSortBuffer.create(ioManager, new RowType(fields), sortFields.toArray(), memoryPool, spillSortMaxNumFiles, compression, maxDiskSize);
        }

        /** Packs the record into the buffer's row layout and writes it to the buffer. */
        public boolean put(KeyValue keyValue) throws IOException {
            GenericRow meta = new GenericRow(META_FIELD_COUNT);
            meta.setField(0, keyValue.sequenceNumber());
            meta.setField(1, keyValue.valueKind().toByteValue());
            meta.setField(2, keyValue.level());
            JoinedRow keyAndMeta = new JoinedRow().replace(keyValue.key(), meta);
            JoinedRow row = new JoinedRow().replace(keyAndMeta, keyValue.value());
            return buffer.write(row);
        }

        /** Delegates to {@link SortBuffer#flushMemory()}. */
        public boolean flushMemory() throws IOException {
            return buffer.flushMemory();
        }

        /** Delegates to {@link SortBuffer#clear()}. */
        public void clear() {
            buffer.clear();
        }

        /** Creates a merging iterator over the buffer's sorted content. */
        public <T> NoReusingMergeIterator<T> newIterator(Comparator<InternalRow> keyComparator, MergeFunctionWrapper<T> mergeFunction) throws IOException {
            return new NoReusingMergeIterator<>(buffer.sortedIterator(), keyComparator, mergeFunction);
        }
    }
}

