/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.iceberg;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.IcebergPathFactory;
import org.apache.paimon.iceberg.manifest.IcebergConversions;
import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
import org.apache.paimon.iceberg.manifest.IcebergManifestList;
import org.apache.paimon.iceberg.manifest.IcebergPartitionSummary;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
import org.apache.paimon.iceberg.metadata.IcebergSchema;
import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

public abstract class AbstractIcebergCommitCallback
implements CommitCallback {
    // Name of the file in the Iceberg metadata directory that is overwritten with the id of the
    // newest snapshot every time a metadata file is written.
    private static final String VERSION_HINT_FILENAME = "version-hint.text";
    // Manifest compaction is skipped while fewer than this many small manifest files exist.
    static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM = ConfigOptions.key("metadata.iceberg.compaction.min.file-num").intType().defaultValue(10);
    // Once at least this many small manifest files exist, compaction runs even if their total
    // size is still below the manifest target size.
    static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM = ConfigOptions.key("metadata.iceberg.compaction.max.file-num").intType().defaultValue(50);
    // Table whose snapshots are translated into Iceberg metadata files.
    protected final FileStoreTable table;
    // Commit user used by retry() to look up the snapshot that belongs to a committable.
    private final String commitUser;
    // Resolves the paths of Iceberg metadata files and manifest lists for this table.
    private final IcebergPathFactory pathFactory;
    // Used to build the bucket path of a data file from its partition and bucket.
    private final FileStorePathFactory fileStorePathFactory;
    // Reads and writes Iceberg manifest files.
    private final IcebergManifestFile manifestFile;
    // Reads and writes Iceberg manifest lists.
    private final IcebergManifestList manifestList;

    /**
     * Creates a callback that maintains Iceberg metadata for the given table.
     *
     * @param table table whose location, store and file IO are used to read snapshots and
     *     write Iceberg metadata
     * @param commitUser commit user used by {@code retry} to find the snapshot of a committable
     */
    public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
        IcebergPathFactory icebergPaths = new IcebergPathFactory(table.location());

        this.table = table;
        this.commitUser = commitUser;
        this.pathFactory = icebergPaths;
        this.fileStorePathFactory = table.store().pathFactory();
        this.manifestFile = IcebergManifestFile.create(table, icebergPaths);
        this.manifestList = IcebergManifestList.create(table, icebergPaths);
    }

    /** Does nothing: the body is empty, so closing this callback performs no cleanup. */
    @Override
    public void close() throws Exception {
    }

    /**
     * Writes Iceberg metadata for a snapshot, deriving the file changes directly from the
     * manifest entries passed in.
     *
     * @param committedEntries manifest entries used as the source of file changes
     * @param snapshot snapshot whose id names the metadata file to create
     */
    @Override
    public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
        FileChangesCollector fromCommittedEntries =
                (removed, added) -> collectFileChanges(committedEntries, removed, added);
        createMetadata(snapshot.id(), fromCommittedEntries);
    }

    /**
     * Writes Iceberg metadata for the newest snapshot found for this commit user and the
     * committable's identifier. File changes are collected by scanning that snapshot's delta.
     *
     * @param committable committable whose identifier is used to look up the snapshot
     * @throws RuntimeException if no snapshot matches the commit user and identifier
     */
    @Override
    public void retry(ManifestCommittable committable) {
        long snapshotId =
                table.snapshotManager()
                        .findSnapshotsForIdentifiers(
                                commitUser, Collections.singletonList(committable.identifier()))
                        .stream()
                        .mapToLong(Snapshot::id)
                        .max()
                        .orElseThrow(
                                () ->
                                        new RuntimeException(
                                                "There is no snapshot for commit user "
                                                        + commitUser
                                                        + " and identifier "
                                                        + committable.identifier()
                                                        + ". This is unexpected."));
        FileChangesCollector fromSnapshotDelta =
                (removed, added) -> collectFileChanges(snapshotId, removed, added);
        createMetadata(snapshotId, fromSnapshotDelta);
    }

    /**
     * Creates the Iceberg metadata file for {@code snapshotId} unless it already exists.
     *
     * <p>If the metadata file of the previous snapshot id exists, the new metadata is derived
     * from it; otherwise it is rebuilt from the snapshot alone.
     *
     * @param snapshotId id of the snapshot to create metadata for
     * @param fileChangesCollector supplies the file changes when a base metadata file exists
     * @throws UncheckedIOException if any underlying file operation fails
     */
    private void createMetadata(long snapshotId, FileChangesCollector fileChangesCollector) {
        try {
            Path targetPath = pathFactory.toMetadataPath(snapshotId);
            if (table.fileIO().exists(targetPath)) {
                // Metadata for this snapshot was already written; nothing to do.
                return;
            }

            Path previousPath = pathFactory.toMetadataPath(snapshotId - 1L);
            if (!table.fileIO().exists(previousPath)) {
                createMetadataWithoutBase(snapshotId);
                return;
            }
            createMetadataWithBase(fileChangesCollector, snapshotId, previousPath);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Builds the Iceberg metadata for a snapshot from scratch by reading all raw-convertible
     * data splits of that snapshot, then removes every older metadata file found.
     *
     * @param snapshotId id of the snapshot to export
     * @throws IOException if reading or writing metadata fails
     */
    private void createMetadataWithoutBase(long snapshotId) throws IOException {
        SnapshotReader reader = table.newSnapshotReader().withSnapshot(snapshotId);
        SchemaCache schemaCache = new SchemaCache();

        // Splits that are not raw-convertible are filtered out and do not appear in the manifests.
        Iterator<IcebergManifestEntry> allEntries =
                reader.read().dataSplits().stream()
                        .filter(DataSplit::rawConvertible)
                        .flatMap(
                                split ->
                                        dataSplitToManifestEntries(split, snapshotId, schemaCache)
                                                .stream())
                        .iterator();
        List<IcebergManifestFileMeta> manifests = manifestFile.rollingWrite(allEntries, snapshotId);
        String manifestListName = manifestList.writeWithoutRolling(manifests);

        TableSchema currentSchema = table.schema();
        List<IcebergPartitionField> partitionFields =
                getPartitionFields(currentSchema.logicalPartitionType());
        int schemaId = (int) currentSchema.id();

        IcebergSnapshot icebergSnapshot =
                new IcebergSnapshot(
                        snapshotId,
                        snapshotId,
                        System.currentTimeMillis(),
                        IcebergSnapshotSummary.APPEND,
                        pathFactory.toManifestListPath(manifestListName).toString(),
                        schemaId);

        String tableUuid = UUID.randomUUID().toString();
        // Falls back to 999 when the table has no partition fields.
        int lastPartitionId =
                partitionFields.stream().mapToInt(IcebergPartitionField::fieldId).max().orElse(999);
        IcebergMetadata metadata =
                new IcebergMetadata(
                        tableUuid,
                        table.location().toString(),
                        snapshotId,
                        currentSchema.highestFieldId(),
                        Collections.singletonList(new IcebergSchema(currentSchema)),
                        schemaId,
                        Collections.singletonList(new IcebergPartitionSpec(partitionFields)),
                        lastPartitionId,
                        Collections.singletonList(icebergSnapshot),
                        (int) snapshotId);

        table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), metadata.toJson());
        table.fileIO()
                .overwriteFileUtf8(
                        new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME),
                        String.valueOf(snapshotId));

        expireAllBefore(snapshotId);
    }

    /**
     * Converts every data file of a split into an {@code ADDED} Iceberg manifest entry.
     *
     * <p>The raw file at index {@code i} is paired with the data file at the same index. The
     * split's raw-file conversion result is unwrapped unconditionally, so callers must only pass
     * splits for which it is present.
     *
     * @param dataSplit split whose files are converted
     * @param snapshotId id recorded as snapshot id and both sequence numbers of each entry
     * @param schemas cache used to resolve the schema of each data file
     * @return one manifest entry per data file, in the split's file order
     */
    private List<IcebergManifestEntry> dataSplitToManifestEntries(DataSplit dataSplit, long snapshotId, SchemaCache schemas) {
        List<IcebergManifestEntry> entries = new ArrayList<>();
        List<RawFile> rawFiles = dataSplit.convertToRawFiles().get();
        List<DataFileMeta> dataFiles = dataSplit.dataFiles();

        for (int idx = 0; idx < dataFiles.size(); idx++) {
            DataFileMeta dataFile = dataFiles.get(idx);
            RawFile raw = rawFiles.get(idx);
            IcebergDataFileMeta icebergFile =
                    IcebergDataFileMeta.create(
                            IcebergDataFileMeta.Content.DATA,
                            raw.path(),
                            raw.format(),
                            dataSplit.partition(),
                            raw.rowCount(),
                            raw.fileSize(),
                            schemas.get(dataFile.schemaId()),
                            dataFile.valueStats());
            entries.add(
                    new IcebergManifestEntry(
                            IcebergManifestEntry.Status.ADDED,
                            snapshotId,
                            snapshotId,
                            snapshotId,
                            icebergFile));
        }
        return entries;
    }

    /**
     * Creates one Iceberg partition field per field of the partition type, assigning
     * consecutive field ids starting at 1000.
     *
     * @param partitionType row type describing the partition columns
     * @return partition fields in the same order as the fields of {@code partitionType}
     */
    private List<IcebergPartitionField> getPartitionFields(RowType partitionType) {
        List<DataField> sourceFields = partitionType.getFields();
        List<IcebergPartitionField> partitionFields = new ArrayList<>();
        for (int pos = 0; pos < sourceFields.size(); pos++) {
            partitionFields.add(new IcebergPartitionField(sourceFields.get(pos), 1000 + pos));
        }
        return partitionFields;
    }

    /**
     * Creates the Iceberg metadata for {@code snapshotId} on top of the metadata of the previous
     * snapshot.
     *
     * <p>Steps: collect file changes, build the new list of manifest file metas (appending only,
     * or rewriting manifests that contain removed files), optionally compact small manifests,
     * write the manifest list and metadata file, update the version hint, delete the base
     * metadata file and clean up manifest lists of expired snapshots.
     *
     * @param fileChangesCollector fills the removed/added file maps and reports whether the
     *     change set is add-only
     * @param snapshotId id of the snapshot to create metadata for
     * @param baseMetadataPath path of the existing metadata file to build upon
     * @throws IOException if reading or writing metadata fails
     */
    private void createMetadataWithBase(FileChangesCollector fileChangesCollector, long snapshotId, Path baseMetadataPath) throws IOException {
        IcebergMetadata baseMetadata = IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
        List<IcebergManifestFileMeta> baseManifestFileMetas =
                manifestList.read(baseMetadata.currentSnapshot().manifestList());

        Map<String, BinaryRow> removedFiles = new LinkedHashMap<>();
        Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles = new LinkedHashMap<>();
        boolean isAddOnly = fileChangesCollector.collect(removedFiles, addedFiles);

        // Partitions touched by this snapshot, de-duplicated in encounter order.
        LinkedHashSet<BinaryRow> modifiedPartitionsSet = new LinkedHashSet<>(removedFiles.values());
        for (Pair<BinaryRow, DataFileMeta> added : addedFiles.values()) {
            modifiedPartitionsSet.add(added.getLeft());
        }
        List<BinaryRow> modifiedPartitions = new ArrayList<>(modifiedPartitionsSet);

        // Note: isAddOnly is not the same as removedFiles.isEmpty(). A file that is deleted and
        // then added again ends up only in addedFiles, yet the change set is not add-only.
        List<IcebergManifestFileMeta> newManifestFileMetas;
        IcebergSnapshotSummary snapshotSummary;
        if (isAddOnly) {
            // Fast path: keep every base manifest and append manifests for the new files.
            newManifestFileMetas = new ArrayList<>(baseManifestFileMetas);
            newManifestFileMetas.addAll(createNewlyAddedManifestFileMetas(addedFiles, snapshotId));
            snapshotSummary = IcebergSnapshotSummary.APPEND;
        } else {
            Pair<List<IcebergManifestFileMeta>, IcebergSnapshotSummary> result =
                    createWithDeleteManifestFileMetas(
                            removedFiles,
                            addedFiles,
                            modifiedPartitions,
                            baseManifestFileMetas,
                            snapshotId);
            newManifestFileMetas = result.getLeft();
            snapshotSummary = result.getRight();
        }
        String manifestListFileName =
                manifestList.writeWithoutRolling(
                        compactMetadataIfNeeded(newManifestFileMetas, snapshotId));

        // Append the current table schema when its id differs from the base's current schema id.
        int schemaId = (int) table.schema().id();
        List<IcebergSchema> schemas = baseMetadata.schemas();
        if (baseMetadata.currentSchemaId() != schemaId) {
            schemas = new ArrayList<>(schemas);
            schemas.add(new IcebergSchema(table.schema()));
        }

        List<IcebergSnapshot> snapshots = new ArrayList<>(baseMetadata.snapshots());
        snapshots.add(
                new IcebergSnapshot(
                        snapshotId,
                        snapshotId,
                        System.currentTimeMillis(),
                        snapshotSummary,
                        pathFactory.toManifestListPath(manifestListFileName).toString(),
                        schemaId));

        // Every snapshot in this list except the last one is expired. The last one is the first
        // retained snapshot and is only needed to know which manifests are still in use.
        List<IcebergSnapshot> toExpireExceptLast = new ArrayList<>();
        for (int i = 0; i + 1 < snapshots.size(); i++) {
            IcebergSnapshot candidate = snapshots.get(i);
            toExpireExceptLast.add(candidate);
            if (!shouldExpire(candidate, snapshotId)) {
                snapshots = snapshots.subList(i, snapshots.size());
                break;
            }
        }

        IcebergMetadata metadata =
                new IcebergMetadata(
                        baseMetadata.tableUuid(),
                        baseMetadata.location(),
                        snapshotId,
                        table.schema().highestFieldId(),
                        schemas,
                        schemaId,
                        baseMetadata.partitionSpecs(),
                        baseMetadata.lastPartitionId(),
                        snapshots,
                        (int) snapshotId);
        table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), metadata.toJson());
        table.fileIO()
                .overwriteFileUtf8(
                        new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME),
                        String.valueOf(snapshotId));

        table.fileIO().deleteQuietly(baseMetadataPath);
        for (int i = 0; i + 1 < toExpireExceptLast.size(); i++) {
            expireManifestList(
                    new Path(toExpireExceptLast.get(i).manifestList()).getName(),
                    new Path(toExpireExceptLast.get(i + 1).manifestList()).getName());
        }
    }

    /**
     * Replays manifest entries in order and records the resulting file changes.
     *
     * <p>An ADD entry accepted by {@code shouldAddFileToIceberg} moves the file path into
     * {@code addedFiles} (dropping it from {@code removedFiles}); a DELETE entry does the
     * opposite. Paths are built as bucket path + "/" + file name.
     *
     * @param manifestEntries entries to replay
     * @param removedFiles output map from removed file path to its partition
     * @param addedFiles output map from added file path to its partition and file meta
     * @return {@code true} if no DELETE entry was seen
     * @throws UnsupportedOperationException if an entry has a kind other than ADD or DELETE
     */
    private boolean collectFileChanges(List<ManifestEntry> manifestEntries, Map<String, BinaryRow> removedFiles, Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles) {
        boolean sawNoDelete = true;
        for (ManifestEntry change : manifestEntries) {
            String filePath =
                    fileStorePathFactory.bucketPath(change.partition(), change.bucket())
                            + "/"
                            + change.fileName();
            switch (change.kind()) {
                case ADD:
                    if (shouldAddFileToIceberg(change.file())) {
                        removedFiles.remove(filePath);
                        addedFiles.put(filePath, Pair.of(change.partition(), change.file()));
                    }
                    break;
                case DELETE:
                    sawNoDelete = false;
                    addedFiles.remove(filePath);
                    removedFiles.put(filePath, change.partition());
                    break;
                default:
                    throw new UnsupportedOperationException(
                            "Unknown ManifestEntry FileKind " + change.kind());
            }
        }
        return sawNoDelete;
    }

    /**
     * Collects file changes by scanning the delta of the given snapshot and replaying the
     * resulting manifest entries.
     *
     * @param snapshotId id of the snapshot whose delta is scanned
     * @param removedFiles output map from removed file path to its partition
     * @param addedFiles output map from added file path to its partition and file meta
     * @return {@code true} if the delta contains no DELETE entry
     */
    private boolean collectFileChanges(long snapshotId, Map<String, BinaryRow> removedFiles, Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles) {
        List<ManifestEntry> deltaEntries =
                table.store()
                        .newScan()
                        .withKind(ScanMode.DELTA)
                        .withSnapshot(snapshotId)
                        .plan()
                        .files();
        return collectFileChanges(deltaEntries, removedFiles, addedFiles);
    }

    /**
     * Decides whether a newly added data file should be recorded in the Iceberg metadata.
     *
     * <p>Consulted for every manifest entry of kind ADD while file changes are collected; when
     * it returns {@code false} the entry is skipped entirely.
     *
     * @param var1 metadata of the added data file
     * @return {@code true} to include the file, {@code false} to ignore it
     */
    protected abstract boolean shouldAddFileToIceberg(DataFileMeta var1);

    /**
     * Writes manifest files containing one {@code ADDED} entry per added data file.
     *
     * @param addedFiles map from data file path to its partition and file meta
     * @param currentSnapshotId id recorded as snapshot id and both sequence numbers of each entry
     * @return metas of the manifest files written, or an empty list if nothing was added
     * @throws IOException if writing the manifest files fails
     */
    private List<IcebergManifestFileMeta> createNewlyAddedManifestFileMetas(Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles, long currentSnapshotId) throws IOException {
        if (addedFiles.isEmpty()) {
            return Collections.emptyList();
        }

        SchemaCache schemas = new SchemaCache();
        Iterator<IcebergManifestEntry> entries =
                addedFiles.entrySet().stream()
                        .map(
                                e -> {
                                    String path = e.getKey();
                                    BinaryRow partition = e.getValue().getLeft();
                                    DataFileMeta paimonFileMeta = e.getValue().getRight();
                                    IcebergDataFileMeta icebergFileMeta =
                                            IcebergDataFileMeta.create(
                                                    IcebergDataFileMeta.Content.DATA,
                                                    path,
                                                    paimonFileMeta.fileFormat(),
                                                    partition,
                                                    paimonFileMeta.rowCount(),
                                                    paimonFileMeta.fileSize(),
                                                    schemas.get(paimonFileMeta.schemaId()),
                                                    paimonFileMeta.valueStats());
                                    return new IcebergManifestEntry(
                                            IcebergManifestEntry.Status.ADDED,
                                            currentSnapshotId,
                                            currentSnapshotId,
                                            currentSnapshotId,
                                            icebergFileMeta);
                                })
                        .iterator();
        return manifestFile.rollingWrite(entries, currentSnapshotId);
    }

    /**
     * Builds the manifest file metas for a snapshot that removes files.
     *
     * <p>Base manifests whose partition summary cannot match any modified partition are reused
     * untouched. The others are read: live entries that are also in {@code addedFiles} are
     * dropped from {@code addedFiles} so they are not written twice, and a manifest containing
     * at least one removed file is rewritten with those entries marked {@code DELETED} and the
     * remaining live entries marked {@code EXISTING}. Finally, manifests for the files still in
     * {@code addedFiles} are appended.
     *
     * @param removedFiles map from removed file path to its partition
     * @param addedFiles map from added file path to partition and file meta; entries already
     *     present in a base manifest are removed from this map
     * @param modifiedPartitions partitions touched by this snapshot
     * @param baseManifestFileMetas manifest file metas of the base snapshot
     * @param currentSnapshotId id of the snapshot being created
     * @return the new manifest file metas and the summary ({@code OVERWRITE} if any manifest
     *     was rewritten, otherwise {@code APPEND})
     * @throws IOException if reading or writing manifest files fails
     */
    private Pair<List<IcebergManifestFileMeta>, IcebergSnapshotSummary> createWithDeleteManifestFileMetas(Map<String, BinaryRow> removedFiles, Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles, List<BinaryRow> modifiedPartitions, List<IcebergManifestFileMeta> baseManifestFileMetas, long currentSnapshotId) throws IOException {
        IcebergSnapshotSummary snapshotSummary = IcebergSnapshotSummary.APPEND;
        List<IcebergManifestFileMeta> newManifestFileMetas = new ArrayList<>();

        RowType partitionType = table.schema().logicalPartitionType();
        int numFields = partitionType.getFieldCount();
        PartitionPredicate predicate =
                PartitionPredicate.fromMultiple(partitionType, modifiedPartitions);

        for (IcebergManifestFileMeta fileMeta : baseManifestFileMetas) {
            // Rebuild min/max/null-count statistics from the manifest's partition summaries so
            // the predicate can rule out manifests that cannot contain a modified partition.
            GenericRow minValues = new GenericRow(numFields);
            GenericRow maxValues = new GenericRow(numFields);
            long[] nullCounts = new long[numFields];
            for (int i = 0; i < numFields; i++) {
                IcebergPartitionSummary summary = fileMeta.partitions().get(i);
                DataType fieldType = partitionType.getTypeAt(i);
                minValues.setField(
                        i, IcebergConversions.toPaimonObject(fieldType, summary.lowerBound()));
                maxValues.setField(
                        i, IcebergConversions.toPaimonObject(fieldType, summary.upperBound()));
                // The summary only says whether nulls exist, so 1 stands for "at least one".
                nullCounts[i] = summary.containsNull() ? 1L : 0L;
            }

            boolean mayContainModifiedPartition =
                    predicate == null
                            || predicate.test(
                                    fileMeta.liveRowsCount(),
                                    minValues,
                                    maxValues,
                                    new GenericArray(nullCounts));
            if (!mayContainModifiedPartition) {
                // No partition of this manifest was modified in this snapshot; reuse it as is.
                newManifestFileMetas.add(fileMeta);
                continue;
            }

            List<IcebergManifestEntry> entries =
                    manifestFile.read(new Path(fileMeta.manifestPath()).getName());
            boolean canReuseFile = true;
            for (IcebergManifestEntry entry : entries) {
                if (!entry.isLive()) {
                    continue;
                }
                String path = entry.file().filePath();
                if (addedFiles.containsKey(path)) {
                    // The file is already live in the base metadata; drop it from addedFiles
                    // so that no duplicate entry is written for it.
                    addedFiles.remove(path);
                } else if (removedFiles.containsKey(path)) {
                    canReuseFile = false;
                }
            }

            if (canReuseFile) {
                // Nothing in this manifest was removed; reuse it as is.
                newManifestFileMetas.add(fileMeta);
                continue;
            }

            // At least one file was removed: rewrite the manifest's live entries.
            snapshotSummary = IcebergSnapshotSummary.OVERWRITE;
            List<IcebergManifestEntry> newEntries = new ArrayList<>();
            for (IcebergManifestEntry entry : entries) {
                if (!entry.isLive()) {
                    continue;
                }
                IcebergManifestEntry.Status newStatus =
                        removedFiles.containsKey(entry.file().filePath())
                                ? IcebergManifestEntry.Status.DELETED
                                : IcebergManifestEntry.Status.EXISTING;
                newEntries.add(
                        new IcebergManifestEntry(
                                newStatus,
                                entry.snapshotId(),
                                entry.sequenceNumber(),
                                entry.fileSequenceNumber(),
                                entry.file()));
            }
            newManifestFileMetas.addAll(
                    manifestFile.rollingWrite(newEntries.iterator(), currentSnapshotId));
        }

        newManifestFileMetas.addAll(
                createNewlyAddedManifestFileMetas(addedFiles, currentSnapshotId));
        return Pair.of(newManifestFileMetas, snapshotSummary);
    }

    /**
     * Merges small manifest files into larger ones when enough of them have accumulated.
     *
     * <p>A manifest is a compaction candidate when its length is below two thirds of the
     * manifest target size. Compaction is skipped (and {@code toCompact} returned unchanged)
     * when there are fewer candidates than {@code COMPACT_MIN_FILE_NUM}, or fewer than
     * {@code COMPACT_MAX_FILE_NUM} whose total size is still below the target size.
     *
     * <p>When compacting, entries from older snapshots are rewritten: {@code ADDED} becomes
     * {@code EXISTING} and {@code DELETED} entries are dropped. Entries of the current snapshot
     * and {@code EXISTING} entries are kept as they are.
     *
     * @param toCompact manifest file metas of the snapshot being created
     * @param currentSnapshotId id of the snapshot being created
     * @return the manifest file metas to record in the manifest list
     * @throws IOException if writing the compacted manifest files fails
     * @throws UnsupportedOperationException if an entry has an unknown status
     */
    private List<IcebergManifestFileMeta> compactMetadataIfNeeded(List<IcebergManifestFileMeta> toCompact, long currentSnapshotId) throws IOException {
        List<IcebergManifestFileMeta> result = new ArrayList<>();
        long targetSizeInBytes = table.coreOptions().manifestTargetSize().getBytes();

        List<IcebergManifestFileMeta> candidates = new ArrayList<>();
        long totalSizeInBytes = 0L;
        for (IcebergManifestFileMeta meta : toCompact) {
            if (meta.manifestLength() < targetSizeInBytes * 2L / 3L) {
                candidates.add(meta);
                totalSizeInBytes += meta.manifestLength();
            } else {
                // Large enough already; carried over untouched.
                result.add(meta);
            }
        }

        Options options = new Options(table.options());
        if (candidates.size() < options.get(COMPACT_MIN_FILE_NUM)) {
            return toCompact;
        }
        if (candidates.size() < options.get(COMPACT_MAX_FILE_NUM)
                && totalSizeInBytes < targetSizeInBytes) {
            return toCompact;
        }

        Function<IcebergManifestFileMeta, List<IcebergManifestEntry>> processor =
                meta -> {
                    List<IcebergManifestEntry> entries = new ArrayList<>();
                    // A fresh reader is created per manifest, as in the original code.
                    for (IcebergManifestEntry entry :
                            IcebergManifestFile.create(table, pathFactory)
                                    .read(new Path(meta.manifestPath()).getName())) {
                        if (entry.fileSequenceNumber() == currentSnapshotId
                                || entry.status() == IcebergManifestEntry.Status.EXISTING) {
                            entries.add(entry);
                        } else if (entry.status() == IcebergManifestEntry.Status.ADDED) {
                            // Added by an older snapshot: it is merely existing from now on.
                            entries.add(
                                    new IcebergManifestEntry(
                                            IcebergManifestEntry.Status.EXISTING,
                                            entry.snapshotId(),
                                            entry.sequenceNumber(),
                                            entry.fileSequenceNumber(),
                                            entry.file()));
                        } else if (entry.status() != IcebergManifestEntry.Status.DELETED) {
                            throw new UnsupportedOperationException(
                                    "Unknown IcebergManifestEntry.Status " + entry.status());
                        }
                        // DELETED entries from older snapshots are dropped.
                    }
                    if (meta.sequenceNumber() == currentSnapshotId) {
                        // This manifest carries the current snapshot's sequence number and is
                        // replaced by the compacted output, so its file is deleted here.
                        table.fileIO().deleteQuietly(new Path(meta.manifestPath()));
                    }
                    return entries;
                };
        Iterable<IcebergManifestEntry> newEntries =
                ManifestReadThreadPool.sequentialBatchedExecute(processor, candidates, null);
        result.addAll(manifestFile.rollingWrite(newEntries.iterator(), currentSnapshotId));
        return result;
    }

    /**
     * Decides whether an Iceberg snapshot should be expired relative to the current snapshot.
     *
     * <p>Snapshots within the minimum retained count are always kept, snapshots beyond the
     * maximum retained count are always expired, and anything in between is expired once its
     * timestamp is older than the configured retention time.
     *
     * @param snapshot snapshot to check
     * @param currentSnapshotId id of the snapshot currently being created
     * @return {@code true} if the snapshot should be expired
     */
    private boolean shouldExpire(IcebergSnapshot snapshot, long currentSnapshotId) {
        Options options = new Options(table.options());
        long candidateId = snapshot.snapshotId();

        int retainedMin = options.get(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN);
        if (candidateId > currentSnapshotId - retainedMin) {
            return false;
        }

        int retainedMax = options.get(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX);
        if (candidateId <= currentSnapshotId - retainedMax) {
            return true;
        }

        long retentionMillis = options.get(CoreOptions.SNAPSHOT_TIME_RETAINED).toMillis();
        return snapshot.timestampMs() < System.currentTimeMillis() - retentionMillis;
    }

    /**
     * Deletes a manifest list together with every manifest file it references that is not
     * also referenced by the next manifest list.
     *
     * @param toExpire file name of the manifest list to delete
     * @param next file name of the following manifest list, whose manifests must be kept
     */
    private void expireManifestList(String toExpire, String next) {
        HashSet<IcebergManifestFileMeta> metaInUse = new HashSet<>(manifestList.read(next));
        for (IcebergManifestFileMeta meta : manifestList.read(toExpire)) {
            if (!metaInUse.contains(meta)) {
                table.fileIO().deleteQuietly(new Path(meta.manifestPath()));
            }
        }
        table.fileIO().deleteQuietly(pathFactory.toManifestListPath(toExpire));
    }

    /**
     * Deletes every metadata file returned by the path factory as being before
     * {@code snapshotId}, together with the manifest lists and manifest files they reference.
     *
     * <p>Manifest lists and manifest files are tracked by file name so that each one is read
     * and deleted at most once, even when several metadata files reference it.
     *
     * @param snapshotId id passed to the path factory to select the metadata files to remove
     * @throws IOException if listing or reading metadata files fails
     */
    private void expireAllBefore(long snapshotId) throws IOException {
        HashSet<String> expiredManifestLists = new HashSet<>();
        HashSet<String> expiredManifestFileMetas = new HashSet<>();

        Iterator<Path> it =
                pathFactory.getAllMetadataPathBefore(table.fileIO(), snapshotId).iterator();
        while (it.hasNext()) {
            Path path = it.next();
            IcebergMetadata metadata = IcebergMetadata.fromPath(table.fileIO(), path);

            for (IcebergSnapshot snapshot : metadata.snapshots()) {
                Path listPath = new Path(snapshot.manifestList());
                String listName = listPath.getName();
                // add() returns false when this manifest list was already handled.
                if (!expiredManifestLists.add(listName)) {
                    continue;
                }

                for (IcebergManifestFileMeta meta : manifestList.read(listName)) {
                    String metaName = new Path(meta.manifestPath()).getName();
                    if (expiredManifestFileMetas.add(metaName)) {
                        table.fileIO().deleteQuietly(new Path(meta.manifestPath()));
                    }
                }
                table.fileIO().deleteQuietly(listPath);
            }
            table.fileIO().deleteQuietly(path);
        }
    }

    /**
     * Caches table schemas by schema id so that each schema is requested from the schema
     * manager at most once per cache instance.
     */
    private class SchemaCache {
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        Map<Long, TableSchema> tableSchemas = new HashMap<>();

        private SchemaCache() {
        }

        /**
         * Returns the schema with the given id, loading it through the schema manager on the
         * first request.
         */
        private TableSchema get(long schemaId) {
            return tableSchemas.computeIfAbsent(schemaId, id -> schemaManager.schema(id));
        }
    }

    private static interface FileChangesCollector {
        public boolean collect(Map<String, BinaryRow> var1, Map<String, Pair<BinaryRow, DataFileMeta>> var2) throws IOException;
    }
}

