/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.service;

import java.util.ArrayList;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.system.FileMonitorTable;
import org.apache.paimon.utils.SerializationUtils;

public class QueryFileMonitor
extends AbstractNonCoordinatedSource<InternalRow> {
    private static final long serialVersionUID = 1L;
    private final Table table;
    private final long monitorInterval;

    public QueryFileMonitor(Table table) {
        this.table = table;
        this.monitorInterval = Options.fromMap(table.options()).get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
    }

    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    public SourceReader<InternalRow, SimpleSourceSplit> createReader(SourceReaderContext sourceReaderContext) throws Exception {
        return new Reader();
    }

    public static DataStream<InternalRow> build(StreamExecutionEnvironment env, Table table) {
        return env.fromSource((Source)new QueryFileMonitor(table), WatermarkStrategy.noWatermarks(), "FileMonitor-" + table.name(), InternalTypeInfo.fromRowType(FileMonitorTable.getRowType())).setParallelism(1);
    }

    public static ChannelComputer<InternalRow> createChannelComputer() {
        return new FileMonitorChannelComputer();
    }

    private static class FileMonitorChannelComputer
    implements ChannelComputer<InternalRow> {
        private int numChannels;

        private FileMonitorChannelComputer() {
        }

        @Override
        public void setup(int numChannels) {
            this.numChannels = numChannels;
        }

        @Override
        public int channel(InternalRow row) {
            BinaryRow partition = SerializationUtils.deserializeBinaryRow(row.getBinary(1));
            int bucket = row.getInt(2);
            return ChannelComputer.select(partition, bucket, this.numChannels);
        }

        public String toString() {
            return "FileMonitorChannelComputer{numChannels=" + this.numChannels + '}';
        }
    }

    private class Reader
    extends AbstractNonCoordinatedSourceReader<InternalRow> {
        private transient StreamTableScan scan;
        private transient TableRead read;

        private Reader() {
        }

        @Override
        public void start() {
            FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable)QueryFileMonitor.this.table);
            ReadBuilder readBuilder = monitorTable.newReadBuilder().dropStats();
            this.scan = readBuilder.newStreamScan();
            this.read = readBuilder.newRead();
        }

        public InputStatus pollNext(ReaderOutput<InternalRow> readerOutput) throws Exception {
            boolean isEmpty = this.doScan(readerOutput);
            if (isEmpty) {
                Thread.sleep(QueryFileMonitor.this.monitorInterval);
            }
            return InputStatus.MORE_AVAILABLE;
        }

        private boolean doScan(ReaderOutput<InternalRow> readerOutput) throws Exception {
            ArrayList records = new ArrayList();
            this.read.createReader(this.scan.plan()).forEachRemaining(records::add);
            records.forEach(arg_0 -> readerOutput.collect(arg_0));
            return records.isEmpty();
        }
    }
}

