/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.service;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.service.QueryAddressRegister;
import org.apache.paimon.flink.service.QueryExecutorOperator;
import org.apache.paimon.flink.service.QueryFileMonitor;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;

/**
 * Builds the Flink streaming topology that runs the query service for a table.
 */
public class QueryService {
    /**
     * Wires the query service pipeline into {@code env}: a file monitor source, a
     * repartitioning step, an "Executor" operator and a single-subtask
     * {@link QueryAddressRegister} sink.
     *
     * @param env the execution environment the topology is added to; its configuration
     *     must select {@link RuntimeExecutionMode#STREAMING}
     * @param table the table to serve; must be a {@link FileStoreTable} whose bucket mode
     *     is {@link BucketMode#HASH_FIXED} and which declares at least one primary key
     * @param parallelism parallelism used for the partitioned stream and the executor
     *     operator
     * @throws UnsupportedOperationException if {@code table} is not a
     *     {@link FileStoreTable}, is not in fixed hash bucket mode, or has no primary key
     */
    public static void build(StreamExecutionEnvironment env, Table table, int parallelism) {
        ReadableConfig conf = env.getConfiguration();
        Preconditions.checkArgument(
                conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING,
                "Query Service only supports streaming mode.");

        // Fail with a descriptive error instead of a bare ClassCastException when the
        // caller passes a Table implementation that is not backed by a file store.
        if (!(table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException(
                    "Query Service only supports FileStoreTable, but got "
                            + (table == null ? "null" : table.getClass().getName())
                            + ".");
        }
        FileStoreTable storeTable = (FileStoreTable) table;
        if (storeTable.bucketMode() != BucketMode.HASH_FIXED
                || storeTable.schema().primaryKeys().isEmpty()) {
            throw new UnsupportedOperationException(
                    "The bucket mode of "
                            + table.name()
                            + " is not fixed or the table has no primary key.");
        }

        DataStream<InternalRow> stream = QueryFileMonitor.build(env, table);
        stream =
                FlinkStreamPartitioner.partition(
                        stream, QueryFileMonitor.createChannelComputer(), parallelism);

        QueryExecutorOperator executorOperator = new QueryExecutorOperator(table);
        DataStreamSink<?> sink =
                stream.transform(
                                "Executor",
                                InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),
                                executorOperator)
                        .setParallelism(parallelism)
                        .addSink(new QueryAddressRegister(table))
                        .setParallelism(1);

        // Pin the register sink to one subtask: both parallelism and max parallelism are 1.
        sink.getTransformation().setMaxParallelism(1);
    }
}

