/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.append;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
import org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;

/**
 * Plans compaction for a table without primary keys.
 *
 * <p>The coordinator reads manifest entries through a {@link FilesIterator}, keeps the candidate
 * files of every partition in a {@link PartitionCompactCoordinator} and turns them into {@link
 * UnawareAppendCompactionTask}s. A typical caller invokes {@link #run()} repeatedly; each call
 * scans at most {@link #FILES_BATCH} entries and then plans.
 *
 * <p>When deletion vectors are enabled in the table options, a {@link DvMaintainerCache} is used
 * to find out which data files have a deletion file attached.
 */
public class UnawareAppendTableCompactionCoordinator {
    /** Maximum number of manifest entries consumed by a single {@link #scan()}. */
    private static final int FILES_BATCH = 100000;

    /** A partition coordinator whose age exceeds this value is dropped from the coordinator. */
    protected static final int REMOVE_AGE = 10;

    /** Once the age exceeds this value, all pending files of a partition are compacted at once. */
    protected static final int COMPACT_AGE = 5;

    private final SnapshotManager snapshotManager;
    private final long targetFileSize;
    private final long compactionFileSize;
    private final int minFileNum;
    private final int maxFileNum;

    /** {@code null} when deletion vectors are disabled. */
    @Nullable private final DvMaintainerCache dvMaintainerCache;

    private final FilesIterator filesIterator;

    /** Pending compaction state, keyed by partition. */
    final Map<BinaryRow, PartitionCompactCoordinator> partitionCompactCoordinators = new HashMap<>();

    /** Creates a coordinator in streaming mode without a filter. */
    public UnawareAppendTableCompactionCoordinator(FileStoreTable table) {
        this(table, true);
    }

    /** Creates a coordinator without a filter. */
    public UnawareAppendTableCompactionCoordinator(FileStoreTable table, boolean isStreaming) {
        this(table, isStreaming, null);
    }

    /**
     * Creates a coordinator.
     *
     * @param table the table to plan compaction for; it must not declare primary keys
     * @param isStreaming whether the scan keeps following new snapshots after the first one
     * @param filter optional predicate handed to the snapshot reader, may be {@code null}
     * @throws IllegalArgumentException presumably, via {@link Preconditions#checkArgument}, when
     *     the table has primary keys
     */
    public UnawareAppendTableCompactionCoordinator(
            FileStoreTable table,
            boolean isStreaming,
            @Nullable org.apache.paimon.predicate.Predicate filter) {
        Preconditions.checkArgument(table.primaryKeys().isEmpty());
        this.snapshotManager = table.snapshotManager();
        CoreOptions options = table.coreOptions();
        this.targetFileSize = options.targetFileSize(false);
        this.compactionFileSize = options.compactionFileSize(false);
        this.minFileNum = options.compactionMinFileNum();
        // Fall back to 50 files per task when no maximum is configured.
        this.maxFileNum = options.compactionMaxFileNum().orElse(50);
        this.dvMaintainerCache =
                options.deletionVectorsEnabled()
                        ? new DvMaintainerCache(table.store().newIndexFileHandler())
                        : null;
        this.filesIterator = new FilesIterator(table, isStreaming, filter);
    }

    /**
     * Scans one batch of files and plans compaction tasks for it.
     *
     * @return the planned tasks, or an empty list when the scan found no file
     * @throws EndOfScanException when the scan is finished and no file was read in this call
     */
    public List<UnawareAppendCompactionTask> run() {
        if (scan()) {
            return compactPlan();
        }
        return Collections.emptyList();
    }

    /**
     * Reads up to {@link #FILES_BATCH} entries from the files iterator and registers them per
     * partition.
     *
     * @return {@code true} if at least one file was read
     * @throws EndOfScanException when the iterator signals the end of the scan before any file
     *     was read in this call
     */
    @VisibleForTesting
    boolean scan() {
        Map<BinaryRow, List<DataFileMeta>> files = new HashMap<>();
        for (int i = 0; i < FILES_BATCH; i++) {
            ManifestEntry entry;
            try {
                entry = filesIterator.next();
            } catch (EndOfScanException e) {
                // Files collected so far must still be handed over before the scan ends.
                if (!files.isEmpty()) {
                    files.forEach(this::notifyNewFiles);
                    return true;
                }
                throw e;
            }
            if (entry == null) {
                break;
            }
            files.computeIfAbsent(entry.partition(), k -> new ArrayList<>()).add(entry.file());
        }
        if (files.isEmpty()) {
            return false;
        }
        files.forEach(this::notifyNewFiles);
        return true;
    }

