/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketUnawareCompactSource
extends RichSourceFunction<AppendOnlyCompactionTask> {
    private static final Logger LOG = LoggerFactory.getLogger(BucketUnawareCompactSource.class);
    private static final String COMPACTION_COORDINATOR_NAME = "Compaction Coordinator";
    private final FileStoreTable table;
    private final boolean streaming;
    private final long scanInterval;
    private final Predicate filter;
    private transient AppendOnlyTableCompactionCoordinator compactionCoordinator;
    private transient SourceFunction.SourceContext<AppendOnlyCompactionTask> ctx;
    private volatile boolean isRunning = true;

    public BucketUnawareCompactSource(FileStoreTable table, boolean isStreaming, long scanInterval, @Nullable Predicate filter) {
        this.table = table;
        this.streaming = isStreaming;
        this.scanInterval = scanInterval;
        this.filter = filter;
    }

    public void open(Configuration parameters) throws Exception {
        this.compactionCoordinator = new AppendOnlyTableCompactionCoordinator(this.table, this.streaming, this.filter);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<AppendOnlyCompactionTask> sourceContext) throws Exception {
        this.ctx = sourceContext;
        while (this.isRunning) {
            boolean isEmpty;
            Object object = this.ctx.getCheckpointLock();
            synchronized (object) {
                if (!this.isRunning) {
                    return;
                }
                try {
                    List<AppendOnlyCompactionTask> tasks = this.compactionCoordinator.run();
                    isEmpty = tasks.isEmpty();
                    tasks.forEach(arg_0 -> this.ctx.collect(arg_0));
                }
                catch (EndOfScanException esf) {
                    LOG.info("Catching EndOfStreamException, the stream is finished.");
                    return;
                }
            }
            if (!isEmpty) continue;
            Thread.sleep(this.scanInterval);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        if (this.ctx != null) {
            Object object = this.ctx.getCheckpointLock();
            synchronized (object) {
                this.isRunning = false;
            }
        } else {
            this.isRunning = false;
        }
    }

    public static DataStreamSource<AppendOnlyCompactionTask> buildSource(StreamExecutionEnvironment env, BucketUnawareCompactSource source, boolean streaming, String tableIdentifier) {
        StreamSource sourceOperator = new StreamSource((SourceFunction)source);
        return new DataStreamSource(env, (TypeInformation)new CompactionTaskTypeInfo(), sourceOperator, false, "Compaction Coordinator : " + tableIdentifier, streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED);
    }
}

