package com.alibaba.ververica.connectors.hologres.source;

import com.alibaba.ververica.connectors.common.dim.AsyncLookupFunctionWrapper;
import com.alibaba.ververica.connectors.common.dim.DimOptions;
import com.alibaba.ververica.connectors.common.dim.LookupFunctionWrapper;
import com.alibaba.ververica.connectors.common.dim.cache.CacheConfig;
import com.alibaba.ververica.connectors.common.source.SourceUtils;
import com.alibaba.ververica.connectors.common.source.resolver.DefaultSourceCollector;
import com.alibaba.ververica.connectors.common.table.VervericaTableOptions;
import com.alibaba.ververica.connectors.hologres.api.HologresTableSchema;
import com.alibaba.ververica.connectors.hologres.bhclient.HologresBhclientBinlogSource;
import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs;
import com.alibaba.ververica.connectors.hologres.binlog.RetryUtil;
import com.alibaba.ververica.connectors.hologres.binlog.StartupMode;
import com.alibaba.ververica.connectors.hologres.binlog.source.converter.BhclientBinlogRecordConverter;
import com.alibaba.ververica.connectors.hologres.binlog.source.converter.HoloHubBinlogRecordConverter;
import com.alibaba.ververica.connectors.hologres.binlog.source.converter.JDBCBinlogRecordConverter;
import com.alibaba.ververica.connectors.hologres.config.HologresConfigs;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.config.HologresTableConfigs;
import com.alibaba.ververica.connectors.hologres.config.JDBCOptions;
import com.alibaba.ververica.connectors.hologres.config.SDKMode;
import com.alibaba.ververica.connectors.hologres.holohub.HologresHolohubBinlogSource;
import com.alibaba.ververica.connectors.hologres.holohub.HolohubClientProvider;
import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCBinlogSource;
import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCReader;
import com.alibaba.ververica.connectors.hologres.source.lookup.HologresAsyncLookupFunction;
import com.alibaba.ververica.connectors.hologres.source.lookup.HologresLookupFunction;
import com.alibaba.ververica.connectors.hologres.source.lookup.LookupKeysWrapper;
import com.alibaba.ververica.connectors.hologres.source.scan.bulkread.HologresBulkreadInputFormat;
import com.alibaba.ververica.connectors.hologres.utils.HoloBinlogUtil;
import com.alibaba.ververica.connectors.hologres.utils.HologresUtils;
import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.hologres.com.aliyun.datahub.client.DatahubClient;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/source/HologresTableSource.class */
public class HologresTableSource implements DynamicTableSource, LookupTableSource, ScanTableSource, SupportsLimitPushDown, SupportsFilterPushDown {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HologresTableSource.class);
    private final String sqlTableName;
    private final TableSchema tableSchema;
    private final CacheConfig cacheConfig;
    private final ReadableConfig config;
    private final JDBCOptions jdbcOptions;
    private final boolean binlog;
    private final boolean cdcMode;
    private final boolean upsertSource;
    private final boolean enableFilterPushDownByConnectorConfigs;
    private final boolean enableFilterPushDownByTableConfigs;
    private final StartupMode startupMode;
    private transient ScanTableSource.ScanRuntimeProvider scanProvider;
    private final transient HashMap<LookupKeysWrapper, LookupTableSource.LookupRuntimeProvider> lookupProviders = new HashMap<>();
    private ArrayList<String> resolvedPredicates = new ArrayList<>();
    private long limit = -1;

    public HologresTableSource(String str, TableSchema tableSchema, CacheConfig cacheConfig, JDBCOptions jDBCOptions, ReadableConfig readableConfig) {
        this.sqlTableName = str;
        this.tableSchema = tableSchema;
        this.cacheConfig = cacheConfig;
        this.jdbcOptions = jDBCOptions;
        this.config = readableConfig;
        this.binlog = ((Boolean) readableConfig.get(HologresBinlogConfigs.OPTIONAL_BINLOG)).booleanValue();
        this.cdcMode = ((Boolean) readableConfig.get(HologresBinlogConfigs.BINLOG_CDC_MODE)).booleanValue() && ((Boolean) readableConfig.get(HologresBinlogConfigs.OPTIONAL_BINLOG)).booleanValue();
        this.upsertSource = ((Boolean) readableConfig.get(HologresBinlogConfigs.BINLOG_UPSERT_CHANGELOG_MODE)).booleanValue();
        this.startupMode = HoloBinlogUtil.getStartupMode((String) readableConfig.get(HologresBinlogConfigs.BINLOG_STARTUP_MODE));
        this.enableFilterPushDownByConnectorConfigs = ((Boolean) readableConfig.get(HologresConfigs.ENABLE_FILTER_PUSH_DOWN)).booleanValue();
        this.enableFilterPushDownByTableConfigs = ((Boolean) readableConfig.get(HologresTableConfigs.HOLOGRES_SOURCE_LEGACY_FILTER_PUSH_DOWN_ENABLED)).booleanValue();
    }

    public DynamicTableSource copy() {
        HologresTableSource hologresTableSource = new HologresTableSource(this.sqlTableName, this.tableSchema, this.cacheConfig, this.jdbcOptions, this.config);
        hologresTableSource.resolvedPredicates = new ArrayList<>(this.resolvedPredicates);
        hologresTableSource.limit = this.limit;
        return hologresTableSource;
    }

    public String asSummaryString() {
        return "Hologres-" + this.sqlTableName;
    }

    @VisibleForTesting
    public static boolean primaryKeyFullProvided(String[] strArr, HologresTableSchema hologresTableSchema) {
        return ((Set) Arrays.stream(strArr).collect(Collectors.toSet())).equals(Arrays.stream(hologresTableSchema.get().getPrimaryKeys()).collect(Collectors.toSet()));
    }

    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext lookupContext) {
        if (this.enableFilterPushDownByConnectorConfigs) {
            throw new ValidationException("Hologres dimension table does not support filters push down.");
        }
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(this.config);
        if (SDKMode.HOLOHUB == hologresConnectionParam.getSDKMode()) {
            throw new ValidationException("Hologres dimension table does not support holohub mode.");
        }
        String[] strArr = new String[lookupContext.getKeys().length];
        for (int i = 0; i < strArr.length; i++) {
            int[] iArr = lookupContext.getKeys()[i];
            Preconditions.checkArgument(iArr.length == 1, "Do not support nested lookup keys");
            strArr[i] = this.tableSchema.getFieldNames()[iArr[0]];
        }
        LookupKeysWrapper lookupKeysWrapper = new LookupKeysWrapper(strArr);
        if (this.lookupProviders.containsKey(lookupKeysWrapper)) {
            return this.lookupProviders.get(lookupKeysWrapper);
        }
        HologresTableSchema hologresTableSchema = HologresTableSchema.get(hologresConnectionParam);
        boolean primaryKeyFullProvided = primaryKeyFullProvided(strArr, hologresTableSchema);
        if ((hologresConnectionParam.getSDKMode() == SDKMode.JDBC_FIXED || hologresConnectionParam.getSDKMode() == SDKMode.BHCLIENT || hologresConnectionParam.getSDKMode() == SDKMode.RPC) && !primaryKeyFullProvided) {
            throw new UnsupportedOperationException("Just hologres jdbc connection mode dimension table support one to many join.");
        }
        HologresJDBCReader<RowData> createTableReader = HologresJDBCReader.createTableReader(hologresConnectionParam, this.tableSchema, strArr, hologresTableSchema);
        AsyncTableFunctionProvider of = ((Boolean) this.config.get(DimOptions.OPTIONAL_ASYNC)).booleanValue() ? AsyncTableFunctionProvider.of(new AsyncLookupFunctionWrapper(new HologresAsyncLookupFunction(this.sqlTableName, this.tableSchema, strArr, this.cacheConfig.getCacheStrategy(), createTableReader, primaryKeyFullProvided))) : TableFunctionProvider.of(new LookupFunctionWrapper(new HologresLookupFunction(this.sqlTableName, this.tableSchema, strArr, this.cacheConfig.getCacheStrategy(), createTableReader, primaryKeyFullProvided)));
        this.lookupProviders.put(lookupKeysWrapper, of);
        return of;
    }

    public ChangelogMode getChangelogMode() {
        return this.cdcMode ? this.upsertSource ? ChangelogMode.upsert() : ChangelogMode.all() : ChangelogMode.insertOnly();
    }

    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        if (this.scanProvider != null) {
            return this.scanProvider;
        }
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(this.config);
        String join = String.join(" and ", this.resolvedPredicates);
        if (this.binlog) {
            long longValue = ((Long) this.config.get(VervericaTableOptions.START_TIME_MILLS)).longValue();
            if (longValue < 0) {
                longValue = 0;
                String str = (String) this.config.get(VervericaTableOptions.OPTIONAL_START_TIME);
                if (!StringUtils.isNullOrWhitespaceOnly(str)) {
                    try {
                        longValue = SourceUtils.parseDateString(VervericaTableOptions.DATE_FORMAT, str).longValue();
                    } catch (ParseException e) {
                        throw new RuntimeException(String.format("Incorrect datetime format: %s, pls use ISO-8601 complete date plus hours, minutes and seconds format:%s", str, VervericaTableOptions.DATE_FORMAT), e);
                    }
                }
            }
            HashSet hashSet = new HashSet();
            String str2 = (String) this.config.get(DefaultSourceCollector.CollectorOption.PARSER_NULL_VALUES);
            String str3 = (String) this.config.get(DefaultSourceCollector.CollectorOption.PARSER_NULL_VALUES_DELIMITER);
            if (null != str2) {
                if (str2.contains(str3)) {
                    for (String str4 : org.apache.commons.lang3.StringUtils.splitPreserveAllTokens(str2, str3)) {
                        hashSet.add(StringEscapeUtils.unescapeJava(str4));
                    }
                } else {
                    hashSet.add(StringEscapeUtils.unescapeJava(str2));
                }
            }
            if (SDKMode.JDBC == hologresConnectionParam.getSDKMode()) {
                if (hologresConnectionParam.getJdbcBinlogSlotName() == null) {
                    String orCreateDefaultSlotForJDBCBinlog = HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(this.jdbcOptions);
                    LOG.warn("User has not set the the option of \"jdbcBinlogSlotName\", will use the default slot named {}.", orCreateDefaultSlotForJDBCBinlog);
                    hologresConnectionParam.setJdbcBinlogSlotName(orCreateDefaultSlotForJDBCBinlog);
                }
                if (!JDBCUtils.isPublicCloudEnv(this.jdbcOptions) && JDBCUtils.couldConnectDirectly(this.jdbcOptions)) {
                    hologresConnectionParam.setEnableDirectConnect(true);
                }
                this.scanProvider = SourceProvider.of(new HologresJDBCBinlogSource(hologresConnectionParam, this.tableSchema, this.config, this.jdbcOptions, longValue, this.startupMode, new JDBCBinlogRecordConverter(this.jdbcOptions.getTable(), this.tableSchema, hologresConnectionParam, this.cdcMode, hashSet), join, this.limit));
            } else if (HologresUtils.shouldUseHoloHub(hologresConnectionParam)) {
                this.jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(this.jdbcOptions));
                this.scanProvider = SourceProvider.of(new HologresHolohubBinlogSource(hologresConnectionParam, this.tableSchema, this.config, this.jdbcOptions, longValue, this.startupMode, new HoloHubBinlogRecordConverter(this.jdbcOptions.getTable(), this.tableSchema, hologresConnectionParam, RetryUtil.getTopicWithRetry((DatahubClient) HolohubClientProvider.newHolohubClientProvider(this.jdbcOptions).getClient(), this.jdbcOptions.getDatabase(), this.jdbcOptions.getBinlogTableName()), this.cdcMode, hashSet), join, this.limit));
            } else {
                BhclientBinlogRecordConverter bhclientBinlogRecordConverter = new BhclientBinlogRecordConverter(this.jdbcOptions.getTable(), this.tableSchema, hologresConnectionParam, this.cdcMode, hashSet);
                this.scanProvider = SourceProvider.of(new HologresBhclientBinlogSource(hologresConnectionParam, this.tableSchema, this.config, this.jdbcOptions, longValue, this.startupMode, bhclientBinlogRecordConverter, this.sqlTableName, bhclientBinlogRecordConverter.getBlinkTableFields(), join, this.limit));
            }
        } else {
            this.scanProvider = InputFormatProvider.of(new HologresBulkreadInputFormat(hologresConnectionParam, this.jdbcOptions, this.tableSchema, join, this.limit));
        }
        return this.scanProvider;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public SupportsFilterPushDown.Result applyFilters(List<ResolvedExpression> list) {
        if (!this.enableFilterPushDownByConnectorConfigs && !this.enableFilterPushDownByTableConfigs) {
            return SupportsFilterPushDown.Result.of(Collections.emptyList(), list);
        }
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        HologresUtils.ExpressionExtractor expressionExtractor = new HologresUtils.ExpressionExtractor();
        for (ResolvedExpression resolvedExpression : list) {
            Optional optional = (Optional) resolvedExpression.accept(expressionExtractor);
            if (optional.isPresent()) {
                this.resolvedPredicates.add(optional.get());
                LOG.info("convert filter expression [{}] to sql [{}].", resolvedExpression.asSummaryString(), optional.get());
                arrayList.add(resolvedExpression);
            } else {
                LOG.info("filter [{}] not support push down, add it to remainingFilters.", resolvedExpression.asSummaryString());
                arrayList2.add(resolvedExpression);
            }
        }
        return this.binlog ? SupportsFilterPushDown.Result.of(arrayList, list) : SupportsFilterPushDown.Result.of(arrayList, arrayList2);
    }

    public void applyLimit(long j) {
        this.limit = j;
    }
}
