/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.operation;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Sets;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import org.apache.paimon.utils.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrphanFilesClean {
    private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesClean.class);
    // Thread pool shared by all instances; sized to the number of available processors and
    // used by every ThreadPoolUtils.randomlyExecute call in this class.
    private static final ThreadPoolExecutor EXECUTOR = ThreadPoolUtils.createCachedThreadPool(Runtime.getRuntime().availableProcessors(), "ORPHAN_FILES_CLEAN");
    // NOTE(review): the three constants below are not referenced by name in this file;
    // retryReadingFiles uses the literals 3 and 5L and executeOrphanFilesClean uses 200,
    // presumably these same values after constant inlining.
    private static final int READ_FILE_RETRY_NUM = 3;
    private static final int READ_FILE_RETRY_INTERVAL = 5;
    private static final int SHOW_LIMIT = 200;
    // Table being cleaned, plus values read from it once in the constructor.
    private final FileStoreTable table;
    private final FileIO fileIO;
    private final Path location;
    private final int partitionKeysNum;
    // Every path handed to fileCleaner so far; this same list instance is returned by clean().
    private final List<Path> deleteFiles;
    // Only files modified strictly before this instant (epoch millis) are considered;
    // defaults to one day before this object was created.
    private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1L);
    // Action applied to each orphan path; replaceable through fileCleaner(Consumer).
    private Consumer<Path> fileCleaner;

    /**
     * Creates a cleaner for the given table.
     *
     * <p>The file IO, location and number of partition keys are read from the table once.
     * The default file cleaner deletes a path with the "quiet" delete variants of
     * {@link FileIO}, choosing the directory variant when the path is a directory.
     *
     * @param table the table whose orphan files should be removed
     */
    public OrphanFilesClean(FileStoreTable table) {
        this.table = table;
        this.fileIO = table.fileIO();
        this.location = table.location();
        this.partitionKeysNum = table.partitionKeys().size();
        this.deleteFiles = new ArrayList<>();
        this.fileCleaner = target -> {
            try {
                if (!this.fileIO.isDir(target)) {
                    this.fileIO.deleteQuietly(target);
                    return;
                }
                this.fileIO.deleteDirectoryQuietly(target);
            } catch (IOException ignored) {
                // Deliberately ignored: deletion is best effort, so a path whose type cannot
                // be determined is simply left in place.
            }
        };
    }

    /**
     * Sets the age threshold from a timestamp string.
     *
     * <p>The string is parsed by {@code DateTimeUtils.parseTimestampData} with the JVM default
     * time zone and the argument {@code 3} (presumably millisecond precision; confirm against
     * that utility). Only files modified before the resulting instant are considered.
     *
     * @param timestamp textual timestamp in a format accepted by {@code DateTimeUtils}
     * @return this instance, for chaining
     */
    public OrphanFilesClean olderThan(String timestamp) {
        TimeZone localZone = TimeZone.getDefault();
        this.olderThanMillis = DateTimeUtils.parseTimestampData(timestamp, 3, localZone).getMillisecond();
        return this;
    }

    /**
     * Replaces the action applied to every path selected for removal.
     *
     * <p>The consumer is invoked for each path that {@link #clean()} reports, so a consumer
     * that does not delete anything turns the run into a listing of what would be removed.
     *
     * @param fileCleaner action to run for each orphan path
     * @return this instance, for chaining
     */
    public OrphanFilesClean fileCleaner(Consumer<Path> fileCleaner) {
        this.fileCleaner = fileCleaner;
        return this;
    }

    /**
     * Finds and removes the orphan files of the table across all of its branches.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Collect all branch names plus {@code "main"}; if any of them has no schema, log a
     *       warning and return an empty list without touching anything.</li>
     *   <li>List the candidate files (old enough files in the manifest, index, statistics and
     *       bucket directories). Listing the data directories already cleans unexpected
     *       entries found there.</li>
     *   <li>For every branch, clean old files in the snapshot and changelog directories that
     *       are not snapshots/changelogs, and gather the names of all files still referenced
     *       by that branch.</li>
     *   <li>Clean every candidate whose file name is not referenced by any branch.</li>
     * </ol>
     *
     * @return the paths passed to the file cleaner so far; this is the internal list of this
     *     instance, not a copy
     * @throws IOException if reading snapshots, tags or changelogs fails
     * @throws ExecutionException declared for callers; not thrown directly by this body
     * @throws InterruptedException declared for callers; not thrown directly by this body
     */
    public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
        // Work on a copy: the list handed out by the branch manager may be immutable or shared,
        // and it must not be modified as a side effect of cleaning.
        List<String> branches = new ArrayList<>(this.table.branchManager().branches());
        if (!branches.contains("main")) {
            branches.add("main");
        }

        List<String> abnormalBranches = new ArrayList<>();
        for (String branch : branches) {
            if (!new SchemaManager(this.table.fileIO(), this.table.location(), branch).latest().isPresent()) {
                abnormalBranches.add(branch);
            }
        }
        if (!abnormalBranches.isEmpty()) {
            LOG.warn("Branches {} have no schemas. Orphan files cleaning aborted. Please check these branches manually.", abnormalBranches);
            return Collections.emptyList();
        }

        // Candidates are listed before the used files are computed, as in the original order.
        Map<String, Path> candidates = this.getCandidateDeletingFiles();
        Set<String> usedFiles = new HashSet<>();
        for (String branch : branches) {
            FileStoreTable branchTable = this.table.switchToBranch(branch);
            SnapshotManager snapshotManager = branchTable.snapshotManager();

            List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
            nonSnapshotFiles.forEach(this.fileCleaner);
            this.deleteFiles.addAll(nonSnapshotFiles);

            List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
            nonChangelogFiles.forEach(this.fileCleaner);
            this.deleteFiles.addAll(nonChangelogFiles);

            usedFiles.addAll(this.getUsedFiles(branchTable));
        }

        // Candidates are matched against used files by file name only.
        Set<String> orphanNames = new HashSet<>(candidates.keySet());
        orphanNames.removeAll(usedFiles);
        List<Path> orphanPaths = orphanNames.stream().map(candidates::get).collect(Collectors.toList());
        orphanPaths.forEach(this.fileCleaner);
        this.deleteFiles.addAll(orphanPaths);
        return this.deleteFiles;
    }

    /**
     * Collects the names of all files referenced by one branch of the table.
     *
     * <p>The snapshots, the tagged snapshots and the changelogs of the branch are merged into
     * one set and scanned on the shared executor; the names found for each are combined.
     *
     * @param branchTable the table switched to the branch to inspect
     * @return names of every file referenced by the scanned snapshots and changelogs
     * @throws IOException if the snapshots or changelogs cannot be listed
     */
    private Set<String> getUsedFiles(FileStoreTable branchTable) throws IOException {
        SnapshotManager snapshots = branchTable.snapshotManager();
        TagManager tags = branchTable.tagManager();

        Set<Snapshot> toScan = new HashSet<>(snapshots.safelyGetAllSnapshots());
        toScan.addAll(tags.taggedSnapshots());
        toScan.addAll(snapshots.safelyGetAllChangelogs());

        Function<Snapshot, List<String>> collector = snapshot -> this.getUsedFiles(branchTable, snapshot);
        return Sets.newHashSet(ThreadPoolUtils.randomlyExecute(EXECUTOR, collector, toScan));
    }

    /**
     * Collects the file names referenced by a single snapshot or changelog.
     *
     * <p>A fresh manifest list reader and manifest file reader are created from the branch's
     * store, then the work is delegated according to the runtime type of {@code snapshot}.
     *
     * @param branchTable the table switched to the branch that owns the snapshot
     * @param snapshot a snapshot, or a {@link Changelog}, to inspect
     * @return the file names referenced by {@code snapshot}
     */
    private List<String> getUsedFiles(FileStoreTable branchTable, Snapshot snapshot) {
        ManifestList manifestList = branchTable.store().manifestListFactory().create();
        ManifestFile manifestFile = branchTable.store().manifestFileFactory().create();
        if (!(snapshot instanceof Changelog)) {
            return this.getUsedFilesForSnapshot(manifestList, manifestFile, branchTable.store().newIndexFileHandler(), snapshot);
        }
        return this.getUsedFilesForChangelog(manifestList, manifestFile, (Changelog) snapshot);
    }

    /**
     * Lists every file that is old enough to be a deletion candidate.
     *
     * <p>The directories returned by {@link #listPaimonFileDirs()} are listed on the shared
     * executor and entries that are not old enough are dropped.
     *
     * @return candidate paths keyed by file name; when two candidates share a name, the one
     *     seen later replaces the earlier one in the map
     */
    private Map<String, Path> getCandidateDeletingFiles() {
        List<Path> fileDirs = this.listPaimonFileDirs();
        Function<Path, List<Path>> processor = dir -> this.tryBestListingDirs(dir).stream()
                .filter(this::oldEnough)
                .map(FileStatus::getPath)
                .collect(Collectors.toList());
        Iterator<Path> allPaths = ThreadPoolUtils.randomlyExecute(EXECUTOR, processor, fileDirs);

        Map<String, Path> result = new HashMap<>();
        while (allPaths.hasNext()) {
            Path next = allPaths.next();
            result.put(next.getName(), next);
        }
        return result;
    }

    /**
     * Collects the file names referenced by a changelog.
     *
     * <p>The result contains the changelog/base/delta manifest list names (base and delta only
     * when they exist), the names of all manifest files listed in them, and the data files of
     * the changelog manifests, or of the delta manifests when the changelog has no changelog
     * manifest list.
     *
     * @param manifestList reader for manifest list files
     * @param manifestFile reader for manifest files
     * @param changelog the changelog to inspect
     * @return the referenced file names, or an empty list if a manifest file vanished while
     *     its data files were being read
     * @throws RuntimeException wrapping the {@link IOException} if a read keeps failing
     */
    private List<String> getUsedFilesForChangelog(ManifestList manifestList, ManifestFile manifestFile, Changelog changelog) {
        List<String> files = new ArrayList<>();
        List<ManifestFileMeta> manifestFileMetas = new ArrayList<>();
        try {
            // Changelog manifest list: recorded whenever the changelog names one.
            List<ManifestFileMeta> changelogManifest = new ArrayList<>();
            if (changelog.changelogManifestList() != null) {
                files.add(changelog.changelogManifestList());
                // null means the file was not found while reading.
                changelogManifest = this.retryReadingFiles(() -> manifestList.readWithIOException(changelog.changelogManifestList()));
                if (changelogManifest != null) {
                    manifestFileMetas.addAll(changelogManifest);
                }
            }

            // Base manifest list: only considered when it still exists.
            if (manifestList.exists(changelog.baseManifestList())) {
                files.add(changelog.baseManifestList());
                List<ManifestFileMeta> baseManifest = this.retryReadingFiles(() -> manifestList.readWithIOException(changelog.baseManifestList()));
                if (baseManifest != null) {
                    manifestFileMetas.addAll(baseManifest);
                }
            }

            // Delta manifest list: only considered when it still exists.
            List<ManifestFileMeta> deltaManifest = null;
            if (manifestList.exists(changelog.deltaManifestList())) {
                files.add(changelog.deltaManifestList());
                deltaManifest = this.retryReadingFiles(() -> manifestList.readWithIOException(changelog.deltaManifestList()));
                if (deltaManifest != null) {
                    manifestFileMetas.addAll(deltaManifest);
                }
            }

            files.addAll(manifestFileMetas.stream().map(ManifestFileMeta::fileName).collect(Collectors.toList()));

            // Data files are read from the changelog manifests when a changelog manifest list
            // is present, otherwise from the delta manifests.
            List<ManifestFileMeta> dataManifests = changelog.changelogManifestList() != null ? changelogManifest : deltaManifest;
            List<String> manifestFileName = new ArrayList<>();
            if (dataManifests != null) {
                manifestFileName.addAll(dataManifests.stream().map(ManifestFileMeta::fileName).collect(Collectors.toList()));
            }

            List<String> dataFiles = this.retryReadingDataFiles(manifestFile, manifestFileName);
            if (dataFiles == null) {
                return Collections.emptyList();
            }
            files.addAll(dataFiles);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return files;
    }

    /**
     * Collects the file names referenced by a snapshot.
     *
     * <p>The result contains the snapshot's manifest list names, the manifest files listed in
     * them, the data files (and extra files) of those manifests, the index manifest with its
     * index files when it exists, and the statistics file when one is recorded.
     *
     * @param manifestList reader for manifest list files
     * @param manifestFile reader for manifest files
     * @param indexFileHandler handler used to check and read the index manifest
     * @param snapshot the snapshot to inspect
     * @return the referenced file names, or an empty list if any manifest list, manifest file
     *     or index manifest was not found while reading
     * @throws RuntimeException wrapping the {@link IOException} if a read keeps failing
     */
    private List<String> getUsedFilesForSnapshot(ManifestList manifestList, ManifestFile manifestFile, IndexFileHandler indexFileHandler, Snapshot snapshot) {
        List<String> files = new ArrayList<>();
        this.addManifestList(files, snapshot);
        try {
            // A null result means a file was not found while reading.
            List<ManifestFileMeta> manifestFileMetas = this.retryReadingFiles(() -> this.readAllManifestsWithIOException(manifestList, snapshot));
            if (manifestFileMetas == null) {
                return Collections.emptyList();
            }
            List<String> manifestFileName = manifestFileMetas.stream().map(ManifestFileMeta::fileName).collect(Collectors.toList());
            files.addAll(manifestFileName);

            List<String> dataFiles = this.retryReadingDataFiles(manifestFile, manifestFileName);
            if (dataFiles == null) {
                return Collections.emptyList();
            }
            files.addAll(dataFiles);

            String indexManifest = snapshot.indexManifest();
            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
                files.add(indexManifest);
                List<IndexManifestEntry> indexManifestEntries = this.retryReadingFiles(() -> indexFileHandler.readManifestWithIOException(indexManifest));
                if (indexManifestEntries == null) {
                    return Collections.emptyList();
                }
                indexManifestEntries.stream().map(IndexManifestEntry::indexFile).map(IndexFileMeta::fileName).forEach(files::add);
            }

            if (snapshot.statistics() != null) {
                files.add(snapshot.statistics());
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return files;
    }

    /**
     * Appends the manifest list names of a snapshot to {@code used}.
     *
     * <p>The base and delta manifest list names are always appended, in that order; the
     * changelog manifest list name is appended only when the snapshot has one.
     *
     * @param used list receiving the names
     * @param snapshot snapshot whose manifest lists are recorded
     */
    private void addManifestList(List<String> used, Snapshot snapshot) {
        Collections.addAll(used, snapshot.baseManifestList(), snapshot.deltaManifestList());
        String changelogList = snapshot.changelogManifestList();
        if (changelogList == null) {
            return;
        }
        used.add(changelogList);
    }

    /**
     * Runs a read action, retrying when it fails with an {@link IOException}.
     *
     * <p>Up to 3 attempts are made, with a 5 millisecond pause after every failed attempt.
     * A {@link FileNotFoundException} is not retried: it ends the call with {@code null}.
     *
     * @param reader the read action to run
     * @param <T> type produced by the reader
     * @return the reader's result, or {@code null} if the file was not found
     * @throws IOException the last failure, once all attempts have failed
     * @throws RuntimeException if the thread is interrupted while pausing; the interrupt flag
     *     is restored before throwing
     */
    @Nullable
    private <T> T retryReadingFiles(ReaderWithIOException<T> reader) throws IOException {
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= 3; attempt++) {
            try {
                return reader.read();
            } catch (FileNotFoundException missing) {
                return null;
            } catch (IOException failure) {
                lastFailure = failure;
            }
            // Only reached after a retryable failure.
            try {
                TimeUnit.MILLISECONDS.sleep(5L);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(interrupted);
            }
        }
        throw lastFailure;
    }

    /**
     * Reads every manifest file meta referenced by a snapshot.
     *
     * <p>The base manifest list is read first, then the delta manifest list, then the
     * changelog manifest list if the snapshot has one; results are concatenated in that order.
     *
     * @param manifestList reader for manifest list files
     * @param snapshot snapshot whose manifest lists are read
     * @return all manifest file metas of the snapshot
     * @throws IOException if any of the manifest lists cannot be read
     */
    private List<ManifestFileMeta> readAllManifestsWithIOException(ManifestList manifestList, Snapshot snapshot) throws IOException {
        List<ManifestFileMeta> all = new ArrayList<>(manifestList.readWithIOException(snapshot.baseManifestList()));
        all.addAll(manifestList.readWithIOException(snapshot.deltaManifestList()));
        String changelogList = snapshot.changelogManifestList();
        if (changelogList == null) {
            return all;
        }
        all.addAll(manifestList.readWithIOException(changelogList));
        return all;
    }

    /**
     * Reads the given manifest files and collects the data files they reference.
     *
     * <p>For every manifest entry, the file name and all extra file names of its file are
     * added to the result.
     *
     * @param manifestFile reader for manifest files
     * @param manifestNames names of the manifest files to read
     * @return the collected file names, or {@code null} as soon as one manifest file is not
     *     found
     * @throws IOException if a manifest file still cannot be read after retrying
     */
    @Nullable
    private List<String> retryReadingDataFiles(ManifestFile manifestFile, List<String> manifestNames) throws IOException {
        List<String> dataFiles = new ArrayList<>();
        for (String manifestName : manifestNames) {
            List<ManifestEntry> manifestEntries = this.retryReadingFiles(() -> manifestFile.readWithIOException(manifestName));
            if (manifestEntries == null) {
                // The manifest disappeared; signal the caller instead of a partial result.
                return null;
            }
            for (ManifestEntry entry : manifestEntries) {
                dataFiles.add(entry.file().fileName());
                dataFiles.addAll(entry.file().extraFiles());
            }
        }
        return dataFiles;
    }

    /**
     * Lists the directories whose files are deletion candidates.
     *
     * <p>These are the {@code manifest}, {@code index} and {@code statistics} directories
     * under the table location, followed by the bucket directories found by
     * {@link #listAndCleanDataDirs(Path, int)}, which cleans unexpected entries as it walks.
     *
     * @return the directories to scan for candidate files
     */
    private List<Path> listPaimonFileDirs() {
        List<Path> dirs = new ArrayList<>();
        for (String metaDir : new String[] {"manifest", "index", "statistics"}) {
            dirs.add(new Path(this.location, metaDir));
        }
        dirs.addAll(this.listAndCleanDataDirs(this.location, this.partitionKeysNum));
        return dirs;
    }

    /**
     * Lists the entries of a directory on a best-effort basis.
     *
     * <p>Listing is retried through {@link #retryReadingFiles}. Any failure is logged at debug
     * level and treated as "nothing to list".
     *
     * @param dir directory to list
     * @return the entries of {@code dir}; an empty list if the directory does not exist, the
     *     listing is {@code null}, the directory vanishes while listing, or listing fails
     */
    private List<FileStatus> tryBestListingDirs(Path dir) {
        try {
            if (!this.fileIO.exists(dir)) {
                return Collections.emptyList();
            }
            List<FileStatus> status = this.retryReadingFiles(() -> {
                FileStatus[] s = this.fileIO.listStatus(dir);
                return s == null ? Collections.<FileStatus>emptyList() : Arrays.asList(s);
            });
            // null means the directory was not found while listing.
            return status == null ? Collections.<FileStatus>emptyList() : status;
        }
        catch (IOException e) {
            LOG.debug("Failed to list directory {}, skip it.", dir, e);
            return Collections.emptyList();
        }
    }

    /**
     * Tells whether a file is old enough to be considered for cleaning.
     *
     * @param status status of the file to check
     * @return {@code true} if the file was last modified strictly before the configured
     *     threshold
     */
    private boolean oldEnough(FileStatus status) {
        long modifiedAt = status.getModificationTime();
        return this.olderThanMillis > modifiedAt;
    }

    /**
     * Walks the partition directory tree and returns the bucket directories it contains.
     *
     * <p>{@code level} is the number of partition levels still below {@code dir}. While it is
     * positive, children whose name contains {@code "="} are descended into on the shared
     * executor. At level 0, children whose name starts with {@code "bucket-"} are returned.
     *
     * <p>Children that match neither pattern are cleaned when old enough, except directly
     * under the table location: there {@code level} equals the number of partition keys, and
     * for a table without partition keys the level-0 directory is the table location itself.
     *
     * <p>NOTE(review): nested levels submit work to the same executor from inside its own
     * tasks; confirm against {@code ThreadPoolUtils} that this cannot exhaust the pool.
     *
     * @param dir directory to inspect
     * @param level remaining partition depth below {@code dir}
     * @return the bucket directories found under {@code dir}
     */
    private List<Path> listAndCleanDataDirs(Path dir, int level) {
        List<FileStatus> children = this.tryBestListingDirs(dir);
        if (level != 0) {
            List<Path> partitionPaths = this.filterAndCleanDataDirs(children, child -> child.getName().contains("="), keyCount -> level != keyCount);
            Function<Path, List<Path>> descend = partition -> this.listAndCleanDataDirs(partition, level - 1);
            return Lists.newArrayList(ThreadPoolUtils.randomlyExecute(EXECUTOR, descend, partitionPaths));
        }
        return this.filterAndCleanDataDirs(children, child -> child.getName().startsWith("bucket-"), keyCount -> keyCount != 0);
    }

    /**
     * Splits directory entries into expected ones and leftovers, optionally cleaning the
     * leftovers.
     *
     * <p>Entries whose path passes {@code filter} are returned. The rest are cleaned only when
     * {@code cleanCondition} accepts the table's partition key count and the entry is old
     * enough; each cleaned path is passed to the file cleaner and then recorded in the list of
     * deleted files under that list's monitor.
     *
     * @param statuses entries of one directory
     * @param filter selects the entries that belong to the expected layout
     * @param cleanCondition decides, from the partition key count, whether leftovers may be
     *     cleaned
     * @return the paths accepted by {@code filter}, in input order
     */
    private List<Path> filterAndCleanDataDirs(List<FileStatus> statuses, Predicate<Path> filter, Predicate<Integer> cleanCondition) {
        List<Path> matched = new ArrayList<>();
        List<FileStatus> leftovers = new ArrayList<>();
        for (FileStatus status : statuses) {
            Path path = status.getPath();
            if (!filter.test(path)) {
                leftovers.add(status);
            } else {
                matched.add(path);
            }
        }
        if (!cleanCondition.test(this.partitionKeysNum)) {
            return matched;
        }
        for (FileStatus leftover : leftovers) {
            if (!this.oldEnough(leftover)) {
                continue;
            }
            Path orphan = leftover.getPath();
            this.fileCleaner.accept(orphan);
            // This method also runs on executor threads, so additions are guarded.
            synchronized (this.deleteFiles) {
                this.deleteFiles.add(orphan);
            }
        }
        return matched;
    }

    /**
     * Renders deleted paths as display lines, capped at {@code showLimit} paths.
     *
     * <p>When there are more paths than will be shown, the first line is a summary of the
     * total and displayed counts. Each following line is the path component of the file's URI.
     *
     * @param deleteFiles the deleted paths
     * @param showLimit maximum number of paths to display
     * @return the display lines
     */
    public static List<String> showDeletedFiles(List<Path> deleteFiles, int showLimit) {
        int total = deleteFiles.size();
        int shown = Math.min(total, showLimit);
        List<String> lines = new ArrayList<>();
        if (shown < total) {
            lines.add(String.format("Total %s files, only %s lines are displayed.", total, shown));
        }
        int index = 0;
        while (index < shown) {
            lines.add(deleteFiles.get(index).toUri().getPath());
            index++;
        }
        return lines;
    }

    /**
     * Creates one cleaner per table of a database.
     *
     * <p>When {@code tableName} is {@code null} or {@code "*"}, every table listed by the
     * catalog for the database is used; otherwise only the named table is.
     *
     * @param catalog catalog used to list and load tables
     * @param databaseName database that holds the tables
     * @param tableName a table name, or {@code null} / {@code "*"} for all tables
     * @return one cleaner for each selected table
     * @throws Catalog.DatabaseNotExistException if the database does not exist
     * @throws Catalog.TableNotExistException if a selected table does not exist
     * @throws IllegalArgumentException presumably, if a table is not a {@link FileStoreTable}
     *     (raised by {@code Preconditions.checkArgument}; confirm the exact type there)
     */
    public static List<OrphanFilesClean> createOrphanFilesCleans(Catalog catalog, String databaseName, @Nullable String tableName) throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
        List<String> tableNames;
        if (tableName == null || "*".equals(tableName)) {
            tableNames = catalog.listTables(databaseName);
        } else {
            tableNames = Collections.singletonList(tableName);
        }

        List<OrphanFilesClean> cleans = new ArrayList<>();
        for (String name : tableNames) {
            Table table = catalog.getTable(new Identifier(databaseName, name));
            Preconditions.checkArgument(table instanceof FileStoreTable, "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.", table.getClass().getName());
            cleans.add(new OrphanFilesClean((FileStoreTable) table));
        }
        return cleans;
    }

    /**
     * Runs the given cleaners in parallel and renders the deleted files for display.
     *
     * <p>Each cleaner's {@link #clean()} is submitted to a dedicated fixed-size pool with one
     * thread per available processor. The pool is always shut down before this method
     * returns or throws, so a failing cleaner does not leave its worker threads behind.
     *
     * @param tableCleans cleaners to run
     * @return display lines for the deleted files, limited to 200 paths
     * @throws RuntimeException wrapping the {@link ExecutionException} if a cleaner fails, or
     *     the {@link InterruptedException} if the calling thread is interrupted while waiting
     *     (the interrupt flag is restored first)
     */
    public static String[] executeOrphanFilesClean(List<OrphanFilesClean> tableCleans) {
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Path> cleanOrphanFiles = new ArrayList<>();
        try {
            List<Future<List<Path>>> tasks = new ArrayList<>();
            for (OrphanFilesClean clean : tableCleans) {
                tasks.add(executorService.submit(clean::clean));
            }
            for (Future<List<Path>> task : tasks) {
                try {
                    cleanOrphanFiles.addAll(task.get());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        finally {
            // Also reached on failure, so the pool's threads never outlive this call.
            executorService.shutdownNow();
        }
        return OrphanFilesClean.showDeletedFiles(cleanOrphanFiles, 200).toArray(new String[0]);
    }

    @FunctionalInterface
    private static interface ReaderWithIOException<T> {
        public T read() throws IOException;
    }
}

