/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.factory;

import java.time.Duration;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.cdc.debezium.utils.JdbcUrlUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class MySqlDataSourceFactory
implements DataSourceFactory {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlDataSourceFactory.class);
    public static final String IDENTIFIER = "mysql";
    private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
    private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
    private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
    private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
    private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
    private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";

    public DataSource createDataSource(Factory.Context context) {
        FactoryHelper.createFactoryHelper((Factory)this, (Factory.Context)context).validateExcept(new String[]{"jdbc.properties.", "debezium."});
        Configuration config = context.getFactoryConfiguration();
        String hostname = (String)config.get(MySqlDataSourceOptions.HOSTNAME);
        int port = (Integer)config.get(MySqlDataSourceOptions.PORT);
        String username = (String)config.get(MySqlDataSourceOptions.USERNAME);
        String password = (String)config.get(MySqlDataSourceOptions.PASSWORD);
        String tables = (String)config.get(MySqlDataSourceOptions.TABLES);
        String tablesExclude = (String)config.get(MySqlDataSourceOptions.TABLES_EXCLUDE);
        String serverId = this.validateAndGetServerId(config);
        ZoneId serverTimeZone = MySqlDataSourceFactory.getServerTimeZone(config);
        StartupOptions startupOptions = MySqlDataSourceFactory.getStartupOptions(config);
        boolean includeSchemaChanges = (Boolean)config.get(MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED);
        int fetchSize = (Integer)config.get(MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE);
        int splitSize = (Integer)config.get(MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
        int splitMetaGroupSize = (Integer)config.get(MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE);
        double distributionFactorUpper = (Double)config.get(MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
        double distributionFactorLower = (Double)config.get(MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
        boolean closeIdleReaders = (Boolean)config.get(MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
        Duration heartbeatInterval = (Duration)config.get(MySqlDataSourceOptions.HEARTBEAT_INTERVAL);
        Duration connectTimeout = (Duration)config.get(MySqlDataSourceOptions.CONNECT_TIMEOUT);
        int connectMaxRetries = (Integer)config.get(MySqlDataSourceOptions.CONNECT_MAX_RETRIES);
        int connectionPoolSize = (Integer)config.get(MySqlDataSourceOptions.CONNECTION_POOL_SIZE);
        boolean scanNewlyAddedTableEnabled = (Boolean)config.get(MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED);
        this.validateIntegerOption(MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
        this.validateIntegerOption(MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
        this.validateIntegerOption(MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
        this.validateIntegerOption(MySqlDataSourceOptions.CONNECTION_POOL_SIZE, connectionPoolSize, 1);
        this.validateIntegerOption(MySqlDataSourceOptions.CONNECT_MAX_RETRIES, connectMaxRetries, 0);
        this.validateDistributionFactorUpper(distributionFactorUpper);
        this.validateDistributionFactorLower(distributionFactorLower);
        Map configMap = config.toMap();
        OptionUtils.printOptions(IDENTIFIER, config.toMap());
        MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory().hostname(hostname).port(port).username(username).password(password).databaseList(".*").tableList(".*").startupOptions(startupOptions).serverId(serverId).serverTimeZone(serverTimeZone.getId()).fetchSize(fetchSize).splitSize(splitSize).splitMetaGroupSize(splitMetaGroupSize).distributionFactorLower(distributionFactorLower).distributionFactorUpper(distributionFactorUpper).heartbeatInterval(heartbeatInterval).connectTimeout(connectTimeout).connectMaxRetries(connectMaxRetries).connectionPoolSize(connectionPoolSize).closeIdleReaders(closeIdleReaders).includeSchemaChanges(includeSchemaChanges).debeziumProperties(DebeziumOptions.getDebeziumProperties(configMap)).jdbcProperties(JdbcUrlUtils.getJdbcProperties(configMap)).scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
        Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
        List<String> capturedTables = MySqlDataSourceFactory.getTableList(configFactory.createConfig(0), selectors);
        if (capturedTables.isEmpty()) {
            throw new IllegalArgumentException("Cannot find any table by the option 'tables' = " + tables);
        }
        if (tablesExclude != null) {
            Selectors selectExclude = new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
            List<String> excludeTables = MySqlDataSourceFactory.getTableList(configFactory.createConfig(0), selectExclude);
            if (!excludeTables.isEmpty()) {
                capturedTables.removeAll(excludeTables);
            }
            if (capturedTables.isEmpty()) {
                throw new IllegalArgumentException("Cannot find any table with by the option 'tables.exclude'  = " + tablesExclude);
            }
        }
        configFactory.tableList(capturedTables.toArray(new String[0]));
        String chunkKeyColumns = (String)config.get(MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
        if (chunkKeyColumns != null) {
            HashMap<ObjectPath, String> chunkKeyColumnMap = new HashMap<ObjectPath, String>();
            List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
            for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
                String[] splits = chunkKeyColumn.split(":");
                if (splits.length == 2) {
                    Selectors chunkKeySelector = new Selectors.SelectorsBuilder().includeTables(splits[0]).build();
                    List<ObjectPath> tableList = MySqlDataSourceFactory.getChunkKeyColumnTableList(tableIds, chunkKeySelector);
                    for (ObjectPath table : tableList) {
                        chunkKeyColumnMap.put(table, splits[1]);
                    }
                    continue;
                }
                throw new IllegalArgumentException(MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key() + " = " + chunkKeyColumns + " failed to be parsed in this part '" + chunkKeyColumn + "'.");
            }
            LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
            configFactory.chunkKeyColumn(chunkKeyColumnMap);
        }
        return new MySqlDataSource(configFactory);
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet options = new HashSet();
        options.add(MySqlDataSourceOptions.HOSTNAME);
        options.add(MySqlDataSourceOptions.USERNAME);
        options.add(MySqlDataSourceOptions.PASSWORD);
        options.add(MySqlDataSourceOptions.TABLES);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        HashSet options = new HashSet();
        options.add(MySqlDataSourceOptions.PORT);
        options.add(MySqlDataSourceOptions.TABLES_EXCLUDE);
        options.add(MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED);
        options.add(MySqlDataSourceOptions.SERVER_ID);
        options.add(MySqlDataSourceOptions.SERVER_TIME_ZONE);
        options.add(MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
        options.add(MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE);
        options.add(MySqlDataSourceOptions.SCAN_STARTUP_MODE);
        options.add(MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS);
        options.add(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
        options.add(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
        options.add(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
        options.add(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS);
        options.add(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS);
        options.add(MySqlDataSourceOptions.CONNECT_TIMEOUT);
        options.add(MySqlDataSourceOptions.CONNECT_MAX_RETRIES);
        options.add(MySqlDataSourceOptions.CONNECTION_POOL_SIZE);
        options.add(MySqlDataSourceOptions.HEARTBEAT_INTERVAL);
        options.add(MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
        options.add(MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
        options.add(MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED);
        options.add(MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE);
        options.add(MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
        options.add(MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
        return options;
    }

    public String identifier() {
        return IDENTIFIER;
    }

    private static List<String> getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) {
        return MySqlSchemaUtils.listTables(sourceConfig, null).stream().filter(arg_0 -> ((Selectors)selectors).isMatch(arg_0)).map(TableId::toString).collect(Collectors.toList());
    }

    private static List<ObjectPath> getChunkKeyColumnTableList(List<TableId> tableIds, Selectors selectors) {
        return tableIds.stream().filter(arg_0 -> ((Selectors)selectors).isMatch(arg_0)).map(tableId -> new ObjectPath(tableId.getSchemaName(), tableId.getTableName())).collect(Collectors.toList());
    }

    private static StartupOptions getStartupOptions(Configuration config) {
        String modeString = (String)config.get(MySqlDataSourceOptions.SCAN_STARTUP_MODE);
        switch (modeString.toLowerCase()) {
            case "initial": {
                return StartupOptions.initial();
            }
            case "snapshot": {
                return StartupOptions.snapshot();
            }
            case "latest-offset": {
                return StartupOptions.latest();
            }
            case "earliest-offset": {
                return StartupOptions.earliest();
            }
            case "specific-offset": {
                MySqlDataSourceFactory.validateSpecificOffset(config);
                return MySqlDataSourceFactory.getSpecificOffset(config);
            }
            case "timestamp": {
                return StartupOptions.timestamp((Long)config.get(MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS));
            }
        }
        throw new ValidationException(String.format("Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s, %s], but was: %s", MySqlDataSourceOptions.SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE_VALUE_INITIAL, SCAN_STARTUP_MODE_VALUE_SNAPSHOT, SCAN_STARTUP_MODE_VALUE_LATEST, SCAN_STARTUP_MODE_VALUE_EARLIEST, SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET, SCAN_STARTUP_MODE_VALUE_TIMESTAMP, modeString));
    }

    private static void validateSpecificOffset(Configuration config) {
        Optional gtidSet = config.getOptional(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET);
        Optional binlogFilename = config.getOptional(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
        Optional binlogPosition = config.getOptional(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
        if (!(gtidSet.isPresent() || binlogFilename.isPresent() && binlogPosition.isPresent())) {
            throw new ValidationException(String.format("Unable to find a valid binlog offset. Either %s, or %s and %s are required.", MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key(), MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key(), MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key()));
        }
    }

    private static StartupOptions getSpecificOffset(Configuration config) {
        BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
        config.getOptional(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET).ifPresent(offsetBuilder::setGtidSet);
        Optional binlogFilename = config.getOptional(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
        Optional binlogPosition = config.getOptional(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
        if (binlogFilename.isPresent() && binlogPosition.isPresent()) {
            offsetBuilder.setBinlogFilePosition((String)binlogFilename.get(), (Long)binlogPosition.get());
        } else {
            offsetBuilder.setBinlogFilePosition("", 0L);
        }
        config.getOptional(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS).ifPresent(offsetBuilder::setSkipEvents);
        config.getOptional(MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS).ifPresent(offsetBuilder::setSkipRows);
        return StartupOptions.specificOffset(offsetBuilder.build());
    }

    private String validateAndGetServerId(Configuration configuration) {
        String serverIdValue = (String)configuration.get(MySqlDataSourceOptions.SERVER_ID);
        if (serverIdValue != null) {
            try {
                ServerIdRange.from(serverIdValue);
            }
            catch (Exception e) {
                throw new ValidationException(String.format("The value of option 'server-id' is invalid: '%s'", serverIdValue), (Throwable)e);
            }
        }
        return serverIdValue;
    }

    private void validateIntegerOption(ConfigOption<Integer> option, int optionValue, int exclusiveMin) {
        Preconditions.checkState((optionValue > exclusiveMin ? 1 : 0) != 0, (Object)String.format("The value of option '%s' must larger than %d, but is %d", option.key(), exclusiveMin, optionValue));
    }

    private void validateDistributionFactorUpper(double distributionFactorUpper) {
        Preconditions.checkState((ObjectUtils.doubleCompare(distributionFactorUpper, 1.0) >= 0 ? 1 : 0) != 0, (Object)String.format("The value of option '%s' must larger than or equals %s, but is %s", MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), 1.0, distributionFactorUpper));
    }

    private void validateDistributionFactorLower(double distributionFactorLower) {
        Preconditions.checkState((ObjectUtils.doubleCompare(distributionFactorLower, 0.0) >= 0 && ObjectUtils.doubleCompare(distributionFactorLower, 1.0) <= 0 ? 1 : 0) != 0, (Object)String.format("The value of option '%s' must between %s and %s inclusively, but is %s", MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), 0.0, 1.0, distributionFactorLower));
    }

    private static ZoneId getServerTimeZone(Configuration config) {
        String serverTimeZone = (String)config.get(MySqlDataSourceOptions.SERVER_TIME_ZONE);
        if (serverTimeZone != null) {
            return ZoneId.of(serverTimeZone);
        }
        LOG.warn("{} is not set, which might cause data inconsistencies for time-related fields.", (Object)MySqlDataSourceOptions.SERVER_TIME_ZONE.key());
        return ZoneId.systemDefault();
    }
}

