/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.debezium.task;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.time.Duration;
import java.util.Calendar;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSnapshotSplitReadTask
extends AbstractSnapshotChangeEventSource<MySqlPartition, MySqlOffsetContext> {
    /** Logger used for all progress and error messages emitted by this task. */
    private static final Logger LOG = LoggerFactory.getLogger(MySqlSnapshotSplitReadTask.class);
    /** Duration (10 000 ms) handed to the timer that throttles scan-progress log lines. */
    private static final Duration LOG_INTERVAL = Duration.ofMillis(10000L);
    /** Source configuration; this class reads the JDBC fetch size from it. */
    private final MySqlSourceConfig sourceConfig;
    /** Schema used to look up the {@link Table} for the split's table id. */
    private final MySqlDatabaseSchema databaseSchema;
    /** Connection used to read the current binlog offset and to run the split scan query. */
    private final MySqlConnection jdbcConnection;
    /** Dispatcher that receives every snapshot row; its queue is also used for watermark events. */
    private final EventDispatcherImpl<TableId> dispatcher;
    /** Time source for event timestamps, duration logging and the progress-log timer. */
    private final Clock clock;
    /** The split (table id, split key type, start/end bounds) this task reads. */
    private final MySqlSnapshotSplit snapshotSplit;
    /** Resolves the topic name passed to the watermark (signal) event dispatcher. */
    private final TopicSelector<TableId> topicSelector;
    /** Receiver handed to the dispatcher for each row and completed after the scan. */
    private final EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver;
    /** Metrics object; also passed to the superclass. Updated with the scanned row count. */
    private final SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics;
    /** Optional callbacks invoked before/after the low and high watermark steps. */
    private final SnapshotPhaseHooks hooks;
    /** When {@code true}, the high watermark is set equal to the low watermark. */
    private final boolean isBackfillSkipped;

    /**
     * Creates a task that reads one snapshot split.
     *
     * <p>The connector configuration and the metrics object are forwarded to the superclass
     * constructor; every argument except {@code connectorConfig} is also stored unchanged in
     * the matching field. No argument is validated here.
     *
     * @param sourceConfig source configuration (the fetch size is read from it during the scan)
     * @param connectorConfig connector configuration, passed to the superclass only
     * @param snapshotChangeEventSourceMetrics metrics object, passed to the superclass and kept
     * @param databaseSchema schema used to resolve the split's table
     * @param jdbcConnection connection used for binlog offsets and the scan query
     * @param dispatcher dispatcher that receives the snapshot rows
     * @param topicSelector resolves the topic name used for watermark events
     * @param snapshotReceiver receiver passed along with every dispatched row
     * @param clock time source
     * @param snapshotSplit the split to read
     * @param hooks optional callbacks around the watermark steps
     * @param isBackfillSkipped whether the high watermark should reuse the low watermark
     */
    public MySqlSnapshotSplitReadTask(
            MySqlSourceConfig sourceConfig,
            MySqlConnectorConfig connectorConfig,
            SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics,
            MySqlDatabaseSchema databaseSchema,
            MySqlConnection jdbcConnection,
            EventDispatcherImpl<TableId> dispatcher,
            TopicSelector<TableId> topicSelector,
            EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver,
            Clock clock,
            MySqlSnapshotSplit snapshotSplit,
            SnapshotPhaseHooks hooks,
            boolean isBackfillSkipped) {
        super((CommonConnectorConfig)connectorConfig, snapshotChangeEventSourceMetrics);

        // What to read and how.
        this.snapshotSplit = snapshotSplit;
        this.sourceConfig = sourceConfig;
        this.isBackfillSkipped = isBackfillSkipped;
        this.hooks = hooks;

        // Where the data comes from.
        this.jdbcConnection = jdbcConnection;
        this.databaseSchema = databaseSchema;

        // Where the data goes.
        this.dispatcher = dispatcher;
        this.snapshotReceiver = snapshotReceiver;
        this.topicSelector = topicSelector;

        // Bookkeeping.
        this.clock = clock;
        this.snapshotChangeEventSourceMetrics = snapshotChangeEventSourceMetrics;
    }

    /**
     * Runs the snapshot of this task's split.
     *
     * <p>The snapshotting task is obtained first, then the snapshot context is prepared, and
     * finally {@link #doExecute} is invoked with both.
     *
     * @param context change event source context, forwarded to {@code doExecute}
     * @param partition partition used to obtain the snapshotting task and the snapshot context
     * @param previousOffset offset forwarded to {@code doExecute}
     * @return whatever {@code doExecute} returns
     * @throws InterruptedException if {@code doExecute} was interrupted (a warning is logged first)
     * @throws RuntimeException if preparing the snapshot context fails (the error is logged first)
     * @throws DebeziumException wrapping any other exception raised by {@code doExecute}
     */
    public SnapshotResult<MySqlOffsetContext> execute(ChangeEventSource.ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext previousOffset) throws InterruptedException {
        AbstractSnapshotChangeEventSource.SnapshottingTask task = this.getSnapshottingTask(partition, previousOffset);

        MySqlSnapshotContext preparedContext;
        try {
            preparedContext = this.prepare(partition);
        }
        catch (Exception initFailure) {
            LOG.error("Failed to initialize snapshot context.", initFailure);
            throw new RuntimeException(initFailure);
        }

        try {
            return this.doExecute(context, previousOffset, preparedContext, task);
        }
        catch (InterruptedException interrupted) {
            // Propagate unchanged so the caller can react to the interruption.
            LOG.warn("Snapshot was interrupted before completion");
            throw interrupted;
        }
        catch (Exception failure) {
            throw new DebeziumException(failure);
        }
    }

    /**
     * Reads the split between a low and a high watermark.
     *
     * <p>Sequence, each hook being invoked only when it is non-null:
     * <ol>
     *   <li>pre-low-watermark hook; read the current binlog offset as the low watermark, record
     *       it on {@code context} and dispatch a LOW watermark event; post-low-watermark hook;</li>
     *   <li>scan the split's rows and dispatch them;</li>
     *   <li>pre-high-watermark hook; determine the high watermark (the low watermark again when
     *       backfill is skipped, otherwise the current binlog offset), dispatch a HIGH watermark
     *       event and record it on {@code context}; post-high-watermark hook.</li>
     * </ol>
     *
     * @param context must be a {@code SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl};
     *     it is cast to that type to store the watermarks
     * @param previousOffset offset stored into the snapshot context and used to build the
     *     watermark dispatcher; dereferenced without a null check
     * @param snapshotContext must be a {@code MySqlSnapshotContext}
     * @param snapshottingTask not used by this implementation
     * @return a completed snapshot result carrying the context's offset
     * @throws Exception if any hook, query or dispatch step fails
     */
    protected SnapshotResult<MySqlOffsetContext> doExecute(ChangeEventSource.ChangeEventSourceContext context, MySqlOffsetContext previousOffset, AbstractSnapshotChangeEventSource.SnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        MySqlSnapshotContext mySqlContext = (MySqlSnapshotContext)snapshotContext;
        mySqlContext.offset = previousOffset;
        SignalEventDispatcher watermarkDispatcher = new SignalEventDispatcher(
                previousOffset.getOffset(),
                this.topicSelector.topicNameFor((DataCollectionId)this.snapshotSplit.getTableId()),
                this.dispatcher.getQueue());

        // ---- Step 1: low watermark ----
        if (this.hooks.getPreLowWatermarkAction() != null) {
            this.hooks.getPreLowWatermarkAction().accept((Object)this.jdbcConnection, this.snapshotSplit);
        }
        BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(this.jdbcConnection);
        LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, this.snapshotSplit);
        // Cast only at first use, so a wrong context type fails at the same point as before.
        SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl splitContext =
                (SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl)context;
        splitContext.setLowWatermark(lowWatermark);
        watermarkDispatcher.dispatchWatermarkEvent(this.snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);
        if (this.hooks.getPostLowWatermarkAction() != null) {
            this.hooks.getPostLowWatermarkAction().accept((Object)this.jdbcConnection, this.snapshotSplit);
        }

        // ---- Step 2: data ----
        LOG.info("Snapshot step 2 - Snapshotting data");
        this.createDataEvents(mySqlContext, this.snapshotSplit.getTableId());

        // ---- Step 3: high watermark ----
        if (this.hooks.getPreHighWatermarkAction() != null) {
            this.hooks.getPreHighWatermarkAction().accept((Object)this.jdbcConnection, this.snapshotSplit);
        }
        BinlogOffset highWatermark;
        if (this.isBackfillSkipped) {
            // Skipping backfill: reuse the low watermark instead of re-reading the binlog position.
            highWatermark = lowWatermark;
        } else {
            highWatermark = DebeziumUtils.currentBinlogOffset(this.jdbcConnection);
        }
        LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, this.snapshotSplit);
        watermarkDispatcher.dispatchWatermarkEvent(this.snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);
        splitContext.setHighWatermark(highWatermark);
        if (this.hooks.getPostHighWatermarkAction() != null) {
            this.hooks.getPostHighWatermarkAction().accept((Object)this.jdbcConnection, this.snapshotSplit);
        }

        return SnapshotResult.completed((OffsetContext)mySqlContext.offset);
    }

    /**
     * Returns a freshly created snapshotting task built with the fixed flags
     * {@code (false, true)}; neither argument influences the result.
     *
     * <p>NOTE(review): the two flags presumably mean "snapshot schema" and "snapshot data"
     * respectively — confirm against the {@code SnapshottingTask} constructor.
     *
     * @param partition ignored
     * @param previousOffset ignored
     * @return a new {@code SnapshottingTask(false, true)}
     */
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffsetContext previousOffset) {
        AbstractSnapshotChangeEventSource.SnapshottingTask fixedTask =
                new AbstractSnapshotChangeEventSource.SnapshottingTask(false, true);
        return fixedTask;
    }

    /**
     * Builds the snapshot context for the given partition.
     *
     * @param partition partition the new context is created for
     * @return a new {@code MySqlSnapshotContext} wrapping {@code partition}
     * @throws Exception if the context constructor fails
     */
    protected MySqlSnapshotContext prepare(MySqlPartition partition) throws Exception {
        MySqlSnapshotContext freshContext = new MySqlSnapshotContext(partition);
        return freshContext;
    }

    /**
     * Emits the snapshot rows of the split for the given table and then marks the snapshot
     * receiver as complete.
     *
     * @param snapshotContext context passed on to the per-table scan
     * @param tableId id of the table to look up in the database schema
     * @throws Exception if the scan or the completion of the receiver fails
     */
    private void createDataEvents(MySqlSnapshotContext snapshotContext, TableId tableId) throws Exception {
        LOG.debug("Snapshotting table {}", tableId);
        Table splitTable = this.databaseSchema.tableFor(tableId);
        this.createDataEventsForTable(snapshotContext, this.snapshotReceiver, splitTable);
        this.snapshotReceiver.completeSnapshot();
    }

    /**
     * Scans the rows of this task's split and dispatches each one as a snapshot event.
     *
     * <p>The scan query is built from the split's table id, key type and whether the split has
     * a start and/or end bound. Each result row is copied into an array indexed by column
     * position (minus one) and dispatched. Whenever the progress timer has expired, the
     * running row count is logged and reported to the metrics object, and the timer is reset.
     *
     * @param snapshotContext supplies the partition and offset attached to every event
     * @param snapshotReceiver receiver handed to the dispatcher with each row
     * @param table table whose columns describe how each field is read
     * @throws InterruptedException if dispatching a row is interrupted
     * @throws ConnectException wrapping any {@link SQLException} raised while scanning
     */
    private void createDataEventsForTable(MySqlSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver, Table table) throws InterruptedException {
        long startedAt = this.clock.currentTimeInMillis();
        LOG.info("Exporting data from split '{}' of table {}", this.snapshotSplit.splitId(), table.id());

        // A missing bound means the split is open-ended on that side.
        boolean noLowerBound = this.snapshotSplit.getSplitStart() == null;
        boolean noUpperBound = this.snapshotSplit.getSplitEnd() == null;
        String scanSql = StatementUtils.buildSplitScanQuery(
                this.snapshotSplit.getTableId(),
                this.snapshotSplit.getSplitKeyType(),
                noLowerBound,
                noUpperBound);
        LOG.info("For split '{}' of table {} using select statement: '{}'", this.snapshotSplit.splitId(), table.id(), scanSql);

        try (PreparedStatement scanStatement = StatementUtils.readTableSplitDataStatement(
                     this.jdbcConnection,
                     scanSql,
                     noLowerBound,
                     noUpperBound,
                     this.snapshotSplit.getSplitStart(),
                     this.snapshotSplit.getSplitEnd(),
                     this.snapshotSplit.getSplitKeyType().getFieldCount(),
                     this.sourceConfig.getFetchSize());
             ResultSet resultSet = scanStatement.executeQuery()) {
            ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(resultSet, table);
            Column[] resultColumns = columnArray.getColumns();
            int rowWidth = columnArray.getGreatestColumnPosition();

            long exported = 0L;
            Threads.Timer progressTimer = this.getTableScanLogTimer();
            while (resultSet.next()) {
                exported++;

                Object[] values = new Object[rowWidth];
                for (int idx = 0; idx < resultColumns.length; idx++) {
                    // The column definition is taken from the table by index, while the target
                    // slot comes from the result-set column's declared position.
                    Column tableColumn = (Column)table.columns().get(idx);
                    int slot = resultColumns[idx].position() - 1;
                    values[slot] = this.readField(resultSet, idx + 1, tableColumn, table);
                }

                if (progressTimer.expired()) {
                    long now = this.clock.currentTimeInMillis();
                    LOG.info("Exported {} records for split '{}' after {}", exported, this.snapshotSplit.splitId(), Strings.duration(now - startedAt));
                    this.snapshotChangeEventSourceMetrics.rowsScanned(snapshotContext.partition, table.id(), exported);
                    progressTimer = this.getTableScanLogTimer();
                }

                ChangeRecordEmitter<MySqlPartition> emitter = this.getChangeRecordEmitter(snapshotContext, table.id(), values);
                this.dispatcher.dispatchSnapshotEvent((Partition)((MySqlPartition)snapshotContext.partition), (DataCollectionId)table.id(), emitter, snapshotReceiver);
            }

            long elapsed = this.clock.currentTimeInMillis() - startedAt;
            LOG.info("Finished exporting {} records for split '{}', total duration '{}'", exported, this.snapshotSplit.splitId(), Strings.duration(elapsed));
        }
        catch (SQLException sqlFailure) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", (Throwable)sqlFailure);
        }
    }

    /**
     * Records an event for {@code tableId} at the current clock time on the context's offset,
     * then wraps the row in a snapshot change record emitter.
     *
     * @param snapshotContext supplies the partition and the offset (which is updated)
     * @param tableId table the row belongs to
     * @param row column values of the row
     * @return a new emitter for the row, bound to the context's partition, offset and this
     *     task's clock
     */
    protected ChangeRecordEmitter<MySqlPartition> getChangeRecordEmitter(MySqlSnapshotContext snapshotContext, TableId tableId, Object[] row) {
        MySqlOffsetContext offsetContext = (MySqlOffsetContext)snapshotContext.offset;
        offsetContext.event((DataCollectionId)tableId, this.clock.currentTime());
        return new SnapshotChangeRecordEmitter(snapshotContext.partition, offsetContext, row, this.clock);
    }

    /**
     * Creates a new timer over {@code LOG_INTERVAL}, driven by this task's clock; used to
     * throttle scan-progress logging.
     *
     * @return a freshly started timer
     */
    private Threads.Timer getTableScanLogTimer() {
        Threads.Timer progressTimer = Threads.timer(this.clock, LOG_INTERVAL);
        return progressTimer;
    }

    /**
     * Reads one field of the current result-set row, choosing the read strategy from the
     * column's JDBC type.
     *
     * <ul>
     *   <li>{@code TIME}, {@code DATE} and {@code TIMESTAMP} are delegated to the dedicated
     *       readers of this class;</li>
     *   <li>{@code TINYINT} and {@code SMALLINT} are returned as {@link Integer} (or
     *       {@code null} for SQL NULL);</li>
     *   <li>every other type is returned as provided by {@link ResultSet#getObject(int)}.</li>
     * </ul>
     *
     * @param rs result set positioned on the row to read
     * @param fieldNo 1-based index of the field in {@code rs}
     * @param actualColumn column definition that determines the JDBC type
     * @param actualTable table the column belongs to, forwarded to the date/timestamp readers
     * @return the field value, or {@code null} for SQL NULL
     * @throws SQLException if the driver fails to read the field
     */
    private Object readField(ResultSet rs, int fieldNo, Column actualColumn, Table actualTable) throws SQLException {
        switch (actualColumn.jdbcType()) {
            case Types.TIME:
                return this.readTimeField(rs, fieldNo);
            case Types.DATE:
                return this.readDateField(rs, fieldNo, actualColumn, actualTable);
            case Types.TIMESTAMP:
                return this.readTimestampField(rs, fieldNo, actualColumn, actualTable);
            case Types.TINYINT:
            case Types.SMALLINT:
                // Read via getInt so the value is always an Integer, whatever type the driver's
                // getObject would pick; getObject is only consulted to preserve SQL NULL.
                return rs.getObject(fieldNo) == null ? null : Integer.valueOf(rs.getInt(fieldNo));
            default:
                return rs.getObject(fieldNo);
        }
    }

    /**
     * Reads a TIME field by fetching its raw bytes, decoding them as UTF-8 text and converting
     * that text with {@code MySqlValueConverters.stringToDuration}.
     *
     * <p>NOTE(review): the raw read presumably exists because MySQL TIME values can fall
     * outside what {@code java.sql.Time} can represent — confirm.
     *
     * @param rs result set positioned on the row to read
     * @param fieldNo 1-based index of the field in {@code rs}
     * @return the converted value, or {@code null} for SQL NULL
     * @throws SQLException if the driver fails to read the field
     */
    private Object readTimeField(ResultSet rs, int fieldNo) throws SQLException {
        Blob b = rs.getBlob(fieldNo);
        if (b == null) {
            return null;
        }
        // Blob positions are 1-based. A Charset constant (rather than a charset name) avoids the
        // checked UnsupportedEncodingException, which can never occur for UTF-8.
        String text = new String(b.getBytes(1L, (int)b.length()), StandardCharsets.UTF_8);
        return MySqlValueConverters.stringToDuration(text);
    }

    /**
     * Reads a DATE field by fetching its raw bytes, decoding them as UTF-8 text and converting
     * that text with {@code MySqlValueConverters.stringToLocalDate}.
     *
     * @param rs result set positioned on the row to read
     * @param fieldNo 1-based index of the field in {@code rs}
     * @param column column definition, forwarded to the converter
     * @param table table the column belongs to, forwarded to the converter
     * @return the converted value, or {@code null} for SQL NULL
     * @throws SQLException if the driver fails to read the field
     */
    private Object readDateField(ResultSet rs, int fieldNo, Column column, Table table) throws SQLException {
        Blob b = rs.getBlob(fieldNo);
        if (b == null) {
            return null;
        }
        // Blob positions are 1-based. Decoding with a Charset constant cannot fail, so the former
        // catch block (whose log message wrongly referred to a TIME value) is no longer needed.
        String text = new String(b.getBytes(1L, (int)b.length()), StandardCharsets.UTF_8);
        return MySqlValueConverters.stringToLocalDate(text, column, table);
    }

    /**
     * Reads a TIMESTAMP field.
     *
     * <p>The raw bytes are decoded as UTF-8 text and checked with
     * {@code MySqlValueConverters.containsZeroValuesInDatePart}; when that check is positive
     * the field is reported as {@code null}, otherwise it is read with
     * {@link ResultSet#getTimestamp(int, Calendar)} using a default {@link Calendar}.
     *
     * @param rs result set positioned on the row to read
     * @param fieldNo 1-based index of the field in {@code rs}
     * @param column column definition, forwarded to the zero-value check
     * @param table table the column belongs to, forwarded to the zero-value check
     * @return the timestamp, or {@code null} for SQL NULL or a zero-valued date part
     * @throws SQLException if the driver fails to read the field
     */
    private Object readTimestampField(ResultSet rs, int fieldNo, Column column, Table table) throws SQLException {
        Blob b = rs.getBlob(fieldNo);
        if (b == null) {
            return null;
        }
        // Blob positions are 1-based. Decoding with a Charset constant cannot fail, so the former
        // catch block (whose log message wrongly referred to a TIME value) is no longer needed.
        String text = new String(b.getBytes(1L, (int)b.length()), StandardCharsets.UTF_8);
        if (MySqlValueConverters.containsZeroValuesInDatePart(text, column, table)) {
            return null;
        }
        return rs.getTimestamp(fieldNo, Calendar.getInstance());
    }

    /**
     * Snapshot context bound to a {@link MySqlPartition}; adds no state or behaviour of its own
     * beyond what the relational snapshot context provides.
     */
    private static class MySqlSnapshotContext
    extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> {
        // NOTE(review): presumably the catalog name expected by the parent constructor — confirm.
        private static final String EMPTY_NAME = "";

        /**
         * @param partition partition this context belongs to
         * @throws SQLException if the parent constructor fails
         */
        public MySqlSnapshotContext(MySqlPartition partition) throws SQLException {
            super((Partition)partition, EMPTY_NAME);
        }
    }
}

