/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.append;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Preconditions;

/**
 * Plans compaction tasks for a table that has no primary keys.
 *
 * <p>The coordinator scans the table (using the {@code COMPACT_APPEND_NO_BUCKET} stream scan
 * mode), remembers per partition every data file smaller than the configured compaction file
 * size, and groups those files into {@link AppendOnlyCompactionTask}s.
 *
 * <p>This class keeps mutable state in plain {@link HashMap}/{@link HashSet} instances and does
 * no synchronization of its own.
 */
public class AppendOnlyTableCompactionCoordinator {
    /** A partition coordinator whose age is greater than this value is dropped after planning. */
    protected static final int REMOVE_AGE = 10;
    /**
     * Once a partition's age is greater than this value, all of its pending files (if more than
     * one) are compacted together even though no regular bin became ready.
     */
    protected static final int COMPACT_AGE = 5;

    private final InnerTableScan scan;
    private final long targetFileSize;
    private final long compactionFileSize;
    private final int minFileNum;
    private final int maxFileNum;
    private final boolean streamingMode;

    /** Per-partition planning state, keyed by partition. Package-private for tests. */
    final Map<BinaryRow, PartitionCompactCoordinator> partitionCompactCoordinators = new HashMap<>();

    /**
     * Creates a coordinator in streaming mode without a filter.
     *
     * @param table the table to plan compaction for; must not declare primary keys
     */
    public AppendOnlyTableCompactionCoordinator(FileStoreTable table) {
        this(table, true);
    }

    /**
     * Creates a coordinator without a filter.
     *
     * @param table the table to plan compaction for; must not declare primary keys
     * @param isStreaming {@code true} to keep planning until the scan returns no splits,
     *     {@code false} to plan only once per {@link #scan()} call
     */
    public AppendOnlyTableCompactionCoordinator(FileStoreTable table, boolean isStreaming) {
        this(table, isStreaming, null);
    }

    /**
     * Creates a coordinator.
     *
     * @param table the table to plan compaction for; a table with primary keys is rejected by
     *     {@code Preconditions.checkArgument}
     * @param isStreaming {@code true} to use a stream scan and keep planning until it returns no
     *     splits, {@code false} to use a regular scan and plan only once per {@link #scan()} call
     * @param filter optional predicate applied to the scan; ignored when {@code null}
     */
    public AppendOnlyTableCompactionCoordinator(FileStoreTable table, boolean isStreaming, @Nullable Predicate filter) {
        Preconditions.checkArgument(table.primaryKeys().isEmpty());
        Table tableCopy = table.copy(compactScanType());
        this.scan = isStreaming ? tableCopy.newStreamScan() : tableCopy.newScan();
        if (filter != null) {
            this.scan.withFilter(filter);
        }
        this.streamingMode = isStreaming;
        CoreOptions coreOptions = table.coreOptions();
        this.targetFileSize = coreOptions.targetFileSize();
        this.compactionFileSize = coreOptions.compactionFileSize();
        this.minFileNum = coreOptions.compactionMinFileNum();
        this.maxFileNum = coreOptions.compactionMaxFileNum();
    }

    /**
     * Scans for files and, if the scan produced any split, plans compaction tasks.
     *
     * @return the planned tasks, or an empty list when the scan returned nothing
     */
    public List<AppendOnlyCompactionTask> run() {
        if (scan()) {
            return compactPlan();
        }
        return Collections.emptyList();
    }

    /**
     * Pulls splits from the underlying scan and registers their data files.
     *
     * @return {@code true} if at least one non-empty plan was consumed
     */
    @VisibleForTesting
    boolean scan() {
        boolean hasResult = false;
        List<Split> splits;
        while (!(splits = scan.plan().splits()).isEmpty()) {
            hasResult = true;
            for (Split split : splits) {
                DataSplit dataSplit = (DataSplit) split;
                notifyNewFiles(dataSplit.partition(), dataSplit.dataFiles());
            }
            // Outside streaming mode a single plan is consumed per call.
            if (!streamingMode) {
                break;
            }
        }
        return hasResult;
    }

    /**
     * Registers the files of one partition as compaction candidates. Only files whose size is
     * strictly below the configured compaction file size are kept.
     *
     * @param partition the partition the files belong to
     * @param files the data files reported for that partition
     */
    @VisibleForTesting
    void notifyNewFiles(BinaryRow partition, List<DataFileMeta> files) {
        PartitionCompactCoordinator coordinator =
                partitionCompactCoordinators.computeIfAbsent(
                        partition, key -> new PartitionCompactCoordinator(key));
        List<DataFileMeta> smallFiles =
                files.stream()
                        .filter(file -> file.fileSize() < compactionFileSize)
                        .collect(Collectors.toList());
        coordinator.addFiles(smallFiles);
    }

    /**
     * Asks every partition coordinator for its tasks, then drops the coordinators that report
     * {@link PartitionCompactCoordinator#readyToRemove()}.
     *
     * @return all tasks planned in this round
     */
    @VisibleForTesting
    List<AppendOnlyCompactionTask> compactPlan() {
        List<AppendOnlyCompactionTask> tasks = new ArrayList<>();
        for (PartitionCompactCoordinator coordinator : partitionCompactCoordinators.values()) {
            tasks.addAll(coordinator.plan());
        }
        // Removal happens only after planning, so a coordinator emptied by this round's plan is
        // dropped in the same call. Removing through the values view also removes the map entry.
        partitionCompactCoordinators.values().removeIf(PartitionCompactCoordinator::readyToRemove);
        return tasks;
    }

