/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.GtidUtils;
import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlErrorHandler;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.connector.mysql.MySqlTopicSelector;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSourceFetchTaskContext
extends JdbcSourceFetchTaskContext {
    private static final Logger log = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
    private final MySqlConnection connection;
    private final BinaryLogClient binaryLogClient;
    private final MySqlEventMetadataProvider metadataProvider;
    private MySqlDatabaseSchema databaseSchema;
    private MySqlTaskContextImpl taskContext;
    private MySqlOffsetContext offsetContext;
    private SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics;
    private MySqlStreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;
    private TopicSelector<TableId> topicSelector;
    private JdbcSourceEventDispatcher<MySqlPartition> dispatcher;
    private MySqlPartition mySqlPartition;
    private ChangeEventQueue<DataChangeEvent> queue;
    private MySqlErrorHandler errorHandler;
    private RelationalDatabaseConnectorConfig dbzConnectorConfig;

    public MySqlSourceFetchTaskContext(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
        super(sourceConfig, dataSourceDialect);
        this.dbzConnectorConfig = sourceConfig.getDbzConnectorConfig();
        this.connection = MySqlConnectionUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
        this.binaryLogClient = MySqlConnectionUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
        this.metadataProvider = new MySqlEventMetadataProvider();
    }

    @Override
    public void configure(SourceSplitBase sourceSplitBase) {
        super.registerDatabaseHistory(sourceSplitBase, this.connection);
        MySqlConnectorConfig connectorConfig = this.getDbzConnectorConfig();
        boolean tableIdCaseInsensitive = this.connection.isTableIdCaseSensitive();
        this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
        this.databaseSchema = MySqlConnectionUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseInsensitive);
        this.offsetContext = this.loadStartingOffsetState(new MySqlOffsetContext.Loader(connectorConfig), sourceSplitBase);
        this.mySqlPartition = new MySqlPartition(connectorConfig.getLogicalName());
        this.validateAndLoadDatabaseHistory(this.offsetContext, this.databaseSchema);
        this.taskContext = new MySqlTaskContextImpl(connectorConfig, this.databaseSchema, this.binaryLogClient);
        int queueSize = sourceSplitBase.isSnapshotSplit() && this.isExactlyOnce() ? Integer.MAX_VALUE : this.getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
        this.queue = new ChangeEventQueue.Builder().pollInterval(connectorConfig.getPollInterval()).maxBatchSize(connectorConfig.getMaxBatchSize()).maxQueueSize(queueSize).maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes()).loggingContextSupplier(() -> this.taskContext.configureLoggingContext("mysql-cdc-connector-task")).build();
        this.dispatcher = new JdbcSourceEventDispatcher((CommonConnectorConfig)connectorConfig, this.topicSelector, this.databaseSchema, this.queue, connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, (EventMetadataProvider)this.metadataProvider, this.schemaNameAdjuster);
        MySqlChangeEventSourceMetricsFactory changeEventSourceMetricsFactory = new MySqlChangeEventSourceMetricsFactory(new MySqlStreamingChangeEventSourceMetrics(this.taskContext, (ChangeEventQueueMetrics)this.queue, (EventMetadataProvider)this.metadataProvider));
        this.snapshotChangeEventSourceMetrics = changeEventSourceMetricsFactory.getSnapshotMetrics(this.taskContext, this.queue, this.metadataProvider);
        this.streamingChangeEventSourceMetrics = (MySqlStreamingChangeEventSourceMetrics)changeEventSourceMetricsFactory.getStreamingMetrics(this.taskContext, this.queue, this.metadataProvider);
        this.errorHandler = new MySqlErrorHandler(connectorConfig, this.queue);
    }

    @Override
    public void close() {
        try {
            this.connection.close();
            this.binaryLogClient.disconnect();
        }
        catch (SQLException e) {
            log.warn("Failed to close connection", (Throwable)e);
        }
        catch (IOException e) {
            log.warn("Failed to close binaryLogClient", (Throwable)e);
        }
    }

    @Override
    public MySqlSourceConfig getSourceConfig() {
        return (MySqlSourceConfig)this.sourceConfig;
    }

    public MySqlConnection getConnection() {
        return this.connection;
    }

    public BinaryLogClient getBinaryLogClient() {
        return this.binaryLogClient;
    }

    public MySqlTaskContextImpl getTaskContext() {
        return this.taskContext;
    }

    @Override
    public MySqlConnectorConfig getDbzConnectorConfig() {
        return (MySqlConnectorConfig)super.getDbzConnectorConfig();
    }

    @Override
    public MySqlOffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    @Override
    public MySqlPartition getPartition() {
        return this.mySqlPartition;
    }

    public SnapshotChangeEventSourceMetrics<MySqlPartition> getSnapshotChangeEventSourceMetrics() {
        return this.snapshotChangeEventSourceMetrics;
    }

    public MySqlStreamingChangeEventSourceMetrics getStreamingChangeEventSourceMetrics() {
        return this.streamingChangeEventSourceMetrics;
    }

    @Override
    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    @Override
    public MySqlDatabaseSchema getDatabaseSchema() {
        return this.databaseSchema;
    }

    @Override
    public SeaTunnelRowType getSplitType(Table table) {
        return MySqlUtils.getSplitType(table, this.dbzConnectorConfig);
    }

    @Override
    public JdbcSourceEventDispatcher<MySqlPartition> getDispatcher() {
        return this.dispatcher;
    }

    @Override
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    @Override
    public Tables.TableFilter getTableFilter() {
        return this.getDbzConnectorConfig().getTableFilters().dataCollectionFilter();
    }

    @Override
    public Offset getStreamOffset(SourceRecord sourceRecord) {
        return MySqlUtils.getBinlogPosition(sourceRecord);
    }

    private MySqlOffsetContext loadStartingOffsetState(MySqlOffsetContext.Loader loader, SourceSplitBase mySqlSplit) {
        Offset offset = mySqlSplit.isSnapshotSplit() ? BinlogOffset.INITIAL_OFFSET : mySqlSplit.asIncrementalSplit().getStartupOffset();
        OffsetContext mySqlOffsetContext = loader.load((Map)offset.getOffset());
        if (!this.isBinlogAvailable((MySqlOffsetContext)mySqlOffsetContext)) {
            throw new IllegalStateException("The connector is trying to read binlog starting at " + ((MySqlOffsetContext)mySqlOffsetContext).getSourceInfo() + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
        }
        return mySqlOffsetContext;
    }

    private boolean isBinlogAvailable(MySqlOffsetContext offset) {
        String gtidStr = offset.gtidSet();
        if (gtidStr != null) {
            return this.checkGtidSet(offset);
        }
        return this.checkBinlogFilename(offset);
    }

    private boolean checkBinlogFilename(MySqlOffsetContext offset) {
        String binlogFilename = offset.getSourceInfo().getString("file");
        if (binlogFilename == null) {
            return true;
        }
        if (binlogFilename.equals("")) {
            return true;
        }
        List<String> logNames = this.connection.availableBinlogFiles();
        boolean found = logNames.stream().anyMatch(binlogFilename::equals);
        if (!found) {
            LOG.info("Connector requires binlog file '{}', but MySQL only has {}", (Object)binlogFilename, (Object)String.join((CharSequence)", ", logNames));
        } else {
            LOG.info("MySQL has the binlog file '{}' required by the connector", (Object)binlogFilename);
        }
        return found;
    }

    private boolean checkGtidSet(MySqlOffsetContext offset) {
        String gtidStr = offset.gtidSet();
        if (gtidStr.trim().isEmpty()) {
            return true;
        }
        String availableGtidStr = this.connection.knownGtidSet();
        if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
            LOG.warn("Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
            return false;
        }
        GtidSet availableGtidSet = new GtidSet(availableGtidStr);
        LOG.info("Merging server GTID set {} with restored GTID set {}", (Object)availableGtidSet, (Object)gtidStr);
        GtidSet gtidSet = GtidUtils.fixRestoredGtidSet(availableGtidSet, new GtidSet(gtidStr));
        LOG.info("Merged GTID set is {}", (Object)gtidSet);
        if (gtidSet.isContainedWithin(availableGtidSet)) {
            LOG.info("MySQL current GTID set {} does contain the GTID set {} required by the connector.", (Object)availableGtidSet, (Object)gtidSet);
            GtidSet gtidSetToReplicate = this.connection.subtractGtidSet(availableGtidSet, gtidSet);
            GtidSet purgedGtidSet = this.connection.purgedGtidSet();
            LOG.info("Server has already purged {} GTIDs", (Object)purgedGtidSet);
            GtidSet nonPurgedGtidSetToReplicate = this.connection.subtractGtidSet(gtidSetToReplicate, purgedGtidSet);
            LOG.info("GTID set {} known by the server but not processed yet, for replication are available only GTID set {}", (Object)gtidSetToReplicate, (Object)nonPurgedGtidSetToReplicate);
            if (!gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) {
                LOG.warn("Some of the GTIDs needed to replicate have been already purged");
                return false;
            }
            return true;
        }
        LOG.info("Connector last known GTIDs are {}, but MySQL has {}", (Object)gtidSet, (Object)availableGtidSet);
        return false;
    }

    private void validateAndLoadDatabaseHistory(MySqlOffsetContext offset, MySqlDatabaseSchema schema) {
        schema.initializeStorage();
        schema.recover(Offsets.of(this.mySqlPartition, offset));
    }

    public static class MySqlEventMetadataProvider
    implements EventMetadataProvider {
        public static final String SERVER_ID_KEY = "server_id";
        public static final String GTID_KEY = "gtid";
        public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
        public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
        public static final String BINLOG_ROW_IN_EVENT_OFFSET_KEY = "row";
        public static final String THREAD_KEY = "thread";
        public static final String QUERY_KEY = "query";

        @Override
        public Instant getEventTimestamp(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            Struct sourceInfo = value.getStruct("source");
            if (source == null) {
                return null;
            }
            Long timestamp = sourceInfo.getInt64("ts_ms");
            return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
        }

        @Override
        public Map<String, String> getEventSourcePosition(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            Struct sourceInfo = value.getStruct("source");
            if (source == null) {
                return null;
            }
            return Collect.hashMapOf(BINLOG_FILENAME_OFFSET_KEY, sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY), BINLOG_POSITION_OFFSET_KEY, Long.toString(sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY)), BINLOG_ROW_IN_EVENT_OFFSET_KEY, Integer.toString(sourceInfo.getInt32(BINLOG_ROW_IN_EVENT_OFFSET_KEY)));
        }

        @Override
        public String getTransactionId(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            return ((MySqlOffsetContext)offset).getTransactionId();
        }
    }

    public class MySqlTaskContextImpl
    extends MySqlTaskContext {
        private final BinaryLogClient reusedBinaryLogClient;

        public MySqlTaskContextImpl(MySqlConnectorConfig config, MySqlDatabaseSchema schema, BinaryLogClient reusedBinaryLogClient) {
            super(config, schema);
            this.reusedBinaryLogClient = this.resetBinaryLogClient(reusedBinaryLogClient);
        }

        @Override
        public BinaryLogClient getBinaryLogClient() {
            return this.reusedBinaryLogClient;
        }

        private BinaryLogClient resetBinaryLogClient(BinaryLogClient binaryLogClient) {
            Optional eventListenersField = ReflectionUtils.getField((Object)binaryLogClient, BinaryLogClient.class, (String)"eventListeners");
            eventListenersField.ifPresent(o -> ((List)o).clear());
            Optional lifecycleListeners = ReflectionUtils.getField((Object)binaryLogClient, BinaryLogClient.class, (String)"lifecycleListeners");
            lifecycleListeners.ifPresent(o -> ((List)o).clear());
            return binaryLogClient;
        }
    }
}

