/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.mergetree.DropDeleteReader;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.operation.DiffReader;
import org.apache.paimon.operation.ReverseReader;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;

public class MergeFileSplitRead
implements SplitRead<KeyValue> {
    private final TableSchema tableSchema;
    private final FileIO fileIO;
    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
    private final Comparator<InternalRow> keyComparator;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final MergeSorter mergeSorter;
    private final List<String> sequenceFields;
    @Nullable
    private int[][] keyProjectedFields;
    @Nullable
    private List<Predicate> filtersForKeys;
    @Nullable
    private List<Predicate> filtersForAll;
    @Nullable
    private int[][] pushdownProjection;
    @Nullable
    private int[][] outerProjection;
    private boolean forceKeepDelete = false;

    public MergeFileSplitRead(CoreOptions options, TableSchema schema, RowType keyType, RowType valueType, Comparator<InternalRow> keyComparator, MergeFunctionFactory<KeyValue> mfFactory, KeyValueFileReaderFactory.Builder readerFactoryBuilder) {
        this.tableSchema = schema;
        this.readerFactoryBuilder = readerFactoryBuilder;
        this.fileIO = readerFactoryBuilder.fileIO();
        this.keyComparator = keyComparator;
        this.mfFactory = mfFactory;
        this.mergeSorter = new MergeSorter(CoreOptions.fromMap(this.tableSchema.options()), keyType, valueType, null);
        this.sequenceFields = options.sequenceField();
    }

    public MergeFileSplitRead withKeyProjection(@Nullable int[][] projectedFields) {
        this.readerFactoryBuilder.withKeyProjection(projectedFields);
        this.keyProjectedFields = projectedFields;
        return this;
    }

    public MergeFileSplitRead withValueProjection(@Nullable int[][] projectedFields) {
        if (projectedFields == null) {
            return this;
        }
        int[][] newProjectedFields = projectedFields;
        if (this.sequenceFields.size() > 0) {
            List<String> fieldNames = this.tableSchema.fieldNames();
            List<String> projectedNames = Projection.of(projectedFields).project(fieldNames);
            int[] lackFields = this.sequenceFields.stream().filter(f -> !projectedNames.contains(f)).mapToInt(fieldNames::indexOf).toArray();
            if (lackFields.length > 0) {
                newProjectedFields = (int[][])Arrays.copyOf(projectedFields, projectedFields.length + lackFields.length);
                for (int i = 0; i < lackFields.length; ++i) {
                    newProjectedFields[projectedFields.length + i] = new int[]{lackFields[i]};
                }
            }
        }
        MergeFunctionFactory.AdjustedProjection projection = this.mfFactory.adjustProjection(newProjectedFields);
        this.pushdownProjection = projection.pushdownProjection;
        this.outerProjection = projection.outerProjection;
        if (this.pushdownProjection != null) {
            this.readerFactoryBuilder.withValueProjection(this.pushdownProjection);
            this.mergeSorter.setProjectedValueType(this.readerFactoryBuilder.projectedValueType());
        }
        if (newProjectedFields != projectedFields) {
            this.outerProjection = this.outerProjection == null ? Projection.range(0, projectedFields.length).toNestedIndexes() : (int[][])Arrays.copyOf(this.outerProjection, projectedFields.length);
        }
        return this;
    }

    public MergeFileSplitRead withIOManager(IOManager ioManager) {
        this.mergeSorter.setIOManager(ioManager);
        return this;
    }

    public MergeFileSplitRead forceKeepDelete() {
        this.forceKeepDelete = true;
        return this;
    }

    public MergeFileSplitRead withFilter(Predicate predicate) {
        if (predicate == null) {
            return this;
        }
        ArrayList<Predicate> allFilters = new ArrayList<Predicate>();
        ArrayList<Predicate> pkFilters = null;
        List<String> primaryKeys = this.tableSchema.trimmedPrimaryKeys();
        Set<String> nonPrimaryKeys = this.tableSchema.fieldNames().stream().filter(name -> !primaryKeys.contains(name)).collect(Collectors.toSet());
        for (Predicate sub : PredicateBuilder.splitAnd(predicate)) {
            allFilters.add(sub);
            if (PredicateBuilder.containsFields(sub, nonPrimaryKeys)) continue;
            if (pkFilters == null) {
                pkFilters = new ArrayList<Predicate>();
            }
            pkFilters.add(sub);
        }
        this.filtersForAll = allFilters;
        this.filtersForKeys = pkFilters;
        return this;
    }

    @Override
    public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
        RecordReader<KeyValue> reader = this.createReaderWithoutOuterProjection(split);
        if (this.outerProjection != null) {
            ProjectedRow projectedRow = ProjectedRow.from(this.outerProjection);
            reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value())));
        }
        return reader;
    }

    private RecordReader<KeyValue> createReaderWithoutOuterProjection(DataSplit split) throws IOException {
        if (split.beforeFiles().isEmpty()) {
            if (split.isStreaming() || split.convertToRawFiles().isPresent()) {
                return this.noMergeRead(split.partition(), split.bucket(), split.dataFiles(), split.deletionFiles().orElse(null), split.isStreaming());
            }
            return this.projectKey(this.mergeRead(split.partition(), split.bucket(), split.dataFiles(), null, this.forceKeepDelete));
        }
        if (split.isStreaming()) {
            return ConcatRecordReader.create(() -> new ReverseReader(this.noMergeRead(split.partition(), split.bucket(), split.beforeFiles(), split.beforeDeletionFiles().orElse(null), true)), () -> this.noMergeRead(split.partition(), split.bucket(), split.dataFiles(), split.deletionFiles().orElse(null), true));
        }
        return this.projectKey(DiffReader.readDiff(this.mergeRead(split.partition(), split.bucket(), split.beforeFiles(), split.beforeDeletionFiles().orElse(null), false), this.mergeRead(split.partition(), split.bucket(), split.dataFiles(), split.deletionFiles().orElse(null), false), this.keyComparator, this.createUdsComparator(), this.mergeSorter, this.forceKeepDelete));
    }

    private RecordReader<KeyValue> mergeRead(BinaryRow partition, int bucket, List<DataFileMeta> files, @Nullable List<DeletionFile> deletionFiles, boolean keepDelete) throws IOException {
        DeletionVector.Factory dvFactory = DeletionVector.factory(this.fileIO, files, deletionFiles);
        KeyValueFileReaderFactory overlappedSectionFactory = this.readerFactoryBuilder.build(partition, bucket, dvFactory, false, this.filtersForKeys);
        KeyValueFileReaderFactory nonOverlappedSectionFactory = this.readerFactoryBuilder.build(partition, bucket, dvFactory, false, this.filtersForAll);
        ArrayList sectionReaders = new ArrayList();
        ReducerMergeFunctionWrapper mergeFuncWrapper = new ReducerMergeFunctionWrapper(this.mfFactory.create(this.pushdownProjection));
        for (List<SortedRun> section : new IntervalPartition(files, this.keyComparator).partition()) {
            sectionReaders.add(() -> MergeTreeReaders.readerForSection(section, section.size() > 1 ? overlappedSectionFactory : nonOverlappedSectionFactory, this.keyComparator, this.createUdsComparator(), mergeFuncWrapper, this.mergeSorter));
        }
        DropDeleteReader reader = ConcatRecordReader.create(sectionReaders);
        if (!keepDelete) {
            reader = new DropDeleteReader(reader);
        }
        return reader;
    }

    private RecordReader<KeyValue> noMergeRead(BinaryRow partition, int bucket, List<DataFileMeta> files, @Nullable List<DeletionFile> deletionFiles, boolean onlyFilterKey) throws IOException {
        KeyValueFileReaderFactory readerFactory = this.readerFactoryBuilder.build(partition, bucket, DeletionVector.factory(this.fileIO, files, deletionFiles), true, onlyFilterKey ? this.filtersForKeys : this.filtersForAll);
        ArrayList suppliers = new ArrayList();
        for (DataFileMeta file : files) {
            suppliers.add(() -> {
                String fileName = this.changelogFile(file).orElse(file.fileName());
                return readerFactory.createRecordReader(file.schemaId(), fileName, file.fileSize(), file.level());
            });
        }
        return ConcatRecordReader.create(suppliers);
    }

    private Optional<String> changelogFile(DataFileMeta fileMeta) {
        for (String file : fileMeta.extraFiles()) {
            if (!file.startsWith("changelog-")) continue;
            return Optional.of(file);
        }
        return Optional.empty();
    }

    private RecordReader<KeyValue> projectKey(RecordReader<KeyValue> reader) {
        if (this.keyProjectedFields == null) {
            return reader;
        }
        ProjectedRow projectedRow = ProjectedRow.from(this.keyProjectedFields);
        return reader.transform(kv -> kv.replaceKey(projectedRow.replaceRow(kv.key())));
    }

    @Nullable
    private UserDefinedSeqComparator createUdsComparator() {
        return UserDefinedSeqComparator.create(this.readerFactoryBuilder.projectedValueType(), this.sequenceFields);
    }
}

