/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeWriter;
import org.apache.paimon.mergetree.compact.CompactStrategy;
import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
import org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.MemoryFileStoreWrite;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeyValueFileStoreWrite
extends MemoryFileStoreWrite<KeyValue> {
    private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreWrite.class);
    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
    private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
    private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
    private final Supplier<FieldsComparator> udsComparatorSupplier;
    private final Supplier<RecordEqualiser> valueEqualiserSupplier;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final CoreOptions options;
    private final FileIO fileIO;
    private final RowType keyType;
    private final RowType valueType;
    @Nullable
    private final RecordLevelExpire recordLevelExpire;

    public KeyValueFileStoreWrite(FileIO fileIO, SchemaManager schemaManager, TableSchema schema, String commitUser, RowType keyType, RowType valueType, Supplier<Comparator<InternalRow>> keyComparatorSupplier, Supplier<FieldsComparator> udsComparatorSupplier, Supplier<RecordEqualiser> valueEqualiserSupplier, MergeFunctionFactory<KeyValue> mfFactory, FileStorePathFactory pathFactory, Map<String, FileStorePathFactory> format2PathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @Nullable IndexMaintainer.Factory<KeyValue> indexFactory, @Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory, CoreOptions options, KeyValueFieldsExtractor extractor, String tableName) {
        super(commitUser, snapshotManager, scan, options, indexFactory, deletionVectorsMaintainerFactory, tableName);
        this.fileIO = fileIO;
        this.keyType = keyType;
        this.valueType = valueType;
        this.udsComparatorSupplier = udsComparatorSupplier;
        this.readerFactoryBuilder = KeyValueFileReaderFactory.builder(fileIO, schemaManager, schema, keyType, valueType, FileFormatDiscover.of(options), pathFactory, extractor, options);
        this.recordLevelExpire = RecordLevelExpire.create(options, valueType);
        this.writerFactoryBuilder = KeyValueFileWriterFactory.builder(fileIO, schema.id(), keyType, valueType, options.fileFormat(), format2PathFactory, options.targetFileSize());
        this.keyComparatorSupplier = keyComparatorSupplier;
        this.valueEqualiserSupplier = valueEqualiserSupplier;
        this.mfFactory = mfFactory;
        this.options = options;
    }

    protected MergeTreeWriter createWriter(BinaryRow partition, int bucket, List<DataFileMeta> restoreFiles, long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, @Nullable DeletionVectorsMaintainer dvMaintainer) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating merge tree writer for partition {} bucket {} from restored files {}", new Object[]{partition, bucket, restoreFiles});
        }
        KeyValueFileWriterFactory writerFactory = this.writerFactoryBuilder.build(partition, bucket, this.options);
        Comparator<InternalRow> keyComparator = this.keyComparatorSupplier.get();
        Levels levels = new Levels(keyComparator, restoreFiles, this.options.numLevels());
        UniversalCompaction universalCompaction = new UniversalCompaction(this.options.maxSizeAmplificationPercent(), this.options.sortedRunSizeRatio(), this.options.numSortedRunCompactionTrigger(), this.options.optimizedCompactionInterval());
        CompactStrategy compactStrategy = this.options.needLookup() ? new ForceUpLevel0Compaction(universalCompaction) : universalCompaction;
        CompactManager compactManager = this.createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);
        return new MergeTreeWriter(this.bufferSpillable(), this.options.writeBufferSpillDiskSize(), this.options.localSortMaxNumFileHandles(), this.options.spillCompression(), this.ioManager, compactManager, restoredMaxSeqNumber, keyComparator, this.mfFactory.create(), writerFactory, this.options.commitForceCompact(), this.options.changelogProducer(), restoreIncrement, UserDefinedSeqComparator.create(this.valueType, this.options));
    }

    @VisibleForTesting
    public boolean bufferSpillable() {
        return this.options.writeBufferSpillable(this.fileIO.isObjectStore(), this.isStreamingMode);
    }

    private CompactManager createCompactManager(BinaryRow partition, int bucket, CompactStrategy compactStrategy, ExecutorService compactExecutor, Levels levels, @Nullable DeletionVectorsMaintainer dvMaintainer) {
        if (this.options.writeOnly()) {
            return new NoopCompactManager();
        }
        Comparator<InternalRow> keyComparator = this.keyComparatorSupplier.get();
        FieldsComparator userDefinedSeqComparator = this.udsComparatorSupplier.get();
        MergeTreeCompactRewriter rewriter = this.createRewriter(partition, bucket, keyComparator, userDefinedSeqComparator, levels, dvMaintainer);
        return new MergeTreeCompactManager(compactExecutor, levels, compactStrategy, keyComparator, this.options.compactionFileSize(), this.options.numSortedRunStopTrigger(), rewriter, this.compactionMetrics == null ? null : this.compactionMetrics.createReporter(partition, bucket), this.options.deletionVectorsEnabled());
    }

    private MergeTreeCompactRewriter createRewriter(BinaryRow partition, int bucket, Comparator<InternalRow> keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, Levels levels, @Nullable DeletionVectorsMaintainer dvMaintainer) {
        DeletionVector.Factory dvFactory = DeletionVector.factory(dvMaintainer);
        FileReaderFactory<KeyValue> readerFactory = this.readerFactoryBuilder.build(partition, bucket, dvFactory);
        if (this.recordLevelExpire != null) {
            readerFactory = this.recordLevelExpire.wrap(readerFactory);
        }
        KeyValueFileWriterFactory writerFactory = this.writerFactoryBuilder.build(partition, bucket, this.options);
        MergeSorter mergeSorter = new MergeSorter(this.options, this.keyType, this.valueType, this.ioManager);
        int maxLevel = this.options.numLevels() - 1;
        CoreOptions.MergeEngine mergeEngine = this.options.mergeEngine();
        CoreOptions.ChangelogProducer changelogProducer = this.options.changelogProducer();
        LookupStrategy lookupStrategy = this.options.lookupStrategy();
        if (changelogProducer.equals(CoreOptions.ChangelogProducer.FULL_COMPACTION)) {
            return new FullChangelogMergeTreeCompactRewriter(maxLevel, mergeEngine, readerFactory, writerFactory, keyComparator, userDefinedSeqComparator, this.mfFactory, mergeSorter, this.valueEqualiserSupplier.get(), this.options.changelogRowDeduplicate());
        }
        if (lookupStrategy.needLookup) {
            LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<Boolean> wrapperFactory;
            LookupLevels.ValueProcessor<LookupLevels.PositionedKeyValue> processor;
            FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
            if (lookupStrategy.isFirstRow) {
                if (this.options.deletionVectorsEnabled()) {
                    throw new UnsupportedOperationException("First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine.");
                }
                lookupReaderFactory = this.readerFactoryBuilder.copyWithoutProjection().withValueProjection(new int[0][]).build(partition, bucket, dvFactory);
                processor = new LookupLevels.ContainsValueProcessor();
                wrapperFactory = new LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory();
            } else {
                processor = lookupStrategy.deletionVector ? new LookupLevels.PositionedKeyValueProcessor(this.valueType, lookupStrategy.produceChangelog || mergeEngine != CoreOptions.MergeEngine.DEDUPLICATE) : new LookupLevels.KeyValueProcessor(this.valueType);
                wrapperFactory = new LookupMergeTreeCompactRewriter.LookupMergeFunctionWrapperFactory(this.valueEqualiserSupplier.get(), this.options.changelogRowDeduplicate(), lookupStrategy, UserDefinedSeqComparator.create(this.valueType, this.options));
            }
            return new LookupMergeTreeCompactRewriter<Boolean>(maxLevel, mergeEngine, this.createLookupLevels(levels, processor, lookupReaderFactory), readerFactory, writerFactory, keyComparator, userDefinedSeqComparator, this.mfFactory, mergeSorter, wrapperFactory, lookupStrategy.produceChangelog, dvMaintainer);
        }
        return new MergeTreeCompactRewriter(readerFactory, writerFactory, keyComparator, userDefinedSeqComparator, this.mfFactory, mergeSorter);
    }

    private <T> LookupLevels<T> createLookupLevels(Levels levels, LookupLevels.ValueProcessor<T> valueProcessor, FileReaderFactory<KeyValue> readerFactory) {
        if (this.ioManager == null) {
            throw new RuntimeException("Can not use lookup, there is no temp disk directory to use.");
        }
        Options options = this.options.toConfiguration();
        return new LookupLevels<T>(levels, this.keyComparatorSupplier.get(), this.keyType, valueProcessor, readerFactory::createRecordReader, () -> this.ioManager.createChannel().getPathFile(), new HashLookupStoreFactory(this.cacheManager, this.options.cachePageSize(), options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR).floatValue(), options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)), options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE), LookupStoreFactory.bfGenerator(options));
    }
}

