/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.sstmerge;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.contrib.streaming.state.sstmerge.CompactionTask;
import org.apache.flink.contrib.streaming.state.sstmerge.CompactionTaskProducer;
import org.apache.flink.contrib.streaming.state.sstmerge.CompactionTracker;
import org.apache.flink.contrib.streaming.state.sstmerge.Compactor;
import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CompactionScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(CompactionScheduler.class);
    private final ScheduledExecutorService scheduledExecutor;
    private final ExecutorService ioExecutor;
    private final long checkPeriodMs;
    private final CompactionTracker tracker;
    private final Compactor compactor;
    private final CompactionTaskProducer taskProducer;
    private final Object lock = new Object();
    private boolean running = true;

    public CompactionScheduler(RocksDBManualCompactionConfig settings, ExecutorService ioExecutor, CompactionTaskProducer taskProducer, Compactor compactor, CompactionTracker tracker) {
        this(settings, ioExecutor, taskProducer, compactor, tracker, Executors.newSingleThreadScheduledExecutor());
    }

    public CompactionScheduler(RocksDBManualCompactionConfig settings, ExecutorService ioExecutor, CompactionTaskProducer taskProducer, Compactor compactor, CompactionTracker tracker, ScheduledExecutorService scheduledExecutor) {
        this.ioExecutor = ioExecutor;
        this.scheduledExecutor = scheduledExecutor;
        this.checkPeriodMs = settings.minInterval;
        this.tracker = tracker;
        this.compactor = compactor;
        this.taskProducer = taskProducer;
    }

    public void start() {
        this.scheduleScan();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() throws InterruptedException {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                this.running = false;
                this.scheduledExecutor.shutdownNow();
            }
        }
        if (!this.scheduledExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to terminate scheduled tasks in 5s");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void scheduleScan() {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                LOG.trace("Schedule SST scan in {} ms", (Object)this.checkPeriodMs);
                this.scheduledExecutor.schedule(() -> this.ioExecutor.execute(this::maybeScan), this.checkPeriodMs, TimeUnit.MILLISECONDS);
            } else {
                LOG.debug("Not scheduling next scan: shutting down");
            }
        }
    }

    public void maybeScan() {
        LOG.trace("Starting SST scan");
        if (this.tracker.haveManualCompactions() || this.tracker.isShuttingDown()) {
            LOG.trace("Skip SST scan {}", (Object)this.tracker);
            return;
        }
        List<CompactionTask> targets = this.scan();
        LOG.trace("SST scan resulted in targets {}", targets);
        if (targets.isEmpty()) {
            this.scheduleScan();
            return;
        }
        for (CompactionTask target : targets) {
            this.ioExecutor.execute(() -> this.tracker.runWithTracking(target.columnFamilyHandle, () -> this.compactor.compact(target.columnFamilyHandle, target.level, target.files), this::scheduleScan));
        }
    }

    private List<CompactionTask> scan() {
        try {
            return this.taskProducer.produce();
        }
        catch (Exception e) {
            LOG.warn("Unable to scan for compaction targets", (Throwable)e);
            return Collections.emptyList();
        }
    }
}

