/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.types.Either;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ThreadUtils;

public class AppendBypassCoordinateOperator<CommitT>
extends AbstractStreamOperator<Either<CommitT, UnawareAppendCompactionTask>>
implements OneInputStreamOperator<CommitT, Either<CommitT, UnawareAppendCompactionTask>>,
ProcessingTimeService.ProcessingTimeCallback {
    private static final long MAX_PENDING_TASKS = 5000L;
    private static final long EMIT_PER_BATCH = 100L;
    private final FileStoreTable table;
    private final MailboxExecutorImpl mailbox;
    private transient ScheduledExecutorService executorService;
    private transient LinkedBlockingQueue<UnawareAppendCompactionTask> compactTasks;

    public AppendBypassCoordinateOperator(FileStoreTable table, ProcessingTimeService processingTimeService, MailboxExecutor mailbox) {
        this.table = table;
        this.processingTimeService = processingTimeService;
        this.mailbox = (MailboxExecutorImpl)mailbox;
        this.chainingStrategy = ChainingStrategy.HEAD;
    }

    public void open() throws Exception {
        super.open();
        Preconditions.checkArgument(this.getRuntimeContext().getNumberOfParallelSubtasks() == 1, "Compaction Coordinator parallelism in paimon MUST be one.");
        long intervalMs = this.table.coreOptions().continuousDiscoveryInterval().toMillis();
        this.compactTasks = new LinkedBlockingQueue();
        UnawareAppendTableCompactionCoordinator coordinator = new UnawareAppendTableCompactionCoordinator(this.table, true, null);
        this.executorService = Executors.newSingleThreadScheduledExecutor(ThreadUtils.newDaemonThreadFactory("Compaction Coordinator"));
        this.executorService.scheduleWithFixedDelay(() -> this.asyncPlan(coordinator), 0L, intervalMs, TimeUnit.MILLISECONDS);
        this.getProcessingTimeService().scheduleWithFixedDelay((ProcessingTimeService.ProcessingTimeCallback)this, 0L, intervalMs);
    }

    private void asyncPlan(UnawareAppendTableCompactionCoordinator coordinator) {
        while ((long)this.compactTasks.size() < 5000L) {
            List<UnawareAppendCompactionTask> tasks = coordinator.run();
            this.compactTasks.addAll(tasks);
            if (!tasks.isEmpty()) continue;
            break;
        }
    }

    public void onProcessingTime(long time) {
        while (this.mailbox.isIdle()) {
            int i = 0;
            while ((long)i < 100L) {
                UnawareAppendCompactionTask task = this.compactTasks.poll();
                if (task == null) {
                    return;
                }
                this.output.collect((Object)new StreamRecord((Object)Either.Right((Object)task)));
                ++i;
            }
        }
    }

    public void processElement(StreamRecord<CommitT> record) throws Exception {
        this.output.collect((Object)new StreamRecord((Object)Either.Left((Object)record.getValue())));
    }

    public void close() throws Exception {
        ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, this.executorService);
        super.close();
    }
}

