/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.debezium.reader;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.RecordsFormatter;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.ExceptionUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.lifecycle.Startables;

public class BinlogSplitReaderTest
extends MySqlSourceTestBase {
    private static final String TEST_USER = "mysqluser";
    private static final String TEST_PASSWORD = "mysqlpw";
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30L);
    private final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
    private static final MySqlContainer MYSQL8_CONTAINER = BinlogSplitReaderTest.createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
    private final UniqueDatabase inventoryDatabase8 = new UniqueDatabase(MYSQL8_CONTAINER, "inventory", "mysqluser", "mysqlpw");
    private BinaryLogClient binaryLogClient;
    private MySqlConnection mySqlConnection;

    @BeforeClass
    public static void beforeClass() {
        LOG.info("Starting MySql8 containers...");
        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
        LOG.info("Container MySql8 is started.");
    }

    @AfterClass
    public static void afterClass() {
        LOG.info("Stopping MySql8 containers...");
        MYSQL8_CONTAINER.stop();
        LOG.info("Container MySql8 is stopped.");
    }

    @After
    public void after() throws Exception {
        if (this.mySqlConnection != null) {
            this.mySqlConnection.close();
        }
        if (this.binaryLogClient != null) {
            this.binaryLogClient.disconnect();
        }
        this.customerDatabase.dropDatabase();
    }

    @Test
    public void testReadSingleBinlogSplit() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = this.getConfig(new String[]{"customers_even_dist"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)sourceConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        List<MySqlSnapshotSplit> splits = this.getMySqlSplits(new String[]{"customers_even_dist"}, sourceConfig);
        String[] expected = new String[]{"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "+I[104, user_4, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]"};
        List<String> actual = this.readBinlogSplitsFromSnapshotSplits(splits, dataType, sourceConfig, 1, expected.length, splits.get(splits.size() - 1).getTableId());
        BinlogSplitReaderTest.assertEqualsInAnyOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testReadAllBinlogSplitsForOneTable() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = this.getConfig(new String[]{"customers_even_dist"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)sourceConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        List<MySqlSnapshotSplit> splits = this.getMySqlSplits(new String[]{"customers_even_dist"}, sourceConfig);
        String[] expected = new String[]{"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[104, user_4, Shanghai, 123567891234]", "+I[105, user_5, Shanghai, 123567891234]", "+I[106, user_6, Shanghai, 123567891234]", "+I[107, user_7, Shanghai, 123567891234]", "+I[108, user_8, Shanghai, 123567891234]", "+I[109, user_9, Shanghai, 123567891234]", "+I[110, user_10, Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> actual = this.readBinlogSplitsFromSnapshotSplits(splits, dataType, sourceConfig, splits.size(), expected.length, splits.get(splits.size() - 1).getTableId());
        BinlogSplitReaderTest.assertEqualsInAnyOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testReadAllBinlogForTableWithSingleLine() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = this.getConfig(new String[]{"customer_card_single_line"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)sourceConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"card_no", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"level", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"note", (DataType)DataTypes.STRING())});
        List<MySqlSnapshotSplit> splits = this.getMySqlSplits(new String[]{"customer_card_single_line"}, sourceConfig);
        String[] expected = new String[]{"+I[20000, LEVEL_1, user_1, user with level 1]", "+I[20001, LEVEL_1, user_1, user with level 1]", "+I[20001, LEVEL_2, user_2, user with level 2]", "+I[20002, LEVEL_3, user_3, user with level 3]"};
        List<String> actual = this.readBinlogSplitsFromSnapshotSplits(splits, dataType, sourceConfig, splits.size(), expected.length, splits.get(splits.size() - 1).getTableId());
        BinlogSplitReaderTest.assertEqualsInAnyOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testReadAllBinlogSplitsForTables() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = this.getConfig(new String[]{"customer_card", "customer_card_single_line"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)sourceConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"card_no", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"level", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"note", (DataType)DataTypes.STRING())});
        List<MySqlSnapshotSplit> splits = this.getMySqlSplits(new String[]{"customer_card", "customer_card_single_line"}, sourceConfig);
        String[] expected = new String[]{"+I[20000, LEVEL_1, user_1, user with level 1]", "+I[20001, LEVEL_1, user_1, user with level 1]", "+I[20001, LEVEL_2, user_2, user with level 2]", "+I[20002, LEVEL_3, user_3, user with level 3]", "+I[20001, LEVEL_4, user_1, user with level 4]", "+I[20002, LEVEL_4, user_2, user with level 4]", "+I[20003, LEVEL_4, user_3, user with level 4]", "+I[20004, LEVEL_1, user_4, user with level 4]", "+I[20004, LEVEL_2, user_4, user with level 4]", "+I[20004, LEVEL_3, user_4, user with level 4]", "+I[20004, LEVEL_4, user_4, user with level 4]", "+I[30006, LEVEL_3, user_5, user with level 3]", "+I[30007, LEVEL_3, user_6, user with level 3]", "+I[30008, LEVEL_3, user_7, user with level 3]", "+I[30009, LEVEL_1, user_8, user with level 3]", "+I[30009, LEVEL_2, user_8, user with level 3]", "+I[30009, LEVEL_3, user_8, user with level 3]", "+I[40001, LEVEL_2, user_9, user with level 2]", "+I[40002, LEVEL_2, user_10, user with level 2]", "+I[40003, LEVEL_2, user_11, user with level 2]", "+I[50001, LEVEL_1, user_12, user with level 1]", "+I[50002, LEVEL_1, user_13, user with level 1]", "+I[50003, LEVEL_1, user_14, user with level 1]"};
        List<String> actual = this.readBinlogSplitsFromSnapshotSplits(splits, dataType, sourceConfig, splits.size(), expected.length, TableId.parse((String)(this.customerDatabase.getDatabaseName() + ".customer_card_single_line")));
        BinlogSplitReaderTest.assertEqualsInAnyOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testReadBinlogFromLatestOffset() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = this.getConfig(StartupOptions.latest(), new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)sourceConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);
        MySqlBinlogSplit split = this.createBinlogSplit(sourceConfig);
        BinlogSplitReader reader = this.createBinlogReader(sourceConfig);
        reader.submitSplit((MySqlSplit)split);
        this.makeCustomersBinlogEvents((JdbcConnection)this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        String[] expected = new String[]{"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> actual = this.readBinlogSplits(dataType, reader, expected.length);
        BinlogSplitReaderTest.assertEqualsInOrder(Arrays.asList(expected), actual);
        reader.close();
    }

    @Test
    public void testReadBinlogFromEarliestOffset() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = this.getConfig(StartupOptions.earliest(), new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)sourceConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);
        MySqlBinlogSplit split = this.createBinlogSplit(sourceConfig);
        BinlogSplitReader reader = this.createBinlogReader(sourceConfig);
        reader.submitSplit((MySqlSplit)split);
        this.makeCustomersBinlogEvents((JdbcConnection)this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        String[] expected = new String[]{"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> actual = this.readBinlogSplits(dataType, reader, expected.length);
        reader.close();
        BinlogSplitReaderTest.assertEqualsInOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testReadBinlogFromEarliestOffsetAfterSchemaChange() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig sourceConfig = this.getConfig(StartupOptions.earliest(), new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)sourceConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);
        String tableId = this.customerDatabase.qualifiedTableName("customers");
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        this.addColumnToTable((JdbcConnection)this.mySqlConnection, tableId);
        MySqlBinlogSplit split = this.createBinlogSplit(sourceConfig);
        BinlogSplitReader reader = this.createBinlogReader(sourceConfig);
        reader.submitSplit((MySqlSplit)split);
        Throwable throwable = Assert.assertThrows(Throwable.class, () -> this.readBinlogSplits(dataType, reader, 1));
        Optional schemaOutOfSyncException = ExceptionUtils.findThrowable((Throwable)throwable, SchemaOutOfSyncException.class);
        reader.close();
        Assert.assertTrue((boolean)schemaOutOfSyncException.isPresent());
        Assert.assertEquals((Object)"Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET", (Object)((SchemaOutOfSyncException)schemaOutOfSyncException.get()).getMessage());
    }

    @Test
    public void testReadBinlogFromBinlogFilePosition() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig connectionConfig = this.getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)connectionConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)connectionConfig);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset((JdbcConnection)this.mySqlConnection);
        MySqlSourceConfig sourceConfig = this.getConfig(StartupOptions.specificOffset((String)startingOffset.getFilename(), (long)startingOffset.getPosition()), new String[]{"customers"});
        MySqlBinlogSplit split = this.createBinlogSplit(sourceConfig);
        BinlogSplitReader reader = this.createBinlogReader(sourceConfig);
        reader.submitSplit((MySqlSplit)split);
        this.makeCustomersBinlogEvents((JdbcConnection)this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        String[] expected = new String[]{"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> actual = this.readBinlogSplits(dataType, reader, expected.length);
        reader.close();
        BinlogSplitReaderTest.assertEqualsInOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testSkippingEvents() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig connectionConfig = this.getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)connectionConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)connectionConfig);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset((JdbcConnection)this.mySqlConnection);
        BinlogOffset offset = BinlogOffset.builder().setBinlogFilePosition(startingOffset.getFilename(), startingOffset.getPosition()).setSkipEvents(3L).setSkipRows(1L).build();
        MySqlSourceConfig sourceConfig = this.getConfig(StartupOptions.specificOffset((BinlogOffset)offset), new String[]{"customers"});
        MySqlBinlogSplit split = this.createBinlogSplit(sourceConfig);
        BinlogSplitReader reader = this.createBinlogReader(sourceConfig);
        reader.submitSplit((MySqlSplit)split);
        this.updateCustomersTableInBulk((JdbcConnection)this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"));
        String[] expected = new String[]{"-U[109, user_4, Shanghai, 123567891234]", "+U[109, user_4, Pittsburgh, 123567891234]"};
        List<String> actual = this.readBinlogSplits(dataType, reader, expected.length);
        reader.close();
        BinlogSplitReaderTest.assertEqualsInOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testReadBinlogFromGtidSet() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig connectionConfig = this.getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)connectionConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)connectionConfig);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset((JdbcConnection)this.mySqlConnection);
        MySqlSourceConfig sourceConfig = this.getConfig(StartupOptions.specificOffset((String)startingOffset.getGtidSet()), new String[]{"customers"});
        MySqlBinlogSplit split = this.createBinlogSplit(sourceConfig);
        BinlogSplitReader reader = this.createBinlogReader(sourceConfig);
        reader.submitSplit((MySqlSplit)split);
        this.makeCustomersBinlogEvents((JdbcConnection)this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        String[] expected = new String[]{"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> actual = this.readBinlogSplits(dataType, reader, expected.length);
        reader.close();
        BinlogSplitReaderTest.assertEqualsInOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testReadBinlogFromTimestamp() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig connectionConfig = this.getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)connectionConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)connectionConfig);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        Thread.sleep(2000L);
        long startTimestamp = System.currentTimeMillis();
        MySqlSourceConfig sourceConfig = this.getConfig(StartupOptions.timestamp((long)startTimestamp), new String[]{"customers"});
        MySqlBinlogSplit split = this.createBinlogSplit(sourceConfig);
        BinlogSplitReader reader = this.createBinlogReader(sourceConfig);
        reader.submitSplit((MySqlSplit)split);
        this.makeCustomersBinlogEvents((JdbcConnection)this.mySqlConnection, this.customerDatabase.qualifiedTableName("customers"), false);
        String[] expected = new String[]{"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        List<String> actual = this.readBinlogSplits(dataType, reader, expected.length);
        reader.close();
        BinlogSplitReaderTest.assertEqualsInOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testReadBinlogFromTimestampAfterSchemaChange() throws Exception {
        this.customerDatabase.createAndInitialize();
        MySqlSourceConfig connectionConfig = this.getConfig(new String[]{"customers"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)connectionConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)connectionConfig);
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"new_int_column", (DataType)DataTypes.INT())});
        String tableId = this.customerDatabase.qualifiedTableName("customers");
        this.addColumnToTable((JdbcConnection)this.mySqlConnection, tableId);
        Thread.sleep(2000L);
        long startTimestamp = System.currentTimeMillis();
        MySqlSourceConfig sourceConfig = this.getConfig(StartupOptions.timestamp((long)startTimestamp), new String[]{"customers"});
        MySqlBinlogSplit split = this.createBinlogSplit(sourceConfig);
        BinlogSplitReader reader = this.createBinlogReader(sourceConfig);
        reader.submitSplit((MySqlSplit)split);
        this.mySqlConnection.execute(new String[]{"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", "DELETE FROM " + tableId + " where id = 102", "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234', 15213)", "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"});
        String[] expected = new String[]{"-U[103, user_3, Shanghai, 123567891234, 15213]", "+U[103, user_3, Hangzhou, 123567891234, 15213]", "-D[102, user_2, Shanghai, 123567891234, 15213]", "+I[102, user_2, Shanghai, 123567891234, 15213]", "-U[103, user_3, Hangzhou, 123567891234, 15213]", "+U[103, user_3, Shanghai, 123567891234, 15213]"};
        List<String> actual = this.readBinlogSplits(dataType, reader, expected.length);
        reader.close();
        BinlogSplitReaderTest.assertEqualsInOrder(Arrays.asList(expected), actual);
    }

    @Test
    public void testHeartbeatEvent() throws Exception {
        this.customerDatabase.createAndInitialize();
        Duration heartbeatInterval = Duration.ofMillis(500L);
        Properties dbzProps = new Properties();
        dbzProps.setProperty(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS.name(), String.valueOf(heartbeatInterval.toMillis()));
        MySqlSourceConfig sourceConfig = this.getConfigFactory(MYSQL_CONTAINER, this.customerDatabase, new String[]{"customers"}).startupOptions(StartupOptions.latest()).heartbeatInterval(heartbeatInterval).debeziumProperties(dbzProps).createConfig(0);
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)sourceConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);
        BinlogSplitReader binlogReader = this.createBinlogReader(sourceConfig);
        MySqlBinlogSplit binlogSplit = this.createBinlogSplit(sourceConfig);
        binlogReader.submitSplit((MySqlSplit)binlogSplit);
        this.makeCustomersBinlogEvents((JdbcConnection)this.mySqlConnection, ((TableId)binlogSplit.getTableSchemas().keySet().iterator().next()).toString(), false);
        ArrayList heartbeats = new ArrayList();
        CommonTestUtils.waitUtil(() -> {
            heartbeats.addAll(this.pollRecordsFromReader(binlogReader, RecordUtils::isHeartbeatEvent));
            return !heartbeats.isEmpty();
        }, (Duration)DEFAULT_TIMEOUT, (String)"Timeout waiting for heartbeat event");
        binlogReader.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReadBinlogFromUnavailableBinlog() throws Exception {
        this.inventoryDatabase8.createAndInitialize();
        MySqlSourceConfig connectionConfig = this.getConfig(MYSQL8_CONTAINER, this.inventoryDatabase8, new String[]{"products"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)connectionConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)connectionConfig);
        BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset((JdbcConnection)this.mySqlConnection);
        MySqlSourceConfig sourceConfig = this.getConfig(MYSQL8_CONTAINER, this.inventoryDatabase8, StartupOptions.specificOffset((String)startingOffset.getGtidSet()), new String[]{"products"});
        try (Connection connection = this.inventoryDatabase8.getJdbcConnection();
             Statement statement = connection.createStatement();){
            statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
            statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
            statement.execute("FLUSH LOGS;");
            Thread.sleep(3000L);
            statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
            statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("FLUSH LOGS;");
            Thread.sleep(3000L);
        }
        MySqlBinlogSplit split = this.createBinlogSplit(sourceConfig);
        try (BinlogSplitReader reader = this.createBinlogReader(sourceConfig, true);){
            reader.submitSplit((MySqlSplit)split);
            reader.pollSplitRecords();
        }
    }

    @Test
    public void testRestoreFromCheckpointWithTimestampStartingOffset() throws Exception {
        this.inventoryDatabase8.createAndInitialize();
        MySqlSourceConfig connectionConfig = this.getConfig(MYSQL8_CONTAINER, this.inventoryDatabase8, new String[]{"products"});
        this.binaryLogClient = DebeziumUtils.createBinaryClient((Configuration)connectionConfig.getDbzConfiguration());
        this.mySqlConnection = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)connectionConfig);
        BinlogOffset checkpointOffset = DebeziumUtils.currentBinlogOffset((JdbcConnection)this.mySqlConnection);
        long startTimestampMs = 15213L;
        MySqlSourceConfig sourceConfig = this.getConfig(MYSQL8_CONTAINER, this.inventoryDatabase8, StartupOptions.timestamp((long)startTimestampMs), new String[]{"products"});
        BinlogSplitReader binlogReader = this.createBinlogReader(sourceConfig);
        MySqlBinlogSplit checkpointSplit = this.createBinlogSplit(this.getConfig(MYSQL8_CONTAINER, this.inventoryDatabase8, StartupOptions.specificOffset((BinlogOffset)checkpointOffset), new String[]{"products"}));
        binlogReader.submitSplit((MySqlSplit)checkpointSplit);
        EventHeaderV4 header = new EventHeaderV4();
        header.setEventType(EventType.WRITE_ROWS);
        header.setTimestamp(1L);
        Event event = new Event((EventHeader)header, (EventData)new WriteRowsEventData());
        Predicate eventFilter = binlogReader.getBinlogSplitReadTask().getEventFilter();
        Assertions.assertThat((boolean)eventFilter.test(event)).isFalse();
    }

    private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) {
        return this.createBinlogReader(sourceConfig, false);
    }

    private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig, boolean skipValidStartingOffset) {
        return new BinlogSplitReader((StatefulTaskContext)(skipValidStartingOffset ? new TestStatefulTaskContext(sourceConfig, this.binaryLogClient, this.mySqlConnection) : new StatefulTaskContext(sourceConfig, this.binaryLogClient, this.mySqlConnection)), 0);
    }

    private MySqlBinlogSplit createBinlogSplit(MySqlSourceConfig sourceConfig) throws Exception {
        MySqlBinlogSplitAssigner binlogSplitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
        binlogSplitAssigner.open();
        try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection((MySqlSourceConfig)sourceConfig);){
            Map tableSchemas = TableDiscoveryUtils.discoverSchemaForCapturedTables((MySqlPartition)new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName()), (MySqlSourceConfig)sourceConfig, (MySqlConnection)jdbc);
            MySqlBinlogSplit mySqlBinlogSplit = MySqlBinlogSplit.fillTableSchemas((MySqlBinlogSplit)((MySqlSplit)binlogSplitAssigner.getNext().get()).asBinlogSplit(), (Map)tableSchemas);
            return mySqlBinlogSplit;
        }
    }

    private List<SourceRecord> pollRecordsFromReader(BinlogSplitReader reader, Predicate<SourceRecord> filter) {
        Iterator recordIterator;
        ArrayList<SourceRecord> records = new ArrayList<SourceRecord>();
        try {
            recordIterator = reader.pollSplitRecords();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Polling action was interrupted", e);
        }
        if (recordIterator == null) {
            return records;
        }
        while (recordIterator.hasNext()) {
            for (SourceRecord record : (SourceRecords)recordIterator.next()) {
                if (!filter.test(record)) continue;
                records.add(record);
            }
        }
        LOG.debug("Records polled: {}", records);
        return records;
    }

    private List<String> readBinlogSplits(DataType dataType, BinlogSplitReader reader, int expectedSize) {
        ArrayList<String> actual = new ArrayList<String>();
        while (actual.size() < expectedSize) {
            List<String> results = this.formatResult(this.pollRecordsFromReader(reader, RecordUtils::isDataChangeRecord), dataType);
            actual.addAll(results);
        }
        return actual;
    }

    private List<String> readBinlogSplitsFromSnapshotSplits(List<MySqlSnapshotSplit> sqlSplits, DataType dataType, MySqlSourceConfig sourceConfig, int scanSplitsNum, int expectedSize, TableId binlogChangeTableId) throws Exception {
        StatefulTaskContext statefulTaskContext = new StatefulTaskContext(sourceConfig, this.binaryLogClient, this.mySqlConnection);
        SnapshotSplitReader snapshotSplitReader = new SnapshotSplitReader(statefulTaskContext, 0);
        ArrayList<SourceRecord> snapshotRecords = new ArrayList<SourceRecord>();
        for (int i = 0; i < scanSplitsNum; ++i) {
            Iterator res;
            MySqlSplit sqlSplit = (MySqlSplit)sqlSplits.get(i);
            if (snapshotSplitReader.isFinished()) {
                snapshotSplitReader.submitSplit(sqlSplit);
            }
            while ((res = snapshotSplitReader.pollSplitRecords()) != null) {
                while (res.hasNext()) {
                    for (SourceRecord sourceRecord : (SourceRecords)res.next()) {
                        snapshotRecords.add(sourceRecord);
                    }
                }
            }
        }
        snapshotSplitReader.close();
        Assert.assertNotNull((Object)snapshotSplitReader.getExecutorService());
        Assert.assertTrue((boolean)snapshotSplitReader.getExecutorService().isTerminated());
        List<FinishedSnapshotSplitInfo> finishedSplitsInfo = this.getFinishedSplitsInfo(sqlSplits, snapshotRecords);
        BinlogOffset startingOffset = RecordUtils.getStartingOffsetOfBinlogSplit(finishedSplitsInfo);
        HashMap tableSchemas = new HashMap();
        for (MySqlSplit mySqlSplit : sqlSplits) {
            tableSchemas.putAll(mySqlSplit.getTableSchemas());
        }
        MySqlBinlogSplit binlogSplit = new MySqlBinlogSplit("binlog-split", startingOffset, BinlogOffset.ofNonStopping(), finishedSplitsInfo, tableSchemas, finishedSplitsInfo.size());
        BinlogSplitReader binlogSplitReader = new BinlogSplitReader(statefulTaskContext, 0);
        binlogSplitReader.submitSplit((MySqlSplit)binlogSplit);
        if (binlogChangeTableId.table().contains("customers")) {
            this.makeCustomersBinlogEvents((JdbcConnection)statefulTaskContext.getConnection(), binlogChangeTableId.toString(), scanSplitsNum == 1);
        } else {
            this.makeCustomerCardsBinlogEvents((JdbcConnection)statefulTaskContext.getConnection(), binlogChangeTableId.toString());
        }
        ArrayList<String> actual = new ArrayList<String>(this.formatResult(snapshotRecords, dataType));
        while (actual.size() < expectedSize) {
            actual.addAll(this.formatResult(this.pollRecordsFromReader(binlogSplitReader, RecordUtils::isDataChangeRecord), dataType));
        }
        binlogSplitReader.close();
        Assert.assertNotNull((Object)binlogSplitReader.getExecutorService());
        Assert.assertTrue((boolean)binlogSplitReader.getExecutorService().isTerminated());
        return actual;
    }

    private void updateCustomersTableInBulk(JdbcConnection connection, String tableId) throws Exception {
        connection.setAutoCommit(false);
        connection.execute(new String[]{"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 101 OR id = 102", "UPDATE " + tableId + " SET address = 'Pittsburgh' where id = 103 OR id = 109"});
        connection.commit();
    }

    private void makeCustomersBinlogEvents(JdbcConnection connection, String tableId, boolean firstSplitOnly) throws SQLException {
        connection.setAutoCommit(false);
        connection.execute(new String[]{"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", "DELETE FROM " + tableId + " where id = 102", "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')", "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"});
        connection.commit();
        if (!firstSplitOnly) {
            connection.execute(new String[]{"UPDATE " + tableId + " SET name = 'Hangzhou' where id = 1010"});
            connection.commit();
            connection.execute(new String[]{"INSERT INTO " + tableId + " VALUES(2001, 'user_22','Shanghai','123567891234')"});
            connection.commit();
            connection.execute(new String[]{"ALTER TABLE " + tableId + " ADD COLUMN email VARCHAR(128) DEFAULT 'user@flink.apache.org'"});
            connection.commit();
            connection.execute(new String[]{"INSERT INTO " + tableId + " VALUES(2002, 'user_23','Shanghai','123567891234', 'test1@gmail.com')"});
            connection.commit();
            connection.execute(new String[]{"INSERT INTO " + tableId + " VALUES(2003, 'user_24','Shanghai','123567891234', 'test2@gmail.com')"});
            connection.commit();
        }
    }

    private void makeCustomerCardsBinlogEvents(JdbcConnection connection, String tableId) throws SQLException {
        if (tableId.endsWith("customer_card_single_line")) {
            connection.execute(new String[]{"INSERT INTO " + tableId + " VALUES(20000, 'LEVEL_1', 'user_1', 'user with level 1')"});
            connection.commit();
            connection.execute(new String[]{"INSERT INTO " + tableId + " VALUES(20001, 'LEVEL_2', 'user_2', 'user with level 2')", "INSERT INTO " + tableId + " VALUES(20002, 'LEVEL_3', 'user_3', 'user with level 3')"});
            connection.commit();
        } else {
            connection.execute(new String[]{"UPDATE " + tableId + " SET level = 'LEVEL_3' where user_id = 'user_1'", "INSERT INTO " + tableId + " VALUES(20002, 'LEVEL_5', 'user_15', 'user with level 15'"});
            connection.commit();
            connection.execute(new String[]{"INSERT INTO " + tableId + " VALUES(40000, 'LEVEL_1', 'user_16', 'user with level 1')", "INSERT INTO " + tableId + " VALUES(40004, 'LEVEL_2', 'user_17', 'user with level 2')"});
            connection.commit();
            connection.execute(new String[]{"INSERT INTO " + tableId + " VALUES(50004, 'LEVEL_1', 'user_18', 'user with level 1')", "INSERT INTO " + tableId + " VALUES(50005, 'LEVEL_2', 'user_19', 'user with level 2')"});
            connection.commit();
        }
    }

    private List<FinishedSnapshotSplitInfo> getFinishedSplitsInfo(List<MySqlSnapshotSplit> mySqlSplits, List<SourceRecord> records) {
        HashMap splitMap = new HashMap();
        mySqlSplits.forEach(r -> splitMap.put(r.splitId(), r));
        ArrayList<FinishedSnapshotSplitInfo> finishedSplitsInfo = new ArrayList<FinishedSnapshotSplitInfo>();
        records.stream().filter(RecordUtils::isHighWatermarkEvent).forEach(event -> {
            Struct value = (Struct)event.value();
            String splitId = value.getString("split_id");
            MySqlSnapshotSplit mySqlSplit = (MySqlSnapshotSplit)splitMap.get(splitId);
            finishedSplitsInfo.add(RecordUtils.getSnapshotSplitInfo((MySqlSnapshotSplit)mySqlSplit, (SourceRecord)event));
        });
        return finishedSplitsInfo;
    }

    private List<String> formatResult(List<SourceRecord> records, DataType dataType) {
        RecordsFormatter formatter = new RecordsFormatter(dataType);
        return formatter.format(records);
    }

    private List<MySqlSnapshotSplit> getMySqlSplits(String[] captureTables, MySqlSourceConfig sourceConfig) {
        Optional mySqlSplit;
        List captureTableIds = Arrays.stream(captureTables).map(tableName -> this.customerDatabase.getDatabaseName() + "." + tableName).collect(Collectors.toList());
        List remainingTables = captureTableIds.stream().map(TableId::parse).collect(Collectors.toList());
        MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(sourceConfig, 4, remainingTables, false);
        assigner.open();
        ArrayList<MySqlSnapshotSplit> mySqlSplits = new ArrayList<MySqlSnapshotSplit>();
        while ((mySqlSplit = assigner.getNext()).isPresent()) {
            mySqlSplits.add(((MySqlSplit)mySqlSplit.get()).asSnapshotSplit());
        }
        assigner.close();
        return mySqlSplits;
    }

    private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] captureTables) {
        return this.getConfig(MYSQL_CONTAINER, this.customerDatabase, startupOptions, captureTables);
    }

    private MySqlSourceConfig getConfig(MySqlContainer container, UniqueDatabase database, StartupOptions startupOptions, String[] captureTables) {
        return this.getConfigFactory(container, database, captureTables).startupOptions(startupOptions).createConfig(0);
    }

    private MySqlSourceConfig getConfig(String[] captureTables) {
        return this.getConfig(MYSQL_CONTAINER, this.customerDatabase, captureTables);
    }

    private MySqlSourceConfig getConfig(MySqlContainer container, UniqueDatabase database, String[] captureTables) {
        return this.getConfigFactory(container, database, captureTables).createConfig(0);
    }

    private MySqlSourceConfigFactory getConfigFactory(MySqlContainer container, UniqueDatabase database, String[] captureTables) {
        String[] captureTableIds = (String[])Arrays.stream(captureTables).map(tableName -> database.getDatabaseName() + "." + tableName).toArray(String[]::new);
        return new MySqlSourceConfigFactory().databaseList(new String[]{database.getDatabaseName()}).tableList(captureTableIds).hostname(container.getHost()).port(container.getDatabasePort()).username(database.getUsername()).splitSize(4).fetchSize(2).password(database.getPassword());
    }

    private void addColumnToTable(JdbcConnection connection, String tableId) throws Exception {
        connection.execute(new String[]{"ALTER TABLE " + tableId + " ADD COLUMN new_int_column INT DEFAULT 15213"});
        connection.commit();
    }

    private static class TestStatefulTaskContext
    extends StatefulTaskContext {
        public TestStatefulTaskContext(MySqlSourceConfig sourceConfig, BinaryLogClient binaryLogClient, MySqlConnection connection) {
            super(sourceConfig, binaryLogClient, connection);
        }

        protected MySqlOffsetContext loadStartingOffsetState(OffsetContext.Loader<MySqlOffsetContext> loader, MySqlSplit mySqlSplit) {
            BinlogOffset offset = mySqlSplit.isSnapshotSplit() ? BinlogOffset.ofEarliest() : BinlogOffsetUtils.initializeEffectiveOffset((BinlogOffset)mySqlSplit.asBinlogSplit().getStartingOffset(), (MySqlConnection)this.getConnection());
            LOG.info("Starting offset is initialized to {}", (Object)offset);
            MySqlOffsetContext mySqlOffsetContext = (MySqlOffsetContext)loader.load(offset.getOffset());
            return mySqlOffsetContext;
        }
    }
}

