/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.checkpoint.Checkpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class CheckpointsCleaner
implements Serializable,
AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class);
    private static final long serialVersionUID = 2545865801947537790L;
    private final boolean parallelMode;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private int numberOfCheckpointsToClean;
    @Nullable
    @GuardedBy(value="lock")
    private CompletableFuture<Void> cleanUpFuture;
    @GuardedBy(value="lock")
    private final List<CompletedCheckpoint> subsumedCheckpoints = new ArrayList<CompletedCheckpoint>();

    public CheckpointsCleaner() {
        this.parallelMode = CheckpointingOptions.CLEANER_PARALLEL_MODE.defaultValue();
    }

    public CheckpointsCleaner(boolean parallelMode) {
        this.parallelMode = parallelMode;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int getNumberOfCheckpointsToClean() {
        Object object = this.lock;
        synchronized (object) {
            return this.numberOfCheckpointsToClean;
        }
    }

    public void cleanCheckpoint(Checkpoint checkpoint, boolean shouldDiscard, Runnable postCleanAction, Executor executor) {
        LOG.debug("Clean checkpoint {} parallel-mode={} shouldDiscard={}", new Object[]{checkpoint.getCheckpointID(), this.parallelMode, shouldDiscard});
        if (shouldDiscard) {
            this.incrementNumberOfCheckpointsToClean();
            Checkpoint.DiscardObject discardObject = checkpoint.markAsDiscarded();
            CompletableFuture<Void> discardFuture = this.parallelMode ? discardObject.discardAsync(executor) : FutureUtils.runAsync(discardObject::discard, executor);
            discardFuture.handle((outerIgnored, outerThrowable) -> {
                if (outerThrowable != null) {
                    LOG.warn("Could not properly discard completed checkpoint {}.", (Object)checkpoint.getCheckpointID(), outerThrowable);
                }
                this.decrementNumberOfCheckpointsToClean();
                postCleanAction.run();
                return null;
            });
        } else {
            executor.execute(postCleanAction);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
        Object object = this.lock;
        synchronized (object) {
            this.subsumedCheckpoints.add(completedCheckpoint);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanSubsumedCheckpoints(long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
        Object object = this.lock;
        synchronized (object) {
            Iterator<CompletedCheckpoint> iterator = this.subsumedCheckpoints.iterator();
            while (iterator.hasNext()) {
                CompletedCheckpoint checkpoint = iterator.next();
                if (checkpoint.getCheckpointID() >= upTo || stillInUse.contains(checkpoint.getCheckpointID())) continue;
                try {
                    LOG.debug("Try to discard checkpoint {}.", (Object)checkpoint.getCheckpointID());
                    this.cleanCheckpoint(checkpoint, checkpoint.shouldBeDiscardedOnSubsume(), postCleanAction, executor);
                    iterator.remove();
                }
                catch (Exception e) {
                    LOG.warn("Fail to discard the old checkpoint {}.", (Object)checkpoint);
                }
            }
        }
    }

    public void cleanCheckpointOnFailedStoring(CompletedCheckpoint completedCheckpoint, Executor executor) {
        this.cleanCheckpoint(completedCheckpoint, true, () -> {}, executor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void incrementNumberOfCheckpointsToClean() {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState(this.cleanUpFuture == null, "CheckpointsCleaner has already been closed");
            ++this.numberOfCheckpointsToClean;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void decrementNumberOfCheckpointsToClean() {
        Object object = this.lock;
        synchronized (object) {
            --this.numberOfCheckpointsToClean;
            this.maybeCompleteCloseUnsafe();
        }
    }

    private void maybeCompleteCloseUnsafe() {
        if (this.numberOfCheckpointsToClean == 0 && this.cleanUpFuture != null) {
            this.cleanUpFuture.complete(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        Object object = this.lock;
        synchronized (object) {
            if (this.cleanUpFuture == null) {
                this.cleanUpFuture = new CompletableFuture();
            }
            this.maybeCompleteCloseUnsafe();
            this.subsumedCheckpoints.clear();
            return this.cleanUpFuture;
        }
    }
}

