/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.utils;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotManager
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
    /** File-name prefix of a snapshot file; the snapshot id is appended to it. */
    private static final String SNAPSHOT_PREFIX = "snapshot-";
    /** File-name prefix of a long-lived changelog file; the snapshot id is appended to it. */
    private static final String CHANGELOG_PREFIX = "changelog-";
    /** Name of the hint file that stores the earliest known id of a directory. */
    public static final String EARLIEST = "EARLIEST";
    /** Name of the hint file that stores the latest known id of a directory. */
    public static final String LATEST = "LATEST";
    /** Maximum number of attempts when reading a hint file. */
    private static final int READ_HINT_RETRY_NUM = 3;
    /** Pause between two hint read attempts, in milliseconds. */
    private static final int READ_HINT_RETRY_INTERVAL = 1;
    /** File system abstraction used for every read, write, list and delete in this class. */
    private final FileIO fileIO;
    /** Root path of the table; branch, snapshot and changelog paths are derived from it. */
    private final Path tablePath;
    /** Branch name after {@link BranchManager#normalizeBranch} has been applied. */
    private final String branch;
    /** Optional snapshot cache keyed by snapshot file path; {@code null} disables caching. */
    @Nullable
    private final Cache<Path, Snapshot> cache;

    /**
     * Creates a manager for the {@code "main"} branch without a snapshot cache.
     *
     * @param fileIO file system abstraction used for all file access
     * @param tablePath root path of the table
     */
    public SnapshotManager(FileIO fileIO, Path tablePath) {
        // Delegates to the branch-aware constructor with the literal branch name "main".
        this(fileIO, tablePath, "main");
    }

    /**
     * Creates a manager for the given branch without a snapshot cache.
     *
     * @param fileIO file system abstraction used for all file access
     * @param tablePath root path of the table
     * @param branchName branch name, may be {@code null}; it is normalized by the full constructor
     */
    public SnapshotManager(FileIO fileIO, Path tablePath, @Nullable String branchName) {
        // A null cache means every snapshot read goes to the file system.
        this(fileIO, tablePath, branchName, null);
    }

    /**
     * Creates a manager for the given branch, optionally backed by a snapshot cache.
     *
     * @param fileIO file system abstraction used for all file access
     * @param tablePath root path of the table
     * @param branchName branch name, may be {@code null}; stored after
     *     {@link BranchManager#normalizeBranch} has been applied
     * @param cache snapshot cache keyed by snapshot file path, or {@code null} for no caching
     */
    public SnapshotManager(
            FileIO fileIO,
            Path tablePath,
            @Nullable String branchName,
            @Nullable Cache<Path, Snapshot> cache) {
        this.branch = BranchManager.normalizeBranch(branchName);
        this.cache = cache;
        this.tablePath = tablePath;
        this.fileIO = fileIO;
    }

    /**
     * Creates a manager for another branch of the same table.
     *
     * <p>The copy shares this manager's {@link FileIO}, table path and snapshot cache. The cache
     * is keyed by the full snapshot file path, which is derived from the branch path, so entries
     * of different branches use different keys. Previously the cache was dropped, which silently
     * disabled caching for the returned manager.
     *
     * @param branchName name of the branch the returned manager operates on
     * @return a new manager bound to {@code branchName}
     */
    public SnapshotManager copyWithBranch(String branchName) {
        return new SnapshotManager(this.fileIO, this.tablePath, branchName, this.cache);
    }

    /** Returns the {@link FileIO} this manager uses for file access. */
    public FileIO fileIO() {
        return fileIO;
    }

    /** Returns the root path of the table this manager was created with. */
    public Path tablePath() {
        return tablePath;
    }

    /** Returns the normalized branch name this manager operates on. */
    public String branch() {
        return branch;
    }

    /** Returns the {@code changelog} directory below the branch path of this manager. */
    public Path changelogDirectory() {
        String location = BranchManager.branchPath(tablePath, branch) + "/changelog";
        return new Path(location);
    }

    /**
     * Builds the path of the long-lived changelog file for an id.
     *
     * @param snapshotId id appended to the changelog file prefix
     * @return {@code <branch path>/changelog/changelog-<snapshotId>}
     */
    public Path longLivedChangelogPath(long snapshotId) {
        String location =
                BranchManager.branchPath(tablePath, branch)
                        + "/changelog/"
                        + CHANGELOG_PREFIX
                        + snapshotId;
        return new Path(location);
    }

    /**
     * Builds the path of the snapshot file for an id.
     *
     * @param snapshotId id appended to the snapshot file prefix
     * @return {@code <branch path>/snapshot/snapshot-<snapshotId>}
     */
    public Path snapshotPath(long snapshotId) {
        String location =
                BranchManager.branchPath(tablePath, branch)
                        + "/snapshot/"
                        + SNAPSHOT_PREFIX
                        + snapshotId;
        return new Path(location);
    }

    /** Returns the {@code snapshot} directory below the branch path of this manager. */
    public Path snapshotDirectory() {
        String location = BranchManager.branchPath(tablePath, branch) + "/snapshot";
        return new Path(location);
    }

    /** Drops every entry of the snapshot cache; does nothing when no cache is configured. */
    public void invalidateCache() {
        if (cache == null) {
            return;
        }
        cache.invalidateAll();
    }

    /**
     * Loads the snapshot with the given id, consulting the cache first when one is configured.
     *
     * <p>On a cache miss the snapshot is read with {@code Snapshot.fromPath} and then stored in
     * the cache under its file path.
     *
     * @param snapshotId id of the snapshot to load
     * @return the cached or freshly read snapshot
     */
    public Snapshot snapshot(long snapshotId) {
        Path path = snapshotPath(snapshotId);
        if (cache == null) {
            return Snapshot.fromPath(fileIO, path);
        }
        Snapshot cached = cache.getIfPresent(path);
        if (cached != null) {
            return cached;
        }
        Snapshot loaded = Snapshot.fromPath(fileIO, path);
        cache.put(path, loaded);
        return loaded;
    }

    /**
     * Loads the snapshot with the given id like {@link #snapshot(long)}, but reads it with
     * {@code Snapshot.tryFromPath} so a missing file surfaces as a checked exception.
     *
     * @param snapshotId id of the snapshot to load
     * @return the cached or freshly read snapshot
     * @throws FileNotFoundException propagated from {@code Snapshot.tryFromPath}
     */
    public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException {
        Path path = snapshotPath(snapshotId);
        if (cache == null) {
            return Snapshot.tryFromPath(fileIO, path);
        }
        Snapshot cached = cache.getIfPresent(path);
        if (cached != null) {
            return cached;
        }
        Snapshot loaded = Snapshot.tryFromPath(fileIO, path);
        cache.put(path, loaded);
        return loaded;
    }

    /**
     * Reads the long-lived changelog stored for the given id.
     *
     * @param snapshotId id used to build the changelog file path
     * @return the changelog read from {@link #longLivedChangelogPath(long)}
     */
    public Changelog changelog(long snapshotId) {
        return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
    }

    /**
     * Reads the long-lived changelog stored for the given id.
     *
     * @param snapshotId id used to build the changelog file path
     * @return the changelog read from {@link #longLivedChangelogPath(long)}
     */
    public Changelog longLivedChangelog(long snapshotId) {
        Path changelogPath = longLivedChangelogPath(snapshotId);
        return Changelog.fromPath(fileIO, changelogPath);
    }

    /**
     * Checks whether the snapshot file for the given id exists.
     *
     * @param snapshotId id of the snapshot to look for
     * @return {@code true} if the snapshot file exists
     * @throws RuntimeException wrapping the {@link IOException} if the existence check fails
     */
    public boolean snapshotExists(long snapshotId) {
        Path path = snapshotPath(snapshotId);
        boolean exists;
        try {
            exists = fileIO.exists(path);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to determine if snapshot #" + snapshotId + " exists in path " + path,
                    e);
        }
        return exists;
    }

    /**
     * Deletes the snapshot file for the given id and evicts it from the cache, if any.
     *
     * @param snapshotId id of the snapshot to delete
     */
    public void deleteSnapshot(long snapshotId) {
        Path path = snapshotPath(snapshotId);
        // Evict before deleting so the cache never outlives the file it mirrors.
        if (cache != null) {
            cache.invalidate(path);
        }
        fileIO().deleteQuietly(path);
    }

    /**
     * Checks whether the long-lived changelog file for the given id exists.
     *
     * @param snapshotId id of the changelog to look for
     * @return {@code true} if the changelog file exists
     * @throws RuntimeException wrapping the {@link IOException} if the existence check fails
     */
    public boolean longLivedChangelogExists(long snapshotId) {
        Path path = longLivedChangelogPath(snapshotId);
        boolean exists;
        try {
            exists = fileIO.exists(path);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to determine if changelog #" + snapshotId + " exists in path " + path,
                    e);
        }
        return exists;
    }

    /** Returns the snapshot with the latest id, or {@code null} when no latest id is found. */
    @Nullable
    public Snapshot latestSnapshot() {
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return null;
        }
        return snapshot(latestId);
    }

    /**
     * Finds the latest snapshot id in the snapshot directory.
     *
     * @return the latest id, or {@code null} if none is found
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long latestSnapshotId() {
        Path directory = snapshotDirectory();
        try {
            return findLatest(directory, SNAPSHOT_PREFIX, this::snapshotPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find latest snapshot id", e);
        }
    }

    /** Returns the snapshot with the earliest id, or {@code null} when no earliest id is found. */
    @Nullable
    public Snapshot earliestSnapshot() {
        Long earliestId = earliestSnapshotId();
        if (earliestId == null) {
            return null;
        }
        return snapshot(earliestId);
    }

    /**
     * Finds the earliest snapshot id in the snapshot directory.
     *
     * @return the earliest id, or {@code null} if none is found
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long earliestSnapshotId() {
        Path directory = snapshotDirectory();
        try {
            return findEarliest(directory, SNAPSHOT_PREFIX, this::snapshotPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find earliest snapshot id", e);
        }
    }

    /**
     * Finds the earliest long-lived changelog id in the changelog directory.
     *
     * @return the earliest id, or {@code null} if none is found
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long earliestLongLivedChangelogId() {
        Path directory = changelogDirectory();
        try {
            return findEarliest(directory, CHANGELOG_PREFIX, this::longLivedChangelogPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find earliest changelog id", e);
        }
    }

    /**
     * Finds the latest long-lived changelog id in the changelog directory.
     *
     * @return the latest id, or {@code null} if none is found
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long latestLongLivedChangelogId() {
        Path directory = changelogDirectory();
        try {
            return findLatest(directory, CHANGELOG_PREFIX, this::longLivedChangelogPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find latest changelog id", e);
        }
    }

    /** Returns the latest changelog id, which this class defines as the latest snapshot id. */
    @Nullable
    public Long latestChangelogId() {
        Long latestId = latestSnapshotId();
        return latestId;
    }

    /**
     * Walks from the latest snapshot down to the earliest one and returns the id of the first
     * existing snapshot accepted by the predicate.
     *
     * @param predicate test applied to each existing snapshot, newest first
     * @return the id of the first match, the latest id when nothing matches, or {@code null}
     *     when the latest or earliest id cannot be determined
     */
    @Nullable
    public Long pickOrLatest(Predicate<Snapshot> predicate) {
        Long latestId = latestSnapshotId();
        Long earliestId = earliestSnapshotId();
        if (latestId == null || earliestId == null) {
            return null;
        }

        long lowest = earliestId;
        for (long id = latestId; id >= lowest; id--) {
            // Ids inside the range may be missing; skip them instead of failing.
            if (!snapshotExists(id)) {
                continue;
            }
            Snapshot candidate = snapshot(id);
            if (predicate.test(candidate)) {
                return candidate.id();
            }
        }
        return latestId;
    }

    /** Returns the long-lived changelog for the id when its file exists, else the snapshot. */
    private Snapshot changelogOrSnapshot(long snapshotId) {
        return longLivedChangelogExists(snapshotId)
                ? changelog(snapshotId)
                : snapshot(snapshotId);
    }

    /**
     * Binary-searches for the largest id whose commit time is strictly earlier than the given
     * timestamp.
     *
     * @param timestampMills upper bound (exclusive) for the commit time, in milliseconds
     * @param startFromChangelog when {@code true}, the search starts at the earliest long-lived
     *     changelog id if one exists, otherwise at the earliest snapshot id
     * @return the matching id; {@code earliest - 1} when even the earliest entry is not earlier
     *     than the timestamp; {@code null} when the earliest or latest id cannot be determined
     */
    @Nullable
    public Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
        Long earliestId = earliestSnapshotId();
        if (startFromChangelog) {
            Long earliestChangelog = earliestLongLivedChangelogId();
            if (earliestChangelog != null) {
                earliestId = earliestChangelog;
            }
        }
        Long latestId = latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return null;
        }

        long low = earliestId;
        long high = latestId;
        if (changelogOrSnapshot(low).timeMillis() >= timestampMills) {
            return low - 1L;
        }

        // Invariant: the entry at 'low' is earlier than the timestamp.
        while (low < high) {
            long mid = (low + high + 1L) / 2L;
            if (changelogOrSnapshot(mid).timeMillis() < timestampMills) {
                low = mid;
            } else {
                high = mid - 1L;
            }
        }
        return low;
    }

    /**
     * Binary-searches for the latest snapshot whose commit time is not later than the given
     * timestamp.
     *
     * <p>Note that when even the earliest snapshot is later than the timestamp, the earliest
     * snapshot itself is returned rather than {@code null}.
     *
     * @param timestampMills commit time to search for, in milliseconds
     * @return the matching snapshot, or {@code null} when the earliest or latest id cannot be
     *     determined
     */
    @Nullable
    public Snapshot earlierOrEqualTimeMills(long timestampMills) {
        Long earliestId = earliestSnapshotId();
        Long latestId = latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return null;
        }

        Snapshot first = snapshot(earliestId);
        if (first.timeMillis() > timestampMills) {
            return first;
        }

        long low = earliestId;
        long high = latestId;
        Snapshot best = null;
        while (low <= high) {
            long mid = low + (high - low) / 2L;
            Snapshot candidate = snapshot(mid);
            long commitTime = candidate.timeMillis();
            if (commitTime > timestampMills) {
                high = mid - 1L;
            } else if (commitTime < timestampMills) {
                // Candidate qualifies; keep looking to the right for a later one.
                low = mid + 1L;
                best = candidate;
            } else {
                best = candidate;
                break;
            }
        }
        return best;
    }

    /**
     * Binary-searches for the earliest snapshot whose commit time is not earlier than the given
     * timestamp.
     *
     * @param timestampMills commit time to search for, in milliseconds
     * @return the matching snapshot, or {@code null} when the earliest or latest id cannot be
     *     determined or when even the latest snapshot is earlier than the timestamp
     */
    @Nullable
    public Snapshot laterOrEqualTimeMills(long timestampMills) {
        Long earliestId = earliestSnapshotId();
        Long latestId = latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return null;
        }

        Snapshot last = snapshot(latestId);
        if (last.timeMillis() < timestampMills) {
            return null;
        }

        long low = earliestId;
        long high = latestId;
        Snapshot best = null;
        while (low <= high) {
            long mid = low + (high - low) / 2L;
            Snapshot candidate = snapshot(mid);
            long commitTime = candidate.timeMillis();
            if (commitTime > timestampMills) {
                // Candidate qualifies; keep looking to the left for an earlier one.
                high = mid - 1L;
                best = candidate;
            } else if (commitTime < timestampMills) {
                low = mid + 1L;
            } else {
                best = candidate;
                break;
            }
        }
        return best;
    }

    /**
     * Binary-searches for the latest snapshot whose watermark is not greater than the given
     * watermark.
     *
     * <p>Snapshots at the start of the range may have no watermark ({@code null}); the search
     * first advances to the first snapshot that carries one. If that snapshot's watermark is
     * already greater than or equal to the requested watermark, that snapshot is returned.
     *
     * @param watermark watermark to search for
     * @return the matching snapshot, or {@code null} when the earliest or latest id cannot be
     *     determined, when the latest watermark is {@code Long.MIN_VALUE}, or when no snapshot in
     *     the range carries a watermark
     */
    @Nullable
    public Snapshot earlierOrEqualWatermark(long watermark) {
        Long earliestId = this.earliestSnapshotId();
        Long latestId = this.latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return null;
        }
        // The watermark is nullable: compare null-safely instead of auto-unboxing, which would
        // throw a NullPointerException when the latest snapshot has no watermark.
        Long latestWatermark = this.snapshot(latestId).watermark();
        if (latestWatermark != null && latestWatermark == Long.MIN_VALUE) {
            return null;
        }

        long earliest = earliestId;
        long latest = latestId;

        // Advance to the first snapshot that carries a watermark and stop there. Without
        // stopping, the scan would always run to 'latest' and overwrite the watermark it found.
        Long earliestWatermark = this.snapshot(earliest).watermark();
        while (earliestWatermark == null && earliest < latest) {
            earliest++;
            earliestWatermark = this.snapshot(earliest).watermark();
        }
        if (earliestWatermark == null) {
            return null;
        }
        if (earliestWatermark >= watermark) {
            return this.snapshot(earliest);
        }

        Snapshot finalSnapshot = null;
        while (earliest <= latest) {
            long mid = earliest + (latest - earliest) / 2L;
            Snapshot snapshot = this.snapshot(mid);
            Long commitWatermark = snapshot.watermark();
            if (commitWatermark == null) {
                // Walk backwards to the nearest snapshot that carries a watermark.
                // NOTE(review): this can probe id 'earliest - 1' and, if watermarks could be
                // missing in the middle of the range, may not make progress; presumably a
                // watermark is never dropped once set, so this is not reached - confirm.
                while (mid >= earliest
                        && (commitWatermark = this.snapshot(--mid).watermark()) == null) {
                }
            }
            if (commitWatermark == null) {
                earliest = mid + 1L;
            } else if (commitWatermark > watermark) {
                latest = mid - 1L;
            } else if (commitWatermark < watermark) {
                // Candidate qualifies; keep looking to the right for a later one.
                earliest = mid + 1L;
                finalSnapshot = snapshot;
            } else {
                finalSnapshot = snapshot;
                break;
            }
        }
        return finalSnapshot;
    }

    /**
     * Binary-searches for the earliest snapshot whose watermark is not less than the given
     * watermark.
     *
     * <p>Snapshots at the start of the range may have no watermark ({@code null}); the search
     * first advances to the first snapshot that carries one. If that snapshot's watermark is
     * already greater than or equal to the requested watermark, that snapshot is returned.
     *
     * @param watermark watermark to search for
     * @return the matching snapshot, or {@code null} when the earliest or latest id cannot be
     *     determined, when the latest watermark is {@code Long.MIN_VALUE}, when no snapshot in
     *     the range carries a watermark, or when no watermark reaches the requested value
     */
    @Nullable
    public Snapshot laterOrEqualWatermark(long watermark) {
        Long earliestId = this.earliestSnapshotId();
        Long latestId = this.latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return null;
        }
        // The watermark is nullable: compare null-safely instead of auto-unboxing, which would
        // throw a NullPointerException when the latest snapshot has no watermark.
        Long latestWatermark = this.snapshot(latestId).watermark();
        if (latestWatermark != null && latestWatermark == Long.MIN_VALUE) {
            return null;
        }

        long earliest = earliestId;
        long latest = latestId;

        // Advance to the first snapshot that carries a watermark and stop there. Without
        // stopping, the scan would always run to 'latest' and overwrite the watermark it found.
        Long earliestWatermark = this.snapshot(earliest).watermark();
        while (earliestWatermark == null && earliest < latest) {
            earliest++;
            earliestWatermark = this.snapshot(earliest).watermark();
        }
        if (earliestWatermark == null) {
            return null;
        }
        if (earliestWatermark >= watermark) {
            return this.snapshot(earliest);
        }

        Snapshot finalSnapshot = null;
        while (earliest <= latest) {
            long mid = earliest + (latest - earliest) / 2L;
            Snapshot snapshot = this.snapshot(mid);
            Long commitWatermark = snapshot.watermark();
            if (commitWatermark == null) {
                // Walk backwards to the nearest snapshot that carries a watermark.
                // NOTE(review): this can probe id 'earliest - 1' and, if watermarks could be
                // missing in the middle of the range, may not make progress; presumably a
                // watermark is never dropped once set, so this is not reached - confirm.
                while (mid >= earliest
                        && (commitWatermark = this.snapshot(--mid).watermark()) == null) {
                }
            }
            if (commitWatermark == null) {
                earliest = mid + 1L;
            } else if (commitWatermark > watermark) {
                // Candidate qualifies; keep looking to the left for an earlier one.
                latest = mid - 1L;
                finalSnapshot = snapshot;
            } else if (commitWatermark < watermark) {
                earliest = mid + 1L;
            } else {
                finalSnapshot = snapshot;
                break;
            }
        }
        return finalSnapshot;
    }

    /**
     * Counts the versioned files with the snapshot prefix in the snapshot directory.
     *
     * @return number of listed snapshot files
     * @throws IOException if listing the directory fails
     */
    public long snapshotCount() throws IOException {
        Path directory = snapshotDirectory();
        return FileUtils.listVersionedFiles(fileIO, directory, SNAPSHOT_PREFIX).count();
    }

    /**
     * Loads every snapshot listed in the snapshot directory.
     *
     * @return iterator over the snapshots in ascending id order
     * @throws IOException if listing the directory fails
     */
    public Iterator<Snapshot> snapshots() throws IOException {
        Comparator<Snapshot> byId = Comparator.comparingLong(Snapshot::id);
        return FileUtils.listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
                .map(this::snapshot)
                .sorted(byId)
                .iterator();
    }

    /**
     * Lists the paths of all snapshot files whose id is accepted by the predicate.
     *
     * @param predicate filter applied to each listed snapshot id
     * @return paths of the accepted snapshots, in listing order
     * @throws IOException if listing the directory fails
     */
    public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException {
        Path directory = snapshotDirectory();
        return FileUtils.listVersionedFiles(fileIO, directory, SNAPSHOT_PREFIX)
                .filter(predicate)
                .map(this::snapshotPath)
                .collect(Collectors.toList());
    }

    /**
     * Loads the snapshots with the given ids.
     *
     * @param snapshotIds ids of the snapshots to load
     * @return iterator over the loaded snapshots in ascending id order
     */
    public Iterator<Snapshot> snapshotsWithId(List<Long> snapshotIds) {
        Comparator<Snapshot> byId = Comparator.comparingLong(Snapshot::id);
        return snapshotIds.stream().map(this::snapshot).sorted(byId).iterator();
    }

    /**
     * Loads the snapshots whose ids lie in a range clamped to the existing snapshots.
     *
     * <p>The range starts as {@code [earliestSnapshotId, latestSnapshotId]}. A present maximum
     * lowers the upper bound and a present minimum raises the lower bound; neither can widen
     * the range. Note the parameter order: the maximum comes first.
     *
     * @param optionalMaxSnapshotId optional upper bound (inclusive) for the snapshot id
     * @param optionalMinSnapshotId optional lower bound (inclusive) for the snapshot id
     * @return iterator over the snapshots in ascending id order; empty when the earliest or
     *     latest id cannot be determined
     * @throws RuntimeException if the maximum is below the earliest snapshot id, or if the
     *     minimum is above the effective upper bound
     */
    public Iterator<Snapshot> snapshotsWithinRange(Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) {
        Long earliestId = this.earliestSnapshotId();
        Long latestId = this.latestSnapshotId();
        if (earliestId == null || latestId == null) {
            return Collections.emptyIterator();
        }

        long lowerBound = earliestId;
        long upperBound = latestId;

        if (optionalMaxSnapshotId.isPresent()) {
            long upperId = optionalMaxSnapshotId.get();
            if (upperId < lowerBound) {
                // The old message claimed "should not greater than", the opposite of the check.
                throw new RuntimeException(
                        String.format(
                                "snapshot upper id:%s should not be less than earliestSnapshotId:%s",
                                upperId, lowerBound));
            }
            upperBound = Math.min(upperId, upperBound);
        }

        if (optionalMinSnapshotId.isPresent()) {
            long lowerId = optionalMinSnapshotId.get();
            if (lowerId > upperBound) {
                // The old message called this the "upper id" and named the bound
                // "latestSnapshotId" even when it had been lowered by the requested maximum.
                throw new RuntimeException(
                        String.format(
                                "snapshot lower id:%s should not be greater than upper bound snapshot id:%s",
                                lowerId, upperBound));
            }
            lowerBound = Math.max(lowerId, lowerBound);
        }

        return LongStream.range(lowerBound, upperBound + 1L)
                .mapToObj(this::snapshot)
                .sorted(Comparator.comparingLong(Snapshot::id))
                .iterator();
    }

    /**
     * Loads every long-lived changelog listed in the changelog directory.
     *
     * @return iterator over the changelogs in ascending id order
     * @throws IOException if listing the directory fails
     */
    public Iterator<Changelog> changelogs() throws IOException {
        Path directory = changelogDirectory();
        return FileUtils.listVersionedFiles(fileIO, directory, CHANGELOG_PREFIX)
                .map(this::changelog)
                .sorted(Comparator.comparingLong(Snapshot::id))
                .iterator();
    }

    /**
     * Reads every snapshot listed in the snapshot directory, skipping files that disappear
     * between listing and reading.
     *
     * <p>The cache is bypassed: each file is read with {@code Snapshot.tryFromPath}. The result
     * is filled by the worker tasks of {@link #collectSnapshots}, so its order is not defined
     * by this method.
     *
     * @return a synchronized list of the snapshots that could be read
     * @throws IOException if listing the directory or collecting the snapshots fails
     */
    public List<Snapshot> safelyGetAllSnapshots() throws IOException {
        List<Path> paths =
                FileUtils.listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
                        .map(this::snapshotPath)
                        .collect(Collectors.toList());

        List<Snapshot> collected = Collections.synchronizedList(new ArrayList<>(paths.size()));
        Consumer<Path> reader =
                path -> {
                    try {
                        collected.add(Snapshot.tryFromPath(fileIO, path));
                    } catch (FileNotFoundException ignored) {
                        // The file vanished after it was listed; leave it out of the result.
                    }
                };
        collectSnapshots(reader, paths);
        return collected;
    }

    /**
     * Reads every long-lived changelog listed in the changelog directory, skipping files that
     * disappear between listing and reading.
     *
     * <p>The result is filled by the worker tasks of {@link #collectSnapshots}, so its order is
     * not defined by this method.
     *
     * @return a synchronized list of the changelogs that could be read
     * @throws IOException if listing the directory or collecting the changelogs fails
     */
    public List<Changelog> safelyGetAllChangelogs() throws IOException {
        List<Path> paths =
                FileUtils.listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
                        .map(this::longLivedChangelogPath)
                        .collect(Collectors.toList());

        List<Changelog> collected = Collections.synchronizedList(new ArrayList<>(paths.size()));
        Consumer<Path> reader =
                path -> {
                    try {
                        collected.add(Changelog.fromJson(fileIO.readFileUtf8(path)));
                    } catch (FileNotFoundException ignored) {
                        // The file vanished after it was listed; leave it out of the result.
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                };
        collectSnapshots(reader, paths);
        return collected;
    }

    /**
     * Applies the consumer to every path using a dedicated thread pool that is shut down
     * afterwards.
     *
     * @param pathConsumer action to run for each path
     * @param paths paths to process
     * @throws IOException wrapping any {@link RuntimeException} raised while processing
     */
    private void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths) throws IOException {
        int threads = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = ThreadPoolUtils.createCachedThreadPool(threads, "SNAPSHOT_COLLECTOR");
        try {
            ThreadPoolUtils.randomlyOnlyExecute(pool, pathConsumer, paths);
        } catch (RuntimeException e) {
            throw new IOException(e);
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Lists files in the snapshot directory that are neither snapshot files nor hint files.
     *
     * @param fileStatusFilter additional filter applied to each file status
     * @return path and length of each matching file; empty if the directory cannot be listed
     */
    public List<Pair<Path, Long>> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
        Predicate<Path> nameFilter = nonSnapshotFileFilter();
        return listPathWithFilter(snapshotDirectory(), fileStatusFilter, nameFilter);
    }

    /**
     * Lists files in the changelog directory that are neither changelog files nor hint files.
     *
     * @param fileStatusFilter additional filter applied to each file status
     * @return path and length of each matching file; empty if the directory cannot be listed
     */
    public List<Pair<Path, Long>> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
        Predicate<Path> nameFilter = nonChangelogFileFilter();
        return listPathWithFilter(changelogDirectory(), fileStatusFilter, nameFilter);
    }

    /**
     * Lists a directory and returns path and length of every entry accepted by both filters.
     *
     * <p>This is best effort: a failed listing or a {@code null} listing yields an empty list.
     *
     * @param directory directory to list
     * @param fileStatusFilter filter applied to the file status first
     * @param fileFilter filter applied to the path of entries that passed the status filter
     * @return pairs of path and file length for the accepted entries
     */
    private List<Pair<Path, Long>> listPathWithFilter(Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
        FileStatus[] statuses;
        try {
            statuses = fileIO.listStatus(directory);
        } catch (IOException ignored) {
            // Deliberately tolerated: callers only "try" to get these files.
            return Collections.emptyList();
        }
        if (statuses == null) {
            return Collections.emptyList();
        }

        List<Pair<Path, Long>> matches = new ArrayList<>();
        for (FileStatus status : statuses) {
            if (fileStatusFilter.test(status) && fileFilter.test(status.getPath())) {
                matches.add(Pair.of(status.getPath(), status.getLen()));
            }
        }
        return matches;
    }

    /**
     * Returns a predicate accepting paths whose file name neither starts with the snapshot
     * prefix nor equals one of the hint file names.
     */
    private Predicate<Path> nonSnapshotFileFilter() {
        return path -> {
            String fileName = path.getName();
            boolean expected =
                    fileName.startsWith(SNAPSHOT_PREFIX)
                            || fileName.equals(EARLIEST)
                            || fileName.equals(LATEST);
            return !expected;
        };
    }

    /**
     * Returns a predicate accepting paths whose file name neither starts with the changelog
     * prefix nor equals one of the hint file names.
     */
    private Predicate<Path> nonChangelogFileFilter() {
        return path -> {
            String fileName = path.getName();
            boolean expected =
                    fileName.startsWith(CHANGELOG_PREFIX)
                            || fileName.equals(EARLIEST)
                            || fileName.equals(LATEST);
            return !expected;
        };
    }

    /**
     * Searches from the latest snapshot backwards for the newest snapshot committed by a user.
     *
     * <p>If reading a snapshot fails, the earliest id is re-read: a failure on an id that is
     * now below the earliest id is treated as expiration and ends the search; any other failure
     * is rethrown.
     *
     * @param user commit user to look for
     * @return the newest snapshot of the user, or empty if none is found
     */
    public Optional<Snapshot> latestSnapshotOfUser(String user) {
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return Optional.empty();
        }

        long earliestId =
                Preconditions.checkNotNull(
                        earliestSnapshotId(),
                        "Latest snapshot id is not null, but earliest snapshot id is null. This is unexpected.");
        long id = latestId;
        while (id >= earliestId) {
            Snapshot current;
            try {
                current = snapshot(id);
            } catch (Exception e) {
                long refreshedEarliestId =
                        Preconditions.checkNotNull(
                                earliestSnapshotId(),
                                "Latest snapshot id is not null, but earliest snapshot id is null. This is unexpected.");
                if (id >= refreshedEarliestId) {
                    // Still inside the live range, so this is a real failure.
                    throw e;
                }
                LOG.warn("Snapshot #" + id + " is expired. The latest snapshot of current user(" + user + ") is not found.");
                break;
            }
            if (user.equals(current.commitUser())) {
                return Optional.of(current);
            }
            id--;
        }
        return Optional.empty();
    }

    /**
     * Searches from the latest snapshot backwards for the user's snapshots whose commit
     * identifier is one of the given identifiers.
     *
     * <p>The scan stops once all identifiers are matched, once the earliest snapshot has been
     * visited, or once a snapshot of the user has a commit identifier not greater than the
     * smallest requested identifier.
     *
     * @param user commit user whose snapshots are considered
     * @param identifiers commit identifiers to look for
     * @return the matched snapshots, newest first; empty when nothing is requested or no
     *     snapshot exists
     */
    public List<Snapshot> findSnapshotsForIdentifiers(@Nonnull String user, List<Long> identifiers) {
        if (identifiers.isEmpty()) {
            return Collections.emptyList();
        }
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return Collections.emptyList();
        }

        long earliestId =
                Preconditions.checkNotNull(
                        earliestSnapshotId(),
                        "Latest snapshot id is not null, but earliest snapshot id is null. This is unexpected.");
        long smallestWanted = Collections.min(identifiers);
        HashSet<Long> pending = new HashSet<>(identifiers);
        ArrayList<Snapshot> found = new ArrayList<>();

        long id = latestId;
        while (id >= earliestId && !pending.isEmpty()) {
            Snapshot current = snapshot(id);
            id--;
            if (!user.equals(current.commitUser())) {
                continue;
            }
            if (pending.remove(current.commitIdentifier())) {
                found.add(current);
            }
            if (current.commitIdentifier() <= smallestWanted) {
                // Older snapshots of this user cannot carry a wanted identifier any more.
                break;
            }
        }
        return found;
    }

    /**
     * Writes the JSON form of a changelog to its long-lived changelog file.
     *
     * @param changelog changelog to persist
     * @param id id used to build the target file path
     * @throws IOException if writing the file fails
     */
    public void commitChangelog(Changelog changelog, long id) throws IOException {
        Path target = longLivedChangelogPath(id);
        // NOTE(review): the trailing 'true' is presumably the overwrite flag - confirm in FileIO.
        fileIO.writeFile(target, changelog.toJson(), true);
    }

    /**
     * Walks from the latest snapshot down to the earliest one and returns the first snapshot
     * accepted by the checker.
     *
     * <p>If reading a snapshot fails, the earliest id is re-read: the failure is rethrown only
     * when the id is still at or above the refreshed earliest id; otherwise the traversal ends
     * with {@code null}.
     *
     * @param checker filter applied to each snapshot, newest first
     * @return the first accepted snapshot, or {@code null} if none is accepted or no snapshot
     *     range can be determined
     */
    @Nullable
    public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
        Long latestId = latestSnapshotId();
        if (latestId == null) {
            return null;
        }
        Long earliestId = earliestSnapshotId();
        if (earliestId == null) {
            return null;
        }

        long lowest = earliestId;
        long id = latestId;
        while (id >= lowest) {
            Snapshot current;
            try {
                current = snapshot(id);
            } catch (Exception e) {
                Long refreshedEarliestId = earliestSnapshotId();
                if (refreshedEarliestId != null && id >= refreshedEarliestId) {
                    // Still inside the live range, so this is a real failure.
                    throw e;
                }
                return null;
            }
            if (checker.test(current)) {
                return current;
            }
            id--;
        }
        return null;
    }

    /**
     * Finds the latest id in a directory, trusting the {@code LATEST} hint only when it is
     * positive and no file exists for the next id.
     *
     * @param dir directory holding the versioned files and the hint file
     * @param prefix file-name prefix of the versioned files
     * @param file maps an id to the path of its file
     * @return the latest id, or {@code null} if none is found
     * @throws IOException if an existence check or the directory listing fails
     */
    @Nullable
    private Long findLatest(Path dir, String prefix, Function<Long, Path> file) throws IOException {
        Long hinted = readHint(LATEST, dir);
        if (hinted != null && hinted > 0L) {
            long next = hinted + 1L;
            if (!fileIO.exists(file.apply(next))) {
                return hinted;
            }
        }
        // Hint missing or stale: fall back to listing the directory.
        return findByListFiles(Math::max, dir, prefix);
    }

    /**
     * Finds the earliest id in a directory, trusting the {@code EARLIEST} hint only when the
     * file for the hinted id exists.
     *
     * @param dir directory holding the versioned files and the hint file
     * @param prefix file-name prefix of the versioned files
     * @param file maps an id to the path of its file
     * @return the earliest id, or {@code null} if none is found
     * @throws IOException if the existence check or the directory listing fails
     */
    @Nullable
    private Long findEarliest(Path dir, String prefix, Function<Long, Path> file) throws IOException {
        Long hinted = readHint(EARLIEST, dir);
        boolean hintUsable = hinted != null && fileIO.exists(file.apply(hinted));
        if (hintUsable) {
            return hinted;
        }
        // Hint missing or stale: fall back to listing the directory.
        return findByListFiles(Math::min, dir, prefix);
    }

    /**
     * Reads a hint file from the snapshot directory.
     *
     * @param fileName name of the hint file
     * @return the id stored in the hint file, or {@code null} if it cannot be read
     */
    public Long readHint(String fileName) {
        Path directory = snapshotDirectory();
        return readHint(fileName, directory);
    }

    /**
     * Reads a hint file and parses its content as a {@code long}, retrying after any failure.
     *
     * <p>Up to {@code READ_HINT_RETRY_NUM} attempts are made, with a pause of
     * {@code READ_HINT_RETRY_INTERVAL} milliseconds after each failed one.
     *
     * @param fileName name of the hint file
     * @param dir directory containing the hint file
     * @return the parsed id, or {@code null} if the file yields no content or every attempt
     *     fails
     * @throws RuntimeException if the thread is interrupted while pausing between attempts
     */
    public Long readHint(String fileName, Path dir) {
        Path hintPath = new Path(dir, fileName);
        for (int attempt = 0; attempt < READ_HINT_RETRY_NUM; attempt++) {
            try {
                return fileIO.readOverwrittenFileUtf8(hintPath).map(Long::parseLong).orElse(null);
            } catch (Exception ignored) {
                // Any failure (I/O or parsing) is retried after a short pause.
                try {
                    TimeUnit.MILLISECONDS.sleep(READ_HINT_RETRY_INTERVAL);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
        return null;
    }

    /**
     * Lists the versioned files of a directory and reduces their ids to a single value.
     *
     * @param reducer2 combines two ids into one, e.g. {@code Math::max} or {@code Math::min}
     * @param dir directory to list
     * @param prefix file-name prefix of the versioned files
     * @return the reduced id, or {@code null} if no file is listed
     * @throws IOException if listing the directory fails
     */
    private Long findByListFiles(BinaryOperator<Long> reducer2, Path dir, String prefix) throws IOException {
        Optional<Long> reduced = FileUtils.listVersionedFiles(fileIO, dir, prefix).reduce(reducer2);
        return reduced.orElse(null);
    }

    /**
     * Scans the list from the end for the last snapshot whose id is less than the target id.
     *
     * @param sortedSnapshots snapshots to scan, expected to be sorted by id
     * @param targetSnapshotId exclusive upper bound for the id
     * @return index of the found snapshot, or {@code -1} if there is none
     */
    public static int findPreviousSnapshot(List<Snapshot> sortedSnapshots, long targetSnapshotId) {
        int index = sortedSnapshots.size() - 1;
        while (index >= 0) {
            if (sortedSnapshots.get(index).id() < targetSnapshotId) {
                return index;
            }
            index--;
        }
        return -1;
    }

    /**
     * Scans the list from the end for the last snapshot whose id is not greater than the
     * target id.
     *
     * @param sortedSnapshots snapshots to scan, expected to be sorted by id
     * @param targetSnapshotId inclusive upper bound for the id
     * @return index of the found snapshot, or {@code -1} if there is none
     */
    public static int findPreviousOrEqualSnapshot(List<Snapshot> sortedSnapshots, long targetSnapshotId) {
        int index = sortedSnapshots.size() - 1;
        while (index >= 0) {
            if (sortedSnapshots.get(index).id() <= targetSnapshotId) {
                return index;
            }
            index--;
        }
        return -1;
    }

    /**
     * Deletes the {@code LATEST} hint file of the snapshot directory.
     *
     * @throws IOException if the delete call fails
     */
    public void deleteLatestHint() throws IOException {
        Path latestHint = new Path(snapshotDirectory(), LATEST);
        fileIO.delete(latestHint, false);
    }

    /**
     * Writes the given id to the {@code LATEST} hint file of the snapshot directory.
     *
     * @param snapshotId id to store in the hint file
     * @throws IOException if the hint cannot be written
     */
    public void commitLatestHint(long snapshotId) throws IOException {
        Path directory = snapshotDirectory();
        commitHint(snapshotId, LATEST, directory);
    }

    /**
     * Writes the given id to the {@code LATEST} hint file of the changelog directory.
     *
     * @param snapshotId id to store in the hint file
     * @throws IOException if the hint cannot be written
     */
    public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException {
        Path directory = changelogDirectory();
        commitHint(snapshotId, LATEST, directory);
    }

    /**
     * Writes the given id to the {@code EARLIEST} hint file of the changelog directory.
     *
     * @param snapshotId id to store in the hint file
     * @throws IOException if the hint cannot be written
     */
    public void commitLongLivedChangelogEarliestHint(long snapshotId) throws IOException {
        Path directory = changelogDirectory();
        commitHint(snapshotId, EARLIEST, directory);
    }

    /**
     * Writes the given id to the {@code EARLIEST} hint file of the snapshot directory.
     *
     * @param snapshotId id to store in the hint file
     * @throws IOException if the hint cannot be written
     */
    public void commitEarliestHint(long snapshotId) throws IOException {
        Path directory = snapshotDirectory();
        commitHint(snapshotId, EARLIEST, directory);
    }

    /**
     * Overwrites a hint file with the given id, retrying on I/O failure.
     *
     * <p>Up to three attempts are made, with a random pause of 500 to 1499 milliseconds between
     * failed attempts. The failure of the final attempt is rethrown immediately; previously the
     * method still slept once more before rethrowing, which only delayed the caller.
     *
     * @param snapshotId id to store in the hint file
     * @param fileName name of the hint file
     * @param dir directory containing the hint file
     * @throws IOException the failure of the last attempt
     * @throws RuntimeException if the thread is interrupted while pausing between attempts; the
     *     cause is the I/O failure that triggered the pause
     */
    private void commitHint(long snapshotId, String fileName, Path dir) throws IOException {
        Path hintFile = new Path(dir, fileName);
        int attemptsLeft = 3;
        while (attemptsLeft-- > 0) {
            try {
                this.fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId));
                return;
            } catch (IOException e) {
                if (attemptsLeft == 0) {
                    // Nothing left to retry: fail right away instead of sleeping first.
                    throw e;
                }
                try {
                    Thread.sleep(ThreadLocalRandom.current().nextInt(1000) + 500);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    // Report the I/O failure, not the interruption, as the cause.
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