    @VisibleForTesting
    FilesIterator filesIterator() {
        return filesIterator;
    }

    /**
     * Registers the compaction candidates among {@code files} for the given partition.
     *
     * <p>A file with a deletion file is always a candidate; any other file is a candidate only
     * if it is smaller than the configured compaction file size.
     */
    @VisibleForTesting
    void notifyNewFiles(BinaryRow partition, List<DataFileMeta> files) {
        Predicate<DataFileMeta> filter =
                file -> {
                    if (dvMaintainerCache == null
                            || dvMaintainerCache
                                            .dvMaintainer(partition)
                                            .getDeletionFile(file.fileName())
                                    == null) {
                        return file.fileSize() < compactionFileSize;
                    }
                    return true;
                };
        List<DataFileMeta> toCompact = files.stream().filter(filter).collect(Collectors.toList());
        partitionCompactCoordinators
                .computeIfAbsent(partition, pp -> new PartitionCompactCoordinator(partition))
                .addFiles(toCompact);
    }

    /**
     * Plans tasks for every known partition and afterwards drops the partition coordinators
     * that report {@link PartitionCompactCoordinator#readyToRemove()}.
     */
    @VisibleForTesting
    List<UnawareAppendCompactionTask> compactPlan() {
        List<UnawareAppendCompactionTask> tasks =
                partitionCompactCoordinators.values().stream()
                        .flatMap(coordinator -> coordinator.plan().stream())
                        .collect(Collectors.toList());
        // Every coordinator is stored under its own partition, so removing through the values
        // view is the same as removing by partition key.
        partitionCompactCoordinators.values().removeIf(PartitionCompactCoordinator::readyToRemove);
        return tasks;
    }

    /** Returns a copy of all files currently pending in any partition coordinator. */
    @VisibleForTesting
    HashSet<DataFileMeta> listRestoredFiles() {
        HashSet<DataFileMeta> restored = new HashSet<>();
        for (PartitionCompactCoordinator coordinator : partitionCompactCoordinators.values()) {
            restored.addAll(coordinator.toCompact);
        }
        return restored;
    }

    /**
     * Iterates over manifest entries snapshot by snapshot.
     *
     * <p>The first snapshot read is the latest one, in {@link ScanMode#ALL}; following snapshots
     * are read in {@link ScanMode#DELTA}. In non-streaming mode only the first snapshot is read.
     */
    class FilesIterator {
        private final SnapshotReader snapshotReader;
        private final boolean streamingMode;

        /** Id of the next snapshot to read; {@code null} until the first snapshot is resolved. */
        @Nullable private Long nextSnapshot = null;

        @Nullable private Iterator<ManifestEntry> currentIterator;

        public FilesIterator(
                FileStoreTable table,
                boolean isStreaming,
                @Nullable org.apache.paimon.predicate.Predicate filter) {
            this.snapshotReader = table.newSnapshotReader();
            if (filter != null) {
                snapshotReader.withFilter(filter);
            }
            if (table.coreOptions().manifestDeleteFileDropStats()) {
                snapshotReader.dropStats();
            }
            this.streamingMode = isStreaming;
        }