    /**
     * Collects the files still pending compaction across all tracked partitions.
     *
     * @return a new set containing every pending file
     */
    @VisibleForTesting
    HashSet<DataFileMeta> listRestoredFiles() {
        HashSet<DataFileMeta> pending = new HashSet<>();
        for (PartitionCompactCoordinator coordinator : partitionCompactCoordinators.values()) {
            pending.addAll(coordinator.toCompact);
        }
        return pending;
    }

    /** Builds the dynamic options that switch the copied table to the compaction scan mode. */
    private Map<String, String> compactScanType() {
        Map<String, String> options = new HashMap<>();
        options.put(
                CoreOptions.STREAM_SCAN_MODE.key(),
                CoreOptions.StreamScanMode.COMPACT_APPEND_NO_BUCKET.getValue());
        return options;
    }

    /** Tracks the pending small files of a single partition and groups them into tasks. */
    class PartitionCompactCoordinator {
        private final BinaryRow partition;
        /** Files registered for this partition that have not yet been handed out in a task. */
        private final HashSet<DataFileMeta> toCompact = new HashSet<>();
        /** Number of consecutive planning rounds that produced no regular bin; reset by addFiles. */
        int age = 0;

        public PartitionCompactCoordinator(BinaryRow partition) {
            this.partition = partition;
        }

        /** Plans the tasks for this partition; see {@link #agePack()} for the grouping rules. */
        public List<AppendOnlyCompactionTask> plan() {
            return pickCompact();
        }

        /** Returns the partition this coordinator was created for. */
        public BinaryRow partition() {
            return partition;
        }

        /** Wraps every file group returned by {@link #agePack()} into a compaction task. */
        private List<AppendOnlyCompactionTask> pickCompact() {
            List<AppendOnlyCompactionTask> tasks = new ArrayList<>();
            for (List<DataFileMeta> group : agePack()) {
                tasks.add(new AppendOnlyCompactionTask(partition, group));
            }
            return tasks;
        }

        /**
         * Adds files to the pending set and resets the age to zero (also when the list is empty).
         *
         * @param dataFileMetas files to remember as compaction candidates
         */
        public void addFiles(List<DataFileMeta> dataFileMetas) {
            this.age = 0;
            this.toCompact.addAll(dataFileMetas);
        }

        /** Returns whether nothing is pending or the age is greater than {@link #REMOVE_AGE}. */
        public boolean readyToRemove() {
            return toCompact.isEmpty() || age > REMOVE_AGE;
        }

        /**
         * Runs {@link #pack()}; when it yields no group the age is incremented, and once the age
         * is greater than {@link #COMPACT_AGE} with more than one file pending, all pending files
         * are returned as a single group and the pending set is cleared.
         */
        private List<List<DataFileMeta>> agePack() {
            List<List<DataFileMeta>> packed = pack();
            if (packed.isEmpty()) {
                // The age only grows in rounds where regular packing produced nothing.
                if (++age > COMPACT_AGE && toCompact.size() > 1) {
                    List<DataFileMeta> all = new ArrayList<>(toCompact);
                    toCompact.clear();
                    packed = Collections.singletonList(all);
                }
            }
            return packed;
        }

        /**
         * Walks the pending files in ascending size order, filling a bin; every time the bin
         * becomes ready its files are emitted as one group and removed from the pending set.
         * Files left in a bin that never became ready stay pending.
         */
        private List<List<DataFileMeta>> pack() {
            List<DataFileMeta> files = new ArrayList<>(toCompact);
            files.sort(Comparator.comparingLong(DataFileMeta::fileSize));

            List<List<DataFileMeta>> result = new ArrayList<>();
            FileBin fileBin = new FileBin();
            for (DataFileMeta fileMeta : files) {
                fileBin.addFile(fileMeta);
                if (fileBin.binReady()) {
                    // Snapshot the bin before reset() clears it.
                    result.add(new ArrayList<>(fileBin.bin));
                    fileBin.reset();
                }
            }
            return result;
        }

        /** Accumulates files until the size/count thresholds of the enclosing coordinator are met. */
        private class FileBin {
            List<DataFileMeta> bin = new ArrayList<>();
            long totalFileSize = 0L;
            int fileNum = 0;

            /** Removes the binned files from the pending set and empties the bin. */
            public void reset() {
                bin.forEach(toCompact::remove);
                bin.clear();
                totalFileSize = 0L;
                fileNum = 0;
            }

            /** Adds one file to the bin and updates the running size and count. */
            public void addFile(DataFileMeta file) {
                totalFileSize += file.fileSize();
                fileNum++;
                bin.add(file);
            }

            /**
             * Returns whether the bin holds at least the target size and the minimum file count,
             * or has reached the maximum file count.
             */
            public boolean binReady() {
                return (totalFileSize >= targetFileSize && fileNum >= minFileNum)
                        || fileNum >= maxFileNum;
            }
        }
    }
}

