/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source.reader;

import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.document.Array;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.RecordsFormatter;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSourceReaderTest
extends MySqlSourceTestBase {
    private final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
    private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");

    @After
    public void clear() {
        this.customerDatabase.dropDatabase();
        this.inventoryDatabase.dropDatabase();
    }

    @Test
    public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Exception {
        List<MySqlSplit> snapshotSplits;
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = this.getConfig(new String[]{"customers"});
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);){
            Map tableSchemas = TableDiscoveryUtils.discoverSchemaForCapturedTables((MySqlPartition)new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName()), (MySqlSourceConfig)sourceConfig, (MySqlConnection)jdbc);
            TableId tableId = new TableId(this.customerDatabase.getDatabaseName(), null, "customers");
            RowType splitType = RowType.of((LogicalType[])new LogicalType[]{DataTypes.INT().getLogicalType()}, (String[])new String[]{"id"});
            snapshotSplits = Arrays.asList(new MySqlSnapshotSplit(tableId, tableId + ":0", splitType, null, (Object[])new Integer[]{200}, null, tableSchemas), new MySqlSnapshotSplit(tableId, tableId + ":1", splitType, (Object[])new Integer[]{200}, (Object[])new Integer[]{1500}, null, tableSchemas), new MySqlSnapshotSplit(tableId, tableId + ":2", splitType, (Object[])new Integer[]{1500}, null, null, tableSchemas));
        }
        MySqlSourceReader<SourceRecord> reader = this.createReader(sourceConfig, -1);
        reader.start();
        reader.addSplits(snapshotSplits);
        String[] expectedRecords = new String[]{"+I[111, user_6, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[101, user_1, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]"};
        Thread.sleep(5000L);
        List<String> actualRecords = this.consumeRecords(reader, dataType);
        MySqlSourceReaderTest.assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
        List splitsState = reader.snapshotState(1L);
        MySqlSourceReader<SourceRecord> restartReader = this.createReader(sourceConfig, -1);
        restartReader.start();
        restartReader.addSplits(splitsState);
        Assert.assertEquals((long)3L, (long)reader.getFinishedUnackedSplits().size());
        MySqlSourceReaderTest.assertMapEquals(restartReader.getFinishedUnackedSplits(), reader.getFinishedUnackedSplits());
        reader.close();
        restartReader.close();
    }

    @Test
    public void testBinlogReadFailoverCrossTransaction() throws Exception {
        MySqlBinlogSplit binlogSplit;
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = this.getConfig(new String[]{"customers"});
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);){
            Map tableSchemas = TableDiscoveryUtils.discoverSchemaForCapturedTables((MySqlPartition)new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName()), (MySqlSourceConfig)sourceConfig, (MySqlConnection)jdbc);
            binlogSplit = MySqlBinlogSplit.fillTableSchemas((MySqlBinlogSplit)this.createBinlogSplit(sourceConfig).asBinlogSplit(), (Map)tableSchemas);
        }
        MySqlSourceReader<SourceRecord> reader = this.createReader(sourceConfig, 1);
        reader.start();
        reader.addSplits(Collections.singletonList(binlogSplit));
        TableId tableId = (TableId)binlogSplit.getTableSchemas().keySet().iterator().next();
        this.makeBinlogEventsInOneTransaction(sourceConfig, tableId.toString());
        String[] expectedRecords = new String[]{"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]"};
        List<String> actualRecords = this.consumeRecords(reader, dataType);
        MySqlSourceReaderTest.assertEqualsInOrder(Arrays.asList(expectedRecords), actualRecords);
        List splitsState = reader.snapshotState(1L);
        Assert.assertEquals((long)1L, (long)splitsState.size());
        reader.close();
        MySqlSourceReader<SourceRecord> restartReader = this.createReader(sourceConfig, 3);
        restartReader.start();
        restartReader.addSplits(splitsState);
        String[] expectedRestRecords = new String[]{"-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]"};
        List<String> restRecords = this.consumeRecords(restartReader, dataType);
        MySqlSourceReaderTest.assertEqualsInOrder(Arrays.asList(expectedRestRecords), restRecords);
        restartReader.close();
    }

    @Test
    public void testRemoveSplitAccordingToNewFilter() throws Exception {
        this.inventoryDatabase.createAndInitialize();
        List<String> tableNames = Arrays.asList(this.inventoryDatabase.getDatabaseName() + ".products", this.inventoryDatabase.getDatabaseName() + ".customers");
        MySqlSourceConfig sourceConfig = new MySqlSourceConfigFactory().startupOptions(StartupOptions.initial()).databaseList(new String[]{this.inventoryDatabase.getDatabaseName()}).tableList(new String[]{this.getTableNameRegex(tableNames.toArray(new String[0]))}).includeSchemaChanges(false).hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).username(this.customerDatabase.getUsername()).password(this.customerDatabase.getPassword()).serverTimeZone(ZoneId.of("UTC").toString()).createConfig(0);
        MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(sourceConfig, 4, tableNames.stream().map(TableId::parse).collect(Collectors.toList()), false);
        assigner.open();
        ArrayList<MySqlSnapshotSplit> splits = new ArrayList<MySqlSnapshotSplit>();
        MySqlSnapshotSplit split = (MySqlSnapshotSplit)assigner.getNext().get();
        splits.add(split);
        split = (MySqlSnapshotSplit)assigner.getNext().get();
        Assert.assertFalse((boolean)assigner.getNext().isPresent());
        splits.add(split);
        MySqlSourceConfig sourceConfig4Reader = new MySqlSourceConfigFactory().startupOptions(StartupOptions.initial()).databaseList(new String[]{this.inventoryDatabase.getDatabaseName()}).tableList(new String[]{this.getTableNameRegex(tableNames.subList(0, 1).toArray(new String[0]))}).includeSchemaChanges(false).hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).username(this.customerDatabase.getUsername()).password(this.customerDatabase.getPassword()).serverTimeZone(ZoneId.of("UTC").toString()).createConfig(0);
        MySqlSourceReader<SourceRecord> reader = this.createReader(sourceConfig4Reader, 1);
        reader.start();
        reader.addSplits(splits);
        List mySqlSplits = reader.snapshotState(1L);
        Assert.assertEquals((long)1L, (long)mySqlSplits.size());
        reader.close();
    }

    @Test
    public void testNoDuplicateRecordsWhenKeepUpdating() throws Exception {
        InputStatus status;
        this.inventoryDatabase.createAndInitialize();
        String tableName = this.inventoryDatabase.getDatabaseName() + ".products";
        MySqlSourceConfig sourceConfig = new MySqlSourceConfigFactory().startupOptions(StartupOptions.initial()).databaseList(new String[]{this.inventoryDatabase.getDatabaseName()}).tableList(new String[]{tableName}).includeSchemaChanges(false).hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).username(this.customerDatabase.getUsername()).password(this.customerDatabase.getPassword()).serverTimeZone(ZoneId.of("UTC").toString()).createConfig(0);
        MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(sourceConfig, 4, Collections.singletonList(TableId.parse((String)tableName)), false);
        assigner.open();
        MySqlSnapshotSplit snapshotSplit = (MySqlSnapshotSplit)assigner.getNext().get();
        Assert.assertFalse((boolean)assigner.getNext().isPresent());
        Assert.assertNull((Object)snapshotSplit.getSplitStart());
        Assert.assertNull((Object)snapshotSplit.getSplitEnd());
        assigner.close();
        AtomicBoolean finishReading = new AtomicBoolean(false);
        CountDownLatch updatingExecuted = new CountDownLatch(1);
        TestingReaderContext testingReaderContext = new TestingReaderContext();
        MySqlSourceReader<SourceRecord> reader = this.createReader(sourceConfig, (SourceReaderContext)testingReaderContext, 0, SnapshotPhaseHooks.empty());
        reader.start();
        Thread updateWorker = new Thread(() -> {
            try (Connection connection = this.inventoryDatabase.getJdbcConnection();
                 Statement statement = connection.createStatement();){
                boolean flagSet = false;
                while (!finishReading.get()) {
                    statement.execute("UPDATE products SET  description='" + UUID.randomUUID().toString() + "' WHERE id=101");
                    if (flagSet) continue;
                    updatingExecuted.countDown();
                    flagSet = true;
                }
            }
            catch (Exception throwables) {
                throwables.printStackTrace();
            }
        });
        updateWorker.start();
        updatingExecuted.await();
        reader.addSplits(Collections.singletonList(snapshotSplit));
        reader.notifyNoMoreSplits();
        TestingReaderOutput output = new TestingReaderOutput();
        while ((status = reader.pollNext((ReaderOutput)output)) != InputStatus.END_OF_INPUT) {
            if (status != InputStatus.NOTHING_AVAILABLE) continue;
            reader.isAvailable().get();
        }
        finishReading.set(true);
        updateWorker.join();
        ArrayList emittedRecords = output.getEmittedRecords();
        HashMap<Object, SourceRecord> recordByKey = new HashMap<Object, SourceRecord>();
        for (SourceRecord record : emittedRecords) {
            SourceRecord existed = (SourceRecord)recordByKey.get(record.key());
            if (existed != null) {
                Assert.fail((String)String.format("The emitted record contains duplicate records on key\n%s\n%s\n", existed, record));
                continue;
            }
            recordByKey.put(record.key(), record);
        }
        reader.close();
    }

    private MySqlSourceReader<SourceRecord> createReader(MySqlSourceConfig configuration, int limit) throws Exception {
        return this.createReader(configuration, (SourceReaderContext)new TestingReaderContext(), limit, SnapshotPhaseHooks.empty());
    }

    private MySqlSourceReader<SourceRecord> createReader(MySqlSourceConfig configuration, SourceReaderContext readerContext, int limit, SnapshotPhaseHooks snapshotHooks) throws Exception {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup", new Class[0]);
        metricGroupMethod.setAccessible(true);
        MetricGroup metricGroup = (MetricGroup)metricGroupMethod.invoke((Object)readerContext, new Object[0]);
        MysqlLimitedRecordEmitter recordEmitter = limit > 0 ? new MysqlLimitedRecordEmitter(new ForwardDeserializeSchema(), new MySqlSourceReaderMetrics(metricGroup), configuration.isIncludeSchemaChanges(), limit) : new MySqlRecordEmitter((DebeziumDeserializationSchema)new ForwardDeserializeSchema(), new MySqlSourceReaderMetrics(metricGroup), configuration.isIncludeSchemaChanges());
        MySqlSourceReaderContext mySqlSourceReaderContext = new MySqlSourceReaderContext(readerContext);
        return new MySqlSourceReader(elementsQueue, () -> this.createSplitReader(configuration, mySqlSourceReaderContext, snapshotHooks), (RecordEmitter)recordEmitter, readerContext.getConfiguration(), mySqlSourceReaderContext, configuration);
    }

    private MySqlSplitReader createSplitReader(MySqlSourceConfig configuration, MySqlSourceReaderContext readerContext, SnapshotPhaseHooks snapshotHooks) {
        return new MySqlSplitReader(configuration, 0, readerContext, snapshotHooks);
    }

    private void makeBinlogEventsInOneTransaction(MySqlSourceConfig sourceConfig, String tableId) throws SQLException {
        JdbcConnection connection = DebeziumUtils.openJdbcConnection((MySqlSourceConfig)sourceConfig);
        connection.setAutoCommit(false);
        connection.execute(new String[]{"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", "DELETE FROM " + tableId + " where id = 102", "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')", "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"});
        connection.commit();
        connection.close();
    }

    private MySqlSplit createBinlogSplit(MySqlSourceConfig sourceConfig) {
        MySqlBinlogSplitAssigner binlogSplitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
        binlogSplitAssigner.open();
        return (MySqlSplit)binlogSplitAssigner.getNext().get();
    }

    private MySqlSourceConfig getConfig(String[] captureTables) {
        String[] captureTableIds = (String[])Arrays.stream(captureTables).map(tableName -> this.customerDatabase.getDatabaseName() + "." + tableName).toArray(String[]::new);
        return new MySqlSourceConfigFactory().startupOptions(StartupOptions.latest()).databaseList(new String[]{this.customerDatabase.getDatabaseName()}).tableList(captureTableIds).includeSchemaChanges(false).hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).splitSize(10).fetchSize(2).username(this.customerDatabase.getUsername()).password(this.customerDatabase.getPassword()).serverTimeZone(ZoneId.of("UTC").toString()).createConfig(0);
    }

    private List<String> consumeRecords(MySqlSourceReader<SourceRecord> sourceReader, DataType recordType) throws Exception {
        SimpleReaderOutput output = new SimpleReaderOutput();
        InputStatus status = InputStatus.MORE_AVAILABLE;
        while (InputStatus.MORE_AVAILABLE == status || output.getResults().size() == 0) {
            status = sourceReader.pollNext((ReaderOutput)output);
        }
        RecordsFormatter formatter = new RecordsFormatter(recordType);
        return formatter.format(output.getResults());
    }

    private String getTableNameRegex(String[] captureCustomerTables) {
        Preconditions.checkState((captureCustomerTables.length > 0 ? 1 : 0) != 0);
        if (captureCustomerTables.length == 1) {
            return captureCustomerTables[0];
        }
        return String.format("(%s)", StringUtils.join((Object[])captureCustomerTables, (String)"|"));
    }

    private static class MysqlLimitedRecordEmitter
    implements RecordEmitter<SourceRecords, SourceRecord, MySqlSplitState> {
        private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordEmitter.class);
        private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = new FlinkJsonTableChangeSerializer();
        private final DebeziumDeserializationSchema<SourceRecord> debeziumDeserializationSchema;
        private final MySqlSourceReaderMetrics sourceReaderMetrics;
        private final boolean includeSchemaChanges;
        private final OutputCollector<SourceRecord> outputCollector;
        private final int limit;

        public MysqlLimitedRecordEmitter(DebeziumDeserializationSchema<SourceRecord> debeziumDeserializationSchema, MySqlSourceReaderMetrics sourceReaderMetrics, boolean includeSchemaChanges, int limit) {
            this.debeziumDeserializationSchema = debeziumDeserializationSchema;
            this.sourceReaderMetrics = sourceReaderMetrics;
            this.includeSchemaChanges = includeSchemaChanges;
            this.outputCollector = new OutputCollector();
            Preconditions.checkState((limit > 0 ? 1 : 0) != 0);
            this.limit = limit;
        }

        public void emitRecord(SourceRecords sourceRecords, SourceOutput<SourceRecord> output, MySqlSplitState splitState) throws Exception {
            Iterator elementIterator = sourceRecords.iterator();
            int sendCnt = 0;
            while (elementIterator.hasNext()) {
                if (sendCnt >= this.limit) {
                    return;
                }
                this.processElement((SourceRecord)elementIterator.next(), output, splitState);
                ++sendCnt;
            }
        }

        private void processElement(SourceRecord element, SourceOutput<SourceRecord> output, MySqlSplitState splitState) throws Exception {
            if (RecordUtils.isWatermarkEvent((SourceRecord)element)) {
                BinlogOffset watermark = RecordUtils.getWatermark((SourceRecord)element);
                if (RecordUtils.isHighWatermarkEvent((SourceRecord)element) && splitState.isSnapshotSplitState()) {
                    splitState.asSnapshotSplitState().setHighWatermark(watermark);
                }
            } else if (RecordUtils.isSchemaChangeEvent((SourceRecord)element) && splitState.isBinlogSplitState()) {
                HistoryRecord historyRecord = RecordUtils.getHistoryRecord((SourceRecord)element);
                Array tableChanges = historyRecord.document().getArray((CharSequence)"tableChanges");
                TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
                for (TableChanges.TableChange tableChange : changes) {
                    splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
                }
                if (this.includeSchemaChanges) {
                    BinlogOffset position = RecordUtils.getBinlogPosition((SourceRecord)element);
                    splitState.asBinlogSplitState().setStartingOffset(position);
                    this.emitElement(element, output);
                }
            } else if (RecordUtils.isDataChangeRecord((SourceRecord)element)) {
                this.updateStartingOffsetForSplit(splitState, element);
                this.reportMetrics(element);
                this.emitElement(element, output);
            } else if (RecordUtils.isHeartbeatEvent((SourceRecord)element)) {
                this.updateStartingOffsetForSplit(splitState, element);
            } else {
                LOG.info("Meet unknown element {}, just skip.", (Object)element);
            }
        }

        private void updateStartingOffsetForSplit(MySqlSplitState splitState, SourceRecord element) {
            if (splitState.isBinlogSplitState()) {
                BinlogOffset position = RecordUtils.getBinlogPosition((SourceRecord)element);
                splitState.asBinlogSplitState().setStartingOffset(position);
            }
        }

        private void emitElement(SourceRecord element, SourceOutput<SourceRecord> output) throws Exception {
            ((OutputCollector)this.outputCollector).output = output;
            this.debeziumDeserializationSchema.deserialize(element, this.outputCollector);
        }

        private void reportMetrics(SourceRecord element) {
            Long fetchTimestamp;
            Long messageTimestamp = RecordUtils.getMessageTimestamp((SourceRecord)element);
            if (messageTimestamp != null && messageTimestamp > 0L && (fetchTimestamp = RecordUtils.getFetchTimestamp((SourceRecord)element)) != null && fetchTimestamp >= messageTimestamp) {
                this.sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);
            }
        }

        private static class OutputCollector<T>
        implements Collector<T> {
            private SourceOutput<T> output;

            private OutputCollector() {
            }

            public void collect(T record) {
                this.output.collect(record);
            }

            public void close() {
            }
        }
    }

    private static class ForwardDeserializeSchema
    implements DebeziumDeserializationSchema<SourceRecord> {
        private static final long serialVersionUID = 1L;

        private ForwardDeserializeSchema() {
        }

        public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
            out.collect((Object)record);
        }

        public TypeInformation<SourceRecord> getProducedType() {
            return TypeInformation.of(SourceRecord.class);
        }
    }

    private static class SimpleReaderOutput
    implements ReaderOutput<SourceRecord> {
        private final List<SourceRecord> results = new ArrayList<SourceRecord>();

        private SimpleReaderOutput() {
        }

        public void collect(SourceRecord record) {
            this.results.add(record);
        }

        public List<SourceRecord> getResults() {
            return this.results;
        }

        public void collect(SourceRecord record, long timestamp) {
            this.collect(record);
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markIdle() {
        }

        public void markActive() {
            throw new UnsupportedOperationException();
        }

        public SourceOutput<SourceRecord> createOutputForSplit(String splitId) {
            return this;
        }

        public void releaseOutputForSplit(String splitId) {
        }
    }
}