        /**
         * Tries to open an iterator over the next snapshot. Leaves {@link #currentIterator} as
         * {@code null} when there is no snapshot to read yet.
         *
         * @throws EndOfScanException in non-streaming mode when there is no snapshot at all, or
         *     when the first snapshot has already been handled
         */
        private void assignNewIterator() {
            currentIterator = null;
            if (nextSnapshot == null) {
                nextSnapshot = snapshotManager.latestSnapshotId();
                if (nextSnapshot == null) {
                    if (!streamingMode) {
                        throw new EndOfScanException();
                    }
                    return;
                }
                snapshotReader.withMode(ScanMode.ALL);
            } else {
                if (!streamingMode) {
                    throw new EndOfScanException();
                }
                snapshotReader.withMode(ScanMode.DELTA);
            }

            if (!snapshotManager.snapshotExists(nextSnapshot)) {
                return;
            }

            Snapshot snapshot = snapshotManager.snapshot(nextSnapshot);
            nextSnapshot = nextSnapshot + 1;

            // Cached maintainers are dropped so that they are rebuilt on next use.
            if (dvMaintainerCache != null) {
                dvMaintainerCache.refresh();
            }

            // Keep small files, plus (with deletion vectors) any file that has a deletion file.
            Filter<ManifestEntry> entryFilter =
                    entry -> {
                        if (entry.file().fileSize() < compactionFileSize) {
                            return true;
                        }
                        if (dvMaintainerCache != null) {
                            return dvMaintainerCache
                                    .dvMaintainer(entry.partition())
                                    .hasDeletionFile(entry.fileName());
                        }
                        return false;
                    };
            currentIterator =
                    snapshotReader
                            .withManifestEntryFilter(entryFilter)
                            .withSnapshot(snapshot)
                            .readFileIterator();
        }

        /**
         * Returns the next entry that is not of kind {@link FileKind#DELETE}.
         *
         * @return the next entry, or {@code null} when no snapshot is available to read right now
         * @throws EndOfScanException propagated from {@link #assignNewIterator()}
         */
        @Nullable
        public ManifestEntry next() {
            while (true) {
                if (currentIterator == null) {
                    assignNewIterator();
                    if (currentIterator == null) {
                        return null;
                    }
                }
                if (currentIterator.hasNext()) {
                    ManifestEntry entry = currentIterator.next();
                    if (entry.kind() == FileKind.DELETE) {
                        continue;
                    }
                    return entry;
                }
                // Current snapshot is exhausted; move on to the next one.
                currentIterator = null;
            }
        }
    }

    /** Per-partition cache of deletion file maintainers. */
    private class DvMaintainerCache {
        private final IndexFileHandler indexFileHandler;
        private final Map<BinaryRow, UnawareAppendDeletionFileMaintainer> cache =
                new ConcurrentHashMap<>();

        private DvMaintainerCache(IndexFileHandler indexFileHandler) {
            this.indexFileHandler = indexFileHandler;
        }

        /** Drops all cached maintainers. */
        private void refresh() {
            cache.clear();
        }

        /**
         * Returns the maintainer of the given partition, creating it for the latest snapshot id
         * on a cache miss.
         */
        private UnawareAppendDeletionFileMaintainer dvMaintainer(BinaryRow partition) {
            UnawareAppendDeletionFileMaintainer maintainer = cache.get(partition);
            if (maintainer == null) {
                synchronized (this) {
                    maintainer =
                            AppendDeletionFileMaintainer.forUnawareAppend(
                                    indexFileHandler,
                                    snapshotManager.latestSnapshotId(),
                                    partition);
                }
                cache.put(partition, maintainer);
            }
            return maintainer;
        }
    }

    /** Holds the pending files of one partition and packs them into compaction tasks. */
    class PartitionCompactCoordinator {
        private final BinaryRow partition;
        private final Set<DataFileMeta> toCompact = new HashSet<>();

        /** Number of consecutive plans that produced nothing; reset whenever files are added. */
        int age = 0;

        public PartitionCompactCoordinator(BinaryRow partition) {
            this.partition = partition;
        }

        /** Plans the compaction tasks of this partition. */
        public List<UnawareAppendCompactionTask> plan() {
            return pickCompact();
        }

        public BinaryRow partition() {
            return partition;
        }

        private List<UnawareAppendCompactionTask> pickCompact() {
            return agePack().stream()
                    .map(files -> new UnawareAppendCompactionTask(partition, files))
                    .collect(Collectors.toList());
        }

