/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.sink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link InnerTableCommit} implementation that delegates the actual commit work to a
 * {@link FileStoreCommit} and, after every commit, schedules expiration work (consumer
 * expiration, snapshot expiration, partition expiration and automatic tag management).
 *
 * <p>Expiration runs on {@link #expireMainExecutor}. A failure inside an expiration run is logged
 * and remembered; it is re-thrown (wrapped in a {@link RuntimeException}) the next time an
 * expiration is about to be scheduled.
 */
public class TableCommitImpl implements InnerTableCommit {
    private static final Logger LOG = LoggerFactory.getLogger(TableCommitImpl.class);

    private final FileStoreCommit commit;
    @Nullable
    private final Runnable expireSnapshots;
    @Nullable
    private final PartitionExpire partitionExpire;
    @Nullable
    private final TagAutoManager tagAutoManager;
    private final Lock lock;
    @Nullable
    private final Duration consumerExpireTime;
    private final ConsumerManager consumerManager;
    /** Executor on which the post-commit expiration task is run. */
    private final ExecutorService expireMainExecutor;
    /** First error raised by an expiration task; {@code null} while none has failed. */
    private final AtomicReference<Throwable> expireError;
    private final String tableName;
    /** Partition spec to overwrite; {@code null} means regular (non-overwrite) commits. */
    @Nullable
    private Map<String, String> overwritePartition = null;
    /** Set once a batch-style {@link #commit(List)} or {@link #truncateTable()} was issued. */
    private boolean batchCommitted = false;
    private final boolean forceCreatingSnapshot;

    /**
     * Creates the table commit and wires the given lock into the underlying commit (and into the
     * partition expire, when present).
     *
     * @param commit underlying file store commit all commit operations are delegated to
     * @param expireSnapshots optional action run to expire snapshots after a commit
     * @param partitionExpire optional partition expiration, also registered on {@code commit}
     * @param tagAutoManager optional automatic tag manager run after a commit
     * @param lock lock handed to {@code commit} / {@code partitionExpire}; closed by {@link #close()}
     * @param consumerExpireTime optional age after which consumers are expired
     * @param consumerManager manager used to expire consumers
     * @param expireExecutionMode {@code SYNC} runs expiration in the committing thread (direct
     *     executor); any other mode runs it on a dedicated single-thread executor
     * @param tableName table name used when registering commit metrics
     * @param forceCreatingSnapshot whether {@link #forceCreatingSnapshot()} always reports true
     */
    public TableCommitImpl(FileStoreCommit commit, @Nullable Runnable expireSnapshots, @Nullable PartitionExpire partitionExpire, @Nullable TagAutoManager tagAutoManager, Lock lock, @Nullable Duration consumerExpireTime, ConsumerManager consumerManager, CoreOptions.ExpireExecutionMode expireExecutionMode, String tableName, boolean forceCreatingSnapshot) {
        commit.withLock(lock);
        if (partitionExpire != null) {
            partitionExpire.withLock(lock);
            commit.withPartitionExpire(partitionExpire);
        }
        this.commit = commit;
        this.expireSnapshots = expireSnapshots;
        this.partitionExpire = partitionExpire;
        this.tagAutoManager = tagAutoManager;
        this.lock = lock;
        this.consumerExpireTime = consumerExpireTime;
        this.consumerManager = consumerManager;
        this.expireMainExecutor = expireExecutionMode == CoreOptions.ExpireExecutionMode.SYNC
                ? MoreExecutors.newDirectExecutorService()
                : Executors.newSingleThreadExecutor(new ExecutorThreadFactory(Thread.currentThread().getName() + "expire-main-thread"));
        // Diamond keeps the reference typed as AtomicReference<Throwable>; an
        // AtomicReference<Object> is not assignable to this field.
        this.expireError = new AtomicReference<>(null);
        this.tableName = tableName;
        this.forceCreatingSnapshot = forceCreatingSnapshot;
    }

    /**
     * Tells whether a snapshot has to be created even when there is nothing to commit.
     *
     * @return {@code true} if forced by configuration, if an overwrite partition is set, or if the
     *     tag auto-creation of the tag manager requests it
     */
    public boolean forceCreatingSnapshot() {
        if (this.forceCreatingSnapshot) {
            return true;
        }
        if (this.overwritePartition != null) {
            return true;
        }
        return this.tagAutoManager != null
                && this.tagAutoManager.getTagAutoCreation() != null
                && this.tagAutoManager.getTagAutoCreation().forceCreatingSnapshot();
    }

    /**
     * Switches this commit to overwrite mode for the given partition spec.
     *
     * @param overwritePartitions partition spec to overwrite, or {@code null} for normal commits
     * @return this instance
     */
    @Override
    public TableCommitImpl withOverwrite(@Nullable Map<String, String> overwritePartitions) {
        this.overwritePartition = overwritePartitions;
        return this;
    }

    /**
     * Forwards the "ignore empty commit" flag to the underlying commit.
     *
     * @return this instance
     */
    @Override
    public TableCommitImpl ignoreEmptyCommit(boolean ignoreEmptyCommit) {
        this.commit.ignoreEmptyCommit(ignoreEmptyCommit);
        return this;
    }

    /**
     * Registers commit metrics for this table on the given registry.
     *
     * @return this instance
     */
    @Override
    public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
        this.commit.withMetrics(new CommitMetrics(registry, this.tableName));
        return this;
    }

    /**
     * Batch-style commit using {@link Long#MAX_VALUE} as identifier. Only one batch-style
     * operation ({@code commit(List)} or {@link #truncateTable()}) is accepted per instance.
     */
    @Override
    public void commit(List<CommitMessage> commitMessages) {
        this.checkCommitted();
        this.commit(Long.MAX_VALUE, commitMessages);
    }

    /**
     * Truncates the table through the underlying commit, using {@link Long#MAX_VALUE} as
     * identifier. Counts as the single allowed batch-style operation of this instance.
     */
    @Override
    public void truncateTable() {
        this.checkCommitted();
        this.commit.truncateTable(Long.MAX_VALUE);
    }

    /** Enforces the one-shot contract of batch-style operations and marks the instance as used. */
    private void checkCommitted() {
        Preconditions.checkState(!this.batchCommitted, "BatchTableCommit only support one-time committing.");
        this.batchCommitted = true;
    }

    /** Commits the given messages as a single committable with the given identifier. */
    @Override
    public void commit(long identifier, List<CommitMessage> commitMessages) {
        this.commit(this.createManifestCommittable(identifier, commitMessages));
    }

    /**
     * Builds one committable per map entry and passes them to
     * {@link #filterAndCommitMultiple(List)}.
     *
     * @param commitIdentifiersAndMessages commit messages keyed by commit identifier
     * @return number of committables that were actually committed
     */
    @Override
    public int filterAndCommit(Map<Long, List<CommitMessage>> commitIdentifiersAndMessages) {
        List<ManifestCommittable> committables = commitIdentifiersAndMessages.entrySet().stream()
                .map(entry -> this.createManifestCommittable(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
        return this.filterAndCommitMultiple(committables);
    }

    /** Wraps the given commit messages into a new {@link ManifestCommittable}. */
    private ManifestCommittable createManifestCommittable(long identifier, List<CommitMessage> commitMessages) {
        ManifestCommittable committable = new ManifestCommittable(identifier);
        for (CommitMessage commitMessage : commitMessages) {
            committable.addFileCommittable(commitMessage);
        }
        return committable;
    }

    /** Commits a single committable without the append-files check. */
    public void commit(ManifestCommittable committable) {
        this.commitMultiple(Collections.singletonList(committable), false);
    }

    /**
     * Commits the given committables in list order and then schedules expiration.
     *
     * <p>Without an overwrite partition every committable is committed individually and expiration
     * is scheduled with the identifier of the last one (nothing is scheduled for an empty list).
     * In overwrite mode at most one committable is allowed; when none is given an empty
     * committable with identifier {@link Long#MAX_VALUE} is used so the overwrite still happens.
     *
     * @param committables committables to commit
     * @param checkAppendFiles flag forwarded to the underlying commit
     * @throws RuntimeException if more than one committable is given in overwrite mode, or if a
     *     previous expiration run failed
     */
    public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
        if (this.overwritePartition == null) {
            for (ManifestCommittable committable : committables) {
                this.commit.commit(committable, new HashMap<>(), checkAppendFiles);
            }
            if (!committables.isEmpty()) {
                this.expire(committables.get(committables.size() - 1).identifier(), this.expireMainExecutor);
            }
        } else {
            if (committables.size() > 1) {
                throw new RuntimeException("Multiple committables appear in overwrite mode, this may be a bug, please report it: " + committables);
            }
            ManifestCommittable committable = committables.size() == 1 ? committables.get(0) : new ManifestCommittable(Long.MAX_VALUE);
            this.commit.overwrite(this.overwritePartition, committable, Collections.emptyMap());
            this.expire(committable.identifier(), this.expireMainExecutor);
        }
    }

    /** Same as {@link #filterAndCommitMultiple(List, boolean)} with the append-files check on. */
    public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
        return this.filterAndCommitMultiple(committables, true);
    }

    /**
     * Sorts the committables by identifier, keeps those returned by
     * {@code FileStoreCommit#filterCommitted}, verifies that every file they reference still
     * exists and commits them.
     *
     * @param committables candidate committables, in any order
     * @param checkAppendFiles flag forwarded to the underlying commit
     * @return number of committables that were committed by this call
     * @throws RuntimeException if a referenced file no longer exists
     */
    public int filterAndCommitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
        List<ManifestCommittable> sortedCommittables = committables.stream()
                .sorted(Comparator.comparingLong(ManifestCommittable::identifier))
                .collect(Collectors.toList());
        List<ManifestCommittable> retryCommittables = this.commit.filterCommitted(sortedCommittables);
        if (!retryCommittables.isEmpty()) {
            this.checkFilesExistence(retryCommittables);
            this.commitMultiple(retryCommittables, checkAppendFiles);
        }
        return retryCommittables.size();
    }

    /**
     * Fails if any data, changelog, compaction or index file referenced by the given committables
     * is missing from the file system.
     *
     * @throws RuntimeException listing the missing files
     * @throws UncheckedIOException if an existence check fails with an {@link IOException}
     */
    private void checkFilesExistence(List<ManifestCommittable> committables) {
        List<Path> files = this.collectReferencedFiles(committables);
        Predicate<Path> nonExists = path -> {
            try {
                return !this.commit.fileIO().exists(path);
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
        // Existence checks run on the shared manifest-read pool; each task yields either the
        // missing path or nothing.
        List<Path> nonExistFiles = Lists.newArrayList(ThreadPoolUtils.randomlyExecuteSequentialReturn(
                ManifestReadThreadPool.getExecutorService(null),
                f -> nonExists.test(f) ? Collections.singletonList(f) : Collections.<Path>emptyList(),
                files));
        if (!nonExistFiles.isEmpty()) {
            String message = String.join("\n", "Cannot recover from this checkpoint because some files in the snapshot that need to be resubmitted have been deleted:", "    " + nonExistFiles.stream().map(Object::toString).collect(Collectors.joining(",")), "    The most likely reason is because you are recovering from a very old savepoint that contains some uncommitted files that have already been deleted.");
            throw new RuntimeException(message);
        }
    }

    /** Collects the paths of all files referenced by the commit messages of the committables. */
    private List<Path> collectReferencedFiles(List<ManifestCommittable> committables) {
        List<Path> files = new ArrayList<>();
        // One data-file path factory per (partition, bucket), created lazily.
        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> factoryMap = new HashMap<>();
        PathFactory indexFileFactory = this.commit.pathFactory().indexFileFactory();
        for (ManifestCommittable committable : committables) {
            for (CommitMessage message : committable.fileCommittables()) {
                CommitMessageImpl msg = (CommitMessageImpl) message;
                DataFilePathFactory pathFactory = factoryMap.computeIfAbsent(
                        Pair.of(message.partition(), message.bucket()),
                        k -> this.commit.pathFactory().createDataFilePathFactory(k.getKey(), k.getValue()));
                Consumer<DataFileMeta> collector = f -> files.addAll(f.collectFiles(pathFactory));
                msg.newFilesIncrement().newFiles().forEach(collector);
                msg.newFilesIncrement().changelogFiles().forEach(collector);
                msg.compactIncrement().compactBefore().forEach(collector);
                msg.compactIncrement().compactAfter().forEach(collector);
                msg.indexIncrement().newIndexFiles().stream().map(IndexFileMeta::fileName).map(indexFileFactory::toPath).forEach(files::add);
                msg.indexIncrement().deletedIndexFiles().stream().map(IndexFileMeta::fileName).map(indexFileFactory::toPath).forEach(files::add);
            }
        }
        return files;
    }

    /**
     * Schedules one expiration run on the given executor.
     *
     * <p>If an earlier run failed, that failure is re-thrown here instead of scheduling a new run.
     * Failures of the scheduled run are logged and only the first one is remembered.
     *
     * @throws RuntimeException wrapping the first error of an earlier expiration run
     */
    private void expire(long partitionExpireIdentifier, ExecutorService executor) {
        Throwable previousError = this.expireError.get();
        if (previousError != null) {
            throw new RuntimeException(previousError);
        }
        executor.execute(() -> {
            try {
                this.expire(partitionExpireIdentifier);
            }
            catch (Throwable t) {
                LOG.error("Executing expire encountered an error.", t);
                this.expireError.compareAndSet(null, t);
            }
        });
    }

    /**
     * Runs all configured expiration steps in order: consumers, snapshots, partitions, and
     * finally the automatic tag manager.
     */
    private void expire(long partitionExpireIdentifier) {
        if (this.consumerExpireTime != null) {
            this.consumerManager.expire(LocalDateTime.now().minus(this.consumerExpireTime));
        }
        this.expireSnapshots();
        if (this.partitionExpire != null) {
            this.partitionExpire.expire(partitionExpireIdentifier);
        }
        if (this.tagAutoManager != null) {
            this.tagAutoManager.run();
        }
    }

    /** Runs the snapshot expiration action, if one was configured. */
    public void expireSnapshots() {
        if (this.expireSnapshots != null) {
            this.expireSnapshots.run();
        }
    }

    /**
     * Closes the underlying commit, releases the lock and stops the expire executor.
     *
     * <p>The lock and the executor are released even when closing the commit throws, so a failing
     * close cannot leave the lock held or the expire thread running.
     *
     * @throws Exception if closing the underlying commit fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.commit.close();
        } finally {
            IOUtils.closeQuietly(this.lock);
            this.expireMainExecutor.shutdownNow();
        }
    }

    /** Aborts the given commit messages through the underlying commit. */
    @Override
    public void abort(List<CommitMessage> commitMessages) {
        this.commit.abort(commitMessages);
    }

    /** Exposes the expire executor for tests. */
    @VisibleForTesting
    public ExecutorService getExpireMainExecutor() {
        return this.expireMainExecutor;
    }
}

