/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.source.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.AbstractStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StartingScanner} that plans the files added by a range of snapshots.
 *
 * <p>The scanned range is {@code (startingSnapshotId, endingSnapshotId]}: the starting snapshot
 * itself is excluded and the ending snapshot is included. Which snapshots of that range actually
 * contribute files depends on the {@link ScanMode} given at construction time.
 */
public class IncrementalStartingScanner extends AbstractStartingScanner {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalStartingScanner.class);

    private final long endingSnapshotId;
    private final ScanMode scanMode;

    /**
     * @param snapshotManager manager used to resolve snapshot ids into snapshots
     * @param start starting snapshot id (exclusive); stored in the inherited
     *     {@code startingSnapshotId} field
     * @param end ending snapshot id (inclusive)
     * @param scanMode scan mode; {@link #scan} only supports {@code DELTA} and {@code CHANGELOG}
     */
    public IncrementalStartingScanner(SnapshotManager snapshotManager, long start, long end, ScanMode scanMode) {
        super(snapshotManager);
        this.startingSnapshotId = start;
        this.endingSnapshotId = end;
        this.scanMode = scanMode;
    }

    /**
     * Builds a plan containing one or more splits per (partition, bucket) for all files added by
     * the snapshots in {@code (startingSnapshotId, endingSnapshotId]}.
     *
     * <p>If {@link #checkScanSnapshotIdValidity()} yields a result (no snapshot available yet),
     * that result is returned unchanged. Every produced split is tagged with the ending snapshot
     * id, and the plan is created with a {@code null} first constructor argument.
     *
     * @param reader source of manifests, path factory, split generator and read parallelism
     * @return the early validity result, or the plan wrapped by {@code StartingScanner.fromPlan}
     * @throws UnsupportedOperationException if the scan mode is neither DELTA nor CHANGELOG
     */
    @Override
    public StartingScanner.Result scan(SnapshotReader reader) {
        Optional<StartingScanner.Result> checkResult = checkScanSnapshotIdValidity();
        if (checkResult.isPresent()) {
            return checkResult.get();
        }

        ManifestsReader manifestsReader = reader.manifestsReader();

        // start is exclusive, end is inclusive.
        List<Long> snapshotIds = LongStream.range(startingSnapshotId + 1L, endingSnapshotId + 1L)
                .boxed()
                .collect(Collectors.toList());

        // The manifest element type is not named anywhere in this file, so this iterator is left
        // raw on purpose; it is only handed straight on to reader::readManifest below.
        @SuppressWarnings("rawtypes")
        Iterator manifests = ManifestReadThreadPool.randomlyExecuteSequentialReturn(
                id -> {
                    Snapshot snapshot = snapshotManager.snapshot(id);
                    if (isSkipped(snapshot)) {
                        return Collections.emptyList();
                    }
                    return manifestsReader.read(snapshot, scanMode).filteredManifests;
                },
                snapshotIds,
                reader.parallelism());

        // Unchecked only because the manifest list above is raw.
        @SuppressWarnings("unchecked")
        Iterator<ManifestEntry> entries = ManifestReadThreadPool.randomlyExecuteSequentialReturn(
                reader::readManifest, Lists.newArrayList(manifests), reader.parallelism());

        // Group the added files by (partition, bucket).
        Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new ConcurrentHashMap<>();
        while (entries.hasNext()) {
            ManifestEntry entry = entries.next();
            Preconditions.checkArgument(entry.kind() == FileKind.ADD, "Delta or changelog should only have ADD files.");
            grouped.computeIfAbsent(Pair.of(entry.partition(), entry.bucket()), ignored -> new ArrayList<>())
                    .add(entry.file());
        }

        List<Split> result = new ArrayList<>();
        for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> group : grouped.entrySet()) {
            BinaryRow partition = group.getKey().getLeft();
            int bucket = group.getKey().getRight();
            String bucketPath = reader.pathFactory().bucketPath(partition, bucket).toString();
            for (SplitGenerator.SplitGroup splitGroup : reader.splitGenerator().splitForBatch(group.getValue())) {
                DataSplit.Builder dataSplitBuilder = DataSplit.builder()
                        .withSnapshot(endingSnapshotId)
                        .withPartition(partition)
                        .withBucket(bucket)
                        .withDataFiles(splitGroup.files)
                        .rawConvertible(splitGroup.rawConvertible)
                        .withBucketPath(bucketPath);
                result.add(dataSplitBuilder.build());
            }
        }
        return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId, result));
    }

    /**
     * Tells whether a snapshot contributes no files under the configured scan mode.
     *
     * <p>DELTA reads only APPEND commits; CHANGELOG reads every commit except OVERWRITE.
     *
     * @throws UnsupportedOperationException if the scan mode is neither DELTA nor CHANGELOG
     */
    private boolean isSkipped(Snapshot snapshot) {
        switch (scanMode) {
            case DELTA:
                return snapshot.commitKind() != Snapshot.CommitKind.APPEND;
            case CHANGELOG:
                return snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE;
            default:
                throw new UnsupportedOperationException("Unsupported scan mode: " + scanMode);
        }
    }

    /**
     * Validates the configured snapshot id range against the snapshots currently available.
     *
     * <p>The range is accepted when {@code start <= end}, {@code start >= earliest - 1} and
     * {@code end <= latest}; otherwise the corresponding {@code Preconditions.checkArgument}
     * check fails.
     *
     * @return a {@code NoSnapshot} result when the snapshot manager reports no earliest or latest
     *     snapshot id, otherwise an empty {@link Optional} meaning the scan may proceed
     */
    public Optional<StartingScanner.Result> checkScanSnapshotIdValidity() {
        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
        Long latestSnapshotId = snapshotManager.latestSnapshotId();
        if (earliestSnapshotId == null || latestSnapshotId == null) {
            LOG.warn("There is currently no snapshot. Waiting for snapshot generation.");
            return Optional.of(new StartingScanner.NoSnapshot());
        }
        // Equal ids are allowed (they produce an empty range), so the message says "or equal to".
        Preconditions.checkArgument(this.startingSnapshotId <= this.endingSnapshotId, "Starting snapshotId %s must be less than or equal to ending snapshotId %s.", this.startingSnapshotId, this.endingSnapshotId);
        // earliest - 1 is accepted as a start because the start is exclusive.
        Preconditions.checkArgument(this.startingSnapshotId >= earliestSnapshotId - 1L && this.endingSnapshotId <= latestSnapshotId, "The specified scan snapshotId range [%s, %s] is out of available snapshotId range [%s, %s].", this.startingSnapshotId, this.endingSnapshotId, earliestSnapshotId, latestSnapshotId);
        return Optional.empty();
    }
}

