package com.starrocks.connector.flink.table.source;

import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.QueryInfo;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.tools.EnvUtils;
import com.starrocks.shade.com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/table/source/StarRocksDynamicSourceFunction.class */
public class StarRocksDynamicSourceFunction extends RichParallelSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicSourceFunction.class);
    private final StarRocksSourceOptions sourceOptions;
    private QueryInfo queryInfo;
    private Long dataCount;
    private final SelectColumn[] selectColumns;
    private final List<ColumnRichInfo> columnRichInfos;
    private List<StarRocksSourceDataReader> dataReaderList;
    private StarRocksSourceQueryType queryType;
    private transient Counter counterTotalScannedRows;
    private transient AtomicBoolean dataReaderClosed;
    private static final String TOTAL_SCANNED_ROWS = "totalScannedRows";

    public StarRocksDynamicSourceFunction(TableSchema tableSchema, StarRocksSourceOptions starRocksSourceOptions) {
        this.sourceOptions = starRocksSourceOptions;
        Map<String, ColumnRichInfo> genColumnMap = StarRocksSourceCommonFunc.genColumnMap(tableSchema);
        this.columnRichInfos = StarRocksSourceCommonFunc.genColumnRichInfo(genColumnMap);
        String genSQL = genSQL(starRocksSourceOptions);
        if (this.sourceOptions.getColumns().trim().toLowerCase().contains("count(")) {
            this.queryType = StarRocksSourceQueryType.QueryCount;
            this.dataCount = StarRocksSourceCommonFunc.getQueryCount(this.sourceOptions, genSQL);
            this.selectColumns = null;
        } else {
            this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, genSQL);
            this.selectColumns = StarRocksSourceCommonFunc.genSelectedColumns(genColumnMap, starRocksSourceOptions, this.columnRichInfos);
        }
        this.dataReaderList = new ArrayList();
    }

    public StarRocksDynamicSourceFunction(StarRocksSourceOptions starRocksSourceOptions, TableSchema tableSchema, String str, long j, SelectColumn[] selectColumnArr, StarRocksSourceQueryType starRocksSourceQueryType) {
        this.sourceOptions = starRocksSourceOptions;
        Map<String, ColumnRichInfo> genColumnMap = StarRocksSourceCommonFunc.genColumnMap(tableSchema);
        this.columnRichInfos = StarRocksSourceCommonFunc.genColumnRichInfo(genColumnMap);
        if (starRocksSourceQueryType == null) {
            starRocksSourceQueryType = StarRocksSourceQueryType.QueryAllColumns;
            this.selectColumns = StarRocksSourceCommonFunc.genSelectedColumns(genColumnMap, starRocksSourceOptions, this.columnRichInfos);
        } else {
            this.selectColumns = selectColumnArr;
        }
        String genSQL = genSQL(starRocksSourceQueryType, this.selectColumns, str, j);
        if (starRocksSourceQueryType == StarRocksSourceQueryType.QueryCount) {
            this.dataCount = StarRocksSourceCommonFunc.getQueryCount(this.sourceOptions, genSQL);
        } else {
            this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, genSQL);
        }
        this.queryType = starRocksSourceQueryType;
        this.dataReaderList = new ArrayList();
    }

    private String genSQL(StarRocksSourceOptions starRocksSourceOptions) {
        return "select " + (starRocksSourceOptions.getColumns().isEmpty() ? "*" : starRocksSourceOptions.getColumns()) + " from `" + this.sourceOptions.getDatabaseName() + "`.`" + this.sourceOptions.getTableName() + "`" + (starRocksSourceOptions.getFilter().isEmpty() ? "" : " where " + starRocksSourceOptions.getFilter());
    }

    private String genSQL(StarRocksSourceQueryType starRocksSourceQueryType, SelectColumn[] selectColumnArr, String str, long j) {
        StringBuilder sb = new StringBuilder("select ");
        switch (starRocksSourceQueryType) {
            case QueryCount:
                sb.append("count(*)");
                break;
            case QueryAllColumns:
            case QuerySomeColumns:
                sb.append((String) Arrays.stream(selectColumnArr).map(selectColumn -> {
                    return "`" + selectColumn.getColumnName() + "`";
                }).collect(Collectors.joining(",")));
                break;
        }
        sb.append(" from ");
        sb.append("`").append(this.sourceOptions.getDatabaseName()).append("`");
        sb.append(".");
        sb.append("`").append(this.sourceOptions.getTableName()).append("`");
        if (!Strings.isNullOrEmpty(str)) {
            sb.append(" where ");
            sb.append(str);
        }
        if (j > 0) {
            throw new RuntimeException("Read data from be not support limit now !");
        }
        return sb.toString();
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.dataReaderClosed = new AtomicBoolean(false);
        this.counterTotalScannedRows = getRuntimeContext().getMetricGroup().counter(TOTAL_SCANNED_ROWS);
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        if (this.queryType != StarRocksSourceQueryType.QueryCount) {
            StarRocksSourceCommonFunc.splitQueryBeXTablets(getRuntimeContext().getNumberOfParallelSubtasks(), this.queryInfo).get(indexOfThisSubtask).forEach(queryBeXTablets -> {
                StarRocksSourceBeReader starRocksSourceBeReader = new StarRocksSourceBeReader(queryBeXTablets.getBeNode(), this.columnRichInfos, this.selectColumns, this.sourceOptions);
                starRocksSourceBeReader.openScanner(queryBeXTablets.getTabletIds(), this.queryInfo.getQueryPlan().getOpaqued_query_plan(), this.sourceOptions);
                starRocksSourceBeReader.startToRead();
                this.dataReaderList.add(starRocksSourceBeReader);
            });
        } else if (indexOfThisSubtask == 0) {
            this.dataReaderList.add(new StarRocksSourceTrickReader(this.dataCount));
        }
        LOG.info("Open source function. {}", EnvUtils.getGitInformation());
    }

    public void run(SourceFunction.SourceContext<RowData> sourceContext) {
        this.dataReaderList.parallelStream().forEach(starRocksSourceDataReader -> {
            while (starRocksSourceDataReader.hasNext()) {
                GenericRowData next = starRocksSourceDataReader.getNext();
                this.counterTotalScannedRows.inc(1L);
                sourceContext.collect(next);
            }
        });
    }

    public void cancel() {
        internalClose();
    }

    public void close() {
        internalClose();
    }

    private void internalClose() {
        if (this.dataReaderClosed.compareAndSet(false, true)) {
            LOG.info("Close readers");
            this.dataReaderList.parallelStream().forEach(starRocksSourceDataReader -> {
                if (starRocksSourceDataReader != null) {
                    starRocksSourceDataReader.close();
                }
            });
        }
    }

    public TypeInformation<RowData> getProducedType() {
        return TypeInformation.of(new TypeHint<RowData>() { // from class: com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.1
        });
    }
}