        /** Adds pending files and resets the age of this partition. */
        public void addFiles(List<DataFileMeta> dataFileMetas) {
            age = 0;
            toCompact.addAll(dataFileMetas);
        }

        /** Whether this coordinator has nothing pending or has aged past {@link #REMOVE_AGE}. */
        public boolean readyToRemove() {
            return toCompact.isEmpty() || age > REMOVE_AGE;
        }

        /**
         * Packs the pending files. If nothing could be packed the age grows, and once it exceeds
         * {@link #COMPACT_AGE} all pending files (if more than one) form a single group.
         */
        private List<List<DataFileMeta>> agePack() {
            List<List<DataFileMeta>> packed =
                    dvMaintainerCache == null
                            ? pack(toCompact)
                            : packInDeletionVectorVMode(toCompact);
            // The age only grows when this round packed nothing.
            if (packed.isEmpty() && ++age > COMPACT_AGE && toCompact.size() > 1) {
                List<DataFileMeta> all = new ArrayList<>(toCompact);
                toCompact.clear();
                packed = Collections.singletonList(all);
            }
            return packed;
        }

        /**
         * Sorts the candidates by file size and fills bins in that order; every bin that becomes
         * ready is emitted as a group. Files left in an unfinished bin stay pending.
         */
        private List<List<DataFileMeta>> pack(Set<DataFileMeta> candidates) {
            List<DataFileMeta> files = new ArrayList<>(candidates);
            files.sort(Comparator.comparingLong(DataFileMeta::fileSize));

            List<List<DataFileMeta>> result = new ArrayList<>();
            FileBin fileBin = new FileBin();
            for (DataFileMeta fileMeta : files) {
                fileBin.addFile(fileMeta);
                if (fileBin.binReady()) {
                    result.add(new ArrayList<>(fileBin.bin));
                    // reset() also removes the emitted files from the pending set.
                    fileBin.reset();
                }
            }
            return result;
        }

        /**
         * Groups candidates that have an index file by that index file and emits every such
         * group; the remaining candidates go through {@link #pack(Set)} when there is more than
         * one of them.
         */
        private List<List<DataFileMeta>> packInDeletionVectorVMode(Set<DataFileMeta> candidates) {
            Map<IndexFileMeta, List<DataFileMeta>> filesWithDV = new HashMap<>();
            Set<DataFileMeta> rest = new HashSet<>();
            for (DataFileMeta dataFile : candidates) {
                IndexFileMeta indexFile =
                        dvMaintainerCache.dvMaintainer(partition).getIndexFile(dataFile.fileName());
                if (indexFile == null) {
                    rest.add(dataFile);
                } else {
                    filesWithDV.computeIfAbsent(indexFile, f -> new ArrayList<>()).add(dataFile);
                }
            }

            List<List<DataFileMeta>> result = new ArrayList<>(filesWithDV.values());
            // Emitted groups must leave the pending set, as in FileBin.reset(); otherwise every
            // later plan would emit the same files again. This runs after the loop because
            // 'candidates' may be the pending set itself.
            for (List<DataFileMeta> group : result) {
                group.forEach(toCompact::remove);
            }
            if (rest.size() > 1) {
                result.addAll(pack(rest));
            }
            return result;
        }

        /** Accumulates files until the size/count thresholds of the coordinator are reached. */
        private class FileBin {
            List<DataFileMeta> bin = new ArrayList<>();
            long totalFileSize = 0L;
            int fileNum = 0;

            private FileBin() {}

            /** Removes the binned files from the pending set and empties the bin. */
            public void reset() {
                bin.forEach(PartitionCompactCoordinator.this.toCompact::remove);
                bin.clear();
                totalFileSize = 0L;
                fileNum = 0;
            }

            public void addFile(DataFileMeta file) {
                totalFileSize += file.fileSize();
                fileNum++;
                bin.add(file);
            }

            /**
             * A bin is ready when it reached the target size with at least the minimum number
             * of files, or when it holds the maximum number of files.
             */
            public boolean binReady() {
                return (totalFileSize >= targetFileSize && fileNum >= minFileNum)
                        || fileNum >= maxFileNum;
            }
        }
    }
}

