/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.table;

import io.debezium.jdbc.JdbcConnection;
import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.connectors.mysql.LegacyMySqlSourceTest;
import org.apache.flink.cdc.connectors.mysql.MySqlTestUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.lifecycle.Startables;

@RunWith(value=Parameterized.class)
public class MySqlConnectorITCase
extends MySqlSourceTestBase {
    /** Logger used for the container lifecycle messages emitted by this test class. */
    private static final Logger LOG = LoggerFactory.getLogger(MySqlConnectorITCase.class);
    // Credentials used in the source DDLs; the UniqueDatabase initializers below pass the same literal values.
    private static final String TEST_USER = "mysqluser";
    private static final String TEST_PASSWORD = "mysqlpw";
    // MySQL 8.0 container created with the docker/server-gtids/expire-seconds/my.cnf configuration;
    // it is started in beforeClass() and stopped in afterClass().
    private static final MySqlContainer MYSQL8_CONTAINER = MySqlConnectorITCase.createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
    // Databases bound to MYSQL_CONTAINER, which is not declared in this class
    // (presumably inherited from MySqlSourceTestBase - TODO confirm).
    private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");
    private final UniqueDatabase fullTypesMySql57Database = new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", "mysqluser", "mysqlpw");
    // Same kind of fixture, but bound to the MySQL 8 container.
    private final UniqueDatabase fullTypesMySql8Database = new UniqueDatabase(MYSQL8_CONTAINER, "column_type_test_mysql8", "mysqluser", "mysqlpw");
    private final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
    // NOTE(review): the only UniqueDatabase declared static here, so it is shared by all test instances.
    private static final UniqueDatabase customer3_0Database = new UniqueDatabase(MYSQL_CONTAINER, "customer3.0", "mysqluser", "mysqlpw");
    private final UniqueDatabase userDatabase1 = new UniqueDatabase(MYSQL_CONTAINER, "user_1", "mysqluser", "mysqlpw");
    private final UniqueDatabase userDatabase2 = new UniqueDatabase(MYSQL_CONTAINER, "user_2", "mysqluser", "mysqlpw");
    // MySQL 8 variants of the inventory fixture and the binlog metadata fixture.
    private final UniqueDatabase inventoryDatabase8 = new UniqueDatabase(MYSQL8_CONTAINER, "inventory", "mysqluser", "mysqlpw");
    private final UniqueDatabase binlogDatabase = new UniqueDatabase(MYSQL8_CONTAINER, "binlog_metadata_test", "mysqluser", "mysqlpw");
    // Per-instance execution environment; parallelism and checkpointing are configured in before().
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Table environment created on top of env in streaming mode.
    private final StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env, (EnvironmentSettings)EnvironmentSettings.newInstance().inStreamingMode().build());
    // Parameterized flag assigned by the constructor; it is substituted into the
    // 'scan.incremental.snapshot.enabled' option of the source DDLs.
    private final boolean incrementalSnapshot;

    /**
     * Creates one parameterized instance of the test.
     *
     * @param incrementalSnapshot value supplied by {@link #parameters()}; stored and later used
     *     as the value of the {@code scan.incremental.snapshot.enabled} table option
     */
    public MySqlConnectorITCase(boolean incrementalSnapshot) {
        this.incrementalSnapshot = incrementalSnapshot;
    }

    /**
     * Supplies the constructor arguments for the parameterized runner.
     *
     * @return two single-element parameter sets: {@code false} first, then {@code true}
     */
    @Parameterized.Parameters(name="incrementalSnapshot: {0}")
    public static Object[] parameters() {
        Object[][] snapshotModes = {{false}, {true}};
        return snapshotModes;
    }

    /** Starts the MySQL 8 container once before any test of this class runs and waits for it. */
    @BeforeClass
    public static void beforeClass() {
        LOG.info("Starting MySql8 containers...");
        Stream<MySqlContainer> containersToStart = Stream.of(MYSQL8_CONTAINER);
        // join() blocks until the asynchronous start has completed.
        Startables.deepStart(containersToStart).join();
        LOG.info("Container MySql8 is started.");
    }

    /** Stops the MySQL 8 container after all tests of this class have run. */
    @AfterClass
    public static void afterClass() {
        LOG.info("Stopping MySql8 containers...");
        MYSQL8_CONTAINER.stop();
        LOG.info("Container MySql8 is stopped.");
    }

    /**
     * Per-test setup: clears the data held by the values table factory and configures the
     * environment for the current snapshot mode.
     */
    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        if (!this.incrementalSnapshot) {
            // Non-incremental mode runs with a single parallel instance and no checkpointing.
            this.env.setParallelism(1);
            return;
        }
        // Incremental snapshot mode: parallelism 4 with a 200 ms checkpoint interval.
        this.env.setParallelism(4);
        this.env.enableCheckpointing(200L);
    }

    /** Runs the consume-all-events scenario without any additional table options. */
    @Test
    public void testConsumingAllEvents() throws Exception {
        String noExtraOptions = "";
        runConsumingAllEventsTest(noExtraOptions);
    }

    /**
     * Runs the consume-all-events scenario with SSL-related JDBC properties appended to the
     * source table options.
     *
     * <p>The third property key is spelled {@code verifyServerCertificate}; the previous
     * spelling ({@code verifyServerCerticate}) is not a MySQL Connector/J property name, so the
     * intended setting was never passed under its real name.
     */
    @Test
    public void testConsumingAllEventsUseSSL() throws Exception {
        this.runConsumingAllEventsTest(", 'jdbc.properties.useSSL'= 'true', 'jdbc.properties.requireSSL'= 'true', 'jdbc.properties.verifyServerCertificate'= 'false'");
    }

    /**
     * Shared scenario: aggregates the {@code products} table into a values sink, applies a
     * series of changes to the table, and checks the final aggregated sink content.
     *
     * @param otherTableOptions extra option text appended verbatim at the end of the source
     *     table's WITH clause (may be empty)
     * @throws Exception if database setup, SQL execution, waiting or job cancellation fails
     */
    private void runConsumingAllEventsTest(String otherTableOptions) throws Exception {
        this.inventoryDatabase.createAndInitialize();

        // Register the CDC source table.
        String sourceDDL = String.format("CREATE TABLE debezium_source ( `id` INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s' %s)", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), TEST_USER, TEST_PASSWORD, this.inventoryDatabase.getDatabaseName(), "products", this.incrementalSnapshot, this.getServerId(), this.getSplitSize(), otherTableOptions);
        this.tEnv.executeSql(sourceDDL);

        // Register the upsert sink that collects the aggregation result.
        String sinkDDL = "CREATE TABLE sink ( name STRING, weightSum DECIMAL(10,3), PRIMARY KEY (name) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')";
        this.tEnv.executeSql(sinkDDL);

        TableResult insertJob = this.tEnv.executeSql("INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
        waitForSnapshotStarted("sink");

        // Changes applied to the table after the snapshot has started, executed in this order.
        String[] changeStatements = {
            "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;",
            "UPDATE products SET weight='5.1' WHERE id=107;",
            "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);",
            "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);",
            "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;",
            "UPDATE products SET weight='5.17' WHERE id=111;",
            "DELETE FROM products WHERE id=111;"
        };
        try (Connection jdbc = this.inventoryDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            for (String sql : changeStatements) {
                stmt.execute(sql);
            }
        }

        waitForSinkSize("sink", 20);

        List<String> expectedRows = Arrays.asList(
            "+I[scooter, 3.140]",
            "+I[car battery, 8.100]",
            "+I[12-pack drill bits, 0.800]",
            "+I[hammer, 2.625]",
            "+I[rocks, 5.100]",
            "+I[jacket, 0.600]",
            "+I[spare tire, 22.200]");
        List<String> actualRows = TestValuesTableFactory.getResults("sink");
        assertEqualsInAnyOrder(expectedRows, actualRows);

        // Stop the streaming job and wait for the cancellation to complete.
        insertJob.getJobClient().get().cancel().get();
    }

    /** Runs the no-primary-key scenario with {@code type} configured as the chunk key column. */
    @Test
    public void testNoPKTableWithChunkKey() throws Exception {
        String chunkKeyOption = ", 'scan.incremental.snapshot.chunk.key-column'='type'";
        runConsumingForNoPKTableTest(chunkKeyOption);
    }

    /**
     * Runs the no-primary-key scenario without a chunk key column and expects it to fail with a
     * {@link ValidationException} whose cause carries the "chunk key-column is required" message.
     */
    @Test
    public void testNoPKTableWithoutChunkKey() {
        Throwable failure = Assert.assertThrows(Throwable.class, () -> runConsumingForNoPKTableTest(""));

        // The ValidationException may be nested anywhere in the failure's cause chain.
        Optional<ValidationException> validationFailure = ExceptionUtils.findThrowable(failure, ValidationException.class);
        Assert.assertTrue(validationFailure.isPresent());

        String actualMessage = validationFailure.get().getCause().getMessage();
        Assert.assertEquals("'scan.incremental.snapshot.chunk.key-column' is required for table without primary key when 'scan.incremental.snapshot.enabled' enabled.", actualMessage);
    }

    /**
     * Shared scenario for the {@code products_no_pk} table: copies it into a values sink,
     * applies a series of changes, and checks the final sink content.
     *
     * <p>The source always enables incremental snapshot with a chunk size of 2; the
     * parameterized flag only decides whether the sink declares a primary key, how many sink
     * records are awaited, and which final rows are expected.
     *
     * @param otherTableOptions extra option text appended verbatim at the end of the source
     *     table's WITH clause (may be empty)
     * @throws Exception if database setup, SQL execution, waiting or job cancellation fails
     */
    private void runConsumingForNoPKTableTest(String otherTableOptions) throws Exception {
        this.inventoryDatabase.createAndInitialize();

        String sourceDDL = String.format("CREATE TABLE debezium_source ( `type` INT, name STRING, description STRING, weight DECIMAL(10,3)) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = 'products_no_pk', 'scan.incremental.snapshot.enabled' = 'true', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '2' %s)", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), TEST_USER, TEST_PASSWORD, this.inventoryDatabase.getDatabaseName(), this.getServerId(), otherTableOptions);

        // The sink only declares a primary key when the parameterized flag is set.
        String sinkKeyClause = "";
        if (this.incrementalSnapshot) {
            sinkKeyClause = ", PRIMARY KEY (`type`) NOT ENFORCED";
        }
        String sinkDDL = "CREATE TABLE sink ( `type` INT, name STRING, description STRING, weight DECIMAL(10,3)" + sinkKeyClause + ") WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";

        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult insertJob = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        // Wait until 11 records have reached the sink before changing the table.
        waitForSinkSize("sink", 11);

        String[] changeStatements = {
            "UPDATE products_no_pk SET description='18oz carpenter hammer' WHERE type=103;",
            "UPDATE products_no_pk SET weight='5.1' WHERE type=106;",
            "INSERT INTO products_no_pk VALUES (110,'jacket','water resistent white wind breaker',0.2);",
            "DELETE FROM products_no_pk WHERE type=102;",
            "INSERT INTO products_no_pk VALUES (111,'scooter','Big 2-wheel scooter ',5.18);",
            "UPDATE products_no_pk SET description='new water resistent white wind breaker', weight='0.5' WHERE type=110;",
            "UPDATE products_no_pk SET weight='5.17' WHERE type=111;",
            "DELETE FROM products_no_pk WHERE type=111;"
        };
        try (Connection jdbc = this.inventoryDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            for (String sql : changeStatements) {
                stmt.execute(sql);
            }
        }

        int expectedSinkSize;
        String[] expected;
        if (this.incrementalSnapshot) {
            expectedSinkSize = 25;
            expected = new String[]{
                "+I[100, scooter, Small 2-wheel scooter, 3.140]",
                "+I[101, car battery, 12V car battery, 8.100]",
                "+I[103, hammer, 18oz carpenter hammer, 1.000]",
                "+I[104, rocks, box of assorted rocks, 5.300]",
                "+I[105, jacket, water resistent black wind breaker, 0.100]",
                "+I[106, spare tire, 24 inch spare tire, 5.100]",
                "+I[110, jacket, new water resistent white wind breaker, 0.500]"
            };
        } else {
            expectedSinkSize = 29;
            expected = new String[]{
                "+I[100, scooter, Small 2-wheel scooter, 3.140]",
                "+I[101, car battery, 12V car battery, 8.100]",
                "+I[103, hammer, 18oz carpenter hammer, 0.750]",
                "+I[103, hammer, 18oz carpenter hammer, 0.875]",
                "+I[103, hammer, 18oz carpenter hammer, 1.000]",
                "+I[104, rocks, box of assorted rocks, 5.300]",
                "+I[104, rocks, box of assorted rocks, 5.300]",
                "+I[104, rocks, box of assorted rocks, 5.300]",
                "+I[105, jacket, water resistent black wind breaker, 0.100]",
                "+I[106, spare tire, 24 inch spare tire, 5.100]",
                "+I[110, jacket, new water resistent white wind breaker, 0.500]"
            };
        }
        waitForSinkSize("sink", expectedSinkSize);

        List<String> actual = TestValuesTableFactory.getResults("sink");
        assertEqualsInAnyOrder(Arrays.asList(expected), actual);

        // Stop the streaming job and wait for the cancellation to complete.
        insertJob.getJobClient().get().cancel().get();
    }

    /**
     * Reads the {@code products} table with parallelism 1 and checkpointing disabled, and checks
     * first the snapshot rows (any order) and then the change rows (exact order).
     *
     * <p>Returns immediately when the parameterized flag is {@code false}.
     */
    @Test
    public void testCheckpointIsOptionalUnderSingleParallelism() throws Exception {
        if (!this.incrementalSnapshot) {
            return;
        }

        // Override the settings applied in before(): single parallelism, no checkpoints.
        this.env.setParallelism(1);
        this.env.getCheckpointConfig().disableCheckpointing();
        this.inventoryDatabase.createAndInitialize();

        String sourceDDL = String.format("CREATE TABLE debezium_source ( `id` INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), TEST_USER, TEST_PASSWORD, this.inventoryDatabase.getDatabaseName(), "products", this.incrementalSnapshot, this.getServerId(), this.getSplitSize());
        this.tEnv.executeSql(sourceDDL);

        TableResult query = this.tEnv.executeSql("SELECT * FROM debezium_source");
        CloseableIterator<Row> rows = query.collect();

        // Phase 1: the nine rows already in the table, compared regardless of order.
        String[] expectedSnapshot = {
            "+I[101, scooter, Small 2-wheel scooter, 3.140]",
            "+I[102, car battery, 12V car battery, 8.100]",
            "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
            "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
            "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
            "+I[106, hammer, 16oz carpenter's hammer, 1.000]",
            "+I[107, rocks, box of assorted rocks, 5.300]",
            "+I[108, jacket, water resistent black wind breaker, 0.100]",
            "+I[109, spare tire, 24 inch spare tire, 22.200]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expectedSnapshot), fetchRows(rows, expectedSnapshot.length));

        String[] changeStatements = {
            "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);",
            "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);",
            "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;",
            "UPDATE products SET weight='5.17' WHERE id=111;",
            "DELETE FROM products WHERE id=111;"
        };
        try (Connection jdbc = this.inventoryDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            for (String sql : changeStatements) {
                stmt.execute(sql);
            }
        }

        // Phase 2: the change events, compared in exact order.
        String[] expectedBinlog = {
            "+I[110, jacket, water resistent white wind breaker, 0.200]",
            "+I[111, scooter, Big 2-wheel scooter , 5.180]",
            "-U[110, jacket, water resistent white wind breaker, 0.200]",
            "+U[110, jacket, new water resistent white wind breaker, 0.500]",
            "-U[111, scooter, Big 2-wheel scooter , 5.180]",
            "+U[111, scooter, Big 2-wheel scooter , 5.170]",
            "-D[111, scooter, Big 2-wheel scooter , 5.170]"
        };
        assertEqualsInOrder(Arrays.asList(expectedBinlog), fetchRows(rows, expectedBinlog.length));

        // Stop the streaming job and wait for the cancellation to complete.
        query.getJobClient().get().cancel().get();
    }

    /** Runs the all-data-types scenario against {@code MYSQL_CONTAINER} and its full-types database. */
    @Test
    public void testMysql57AllDataTypes() throws Throwable {
        UniqueDatabase fullTypesDatabase = this.fullTypesMySql57Database;
        testAllDataTypes(MYSQL_CONTAINER, fullTypesDatabase);
    }

    /** Runs the all-data-types scenario against the MySQL 8 container and its full-types database. */
    @Test
    public void testMySql8AllDataTypes() throws Throwable {
        UniqueDatabase fullTypesDatabase = this.fullTypesMySql8Database;
        testAllDataTypes(MYSQL8_CONTAINER, fullTypesDatabase);
    }

    /**
     * Shared scenario that reads the {@code full_types} table of the given database through the
     * mysql-cdc connector, updates {@code timestamp_c} of the row with id 1, and compares the
     * three collected rows (+I, -U, +U) with their expected string form, in any order.
     *
     * <p>This method carries no {@code @Test} annotation; it is invoked by the version-specific
     * test methods of this class.
     *
     * @param mySqlContainer container whose host and port are put into the source DDL
     * @param database database that is created/initialized and whose credentials and name are
     *     put into the source DDL
     * @throws Throwable if database setup, SQL execution, row fetching or job cancellation fails
     */
    public void testAllDataTypes(MySqlContainer mySqlContainer, UniqueDatabase database) throws Throwable {
        database.createAndInitialize();
        // Flink-side schema for the full_types table plus the connector options.
        String sourceDDL = String.format("CREATE TABLE full_types (\n    `id` INT NOT NULL,\n    tiny_c TINYINT,\n    tiny_un_c SMALLINT ,\n    tiny_un_z_c SMALLINT ,\n    small_c SMALLINT,\n    small_un_c INT,\n    small_un_z_c INT,\n    medium_c INT,\n    medium_un_c INT,\n    medium_un_z_c BIGINT,\n    int_c INT ,\n    int_un_c BIGINT,\n    int_un_z_c BIGINT,\n    int11_c BIGINT,\n    big_c BIGINT,\n    big_un_c DECIMAL(20, 0),\n    big_un_z_c DECIMAL(20, 0),\n    varchar_c VARCHAR(255),\n    char_c CHAR(3),\n    real_c FLOAT,\n    float_c FLOAT,\n    float_un_c FLOAT,\n    float_un_z_c FLOAT,\n    double_c DOUBLE,\n    double_un_c DOUBLE,\n    double_un_z_c DOUBLE,\n    decimal_c DECIMAL(8, 4),\n    decimal_un_c DECIMAL(8, 4),\n    decimal_un_z_c DECIMAL(8, 4),\n    numeric_c DECIMAL(6, 0),\n    big_decimal_c STRING,\n    bit1_c BOOLEAN,\n    tiny1_c BOOLEAN,\n    boolean_c BOOLEAN,\n    date_c DATE,\n    time_c TIME(0),\n    datetime3_c TIMESTAMP(3),\n    datetime6_c TIMESTAMP(6),\n    timestamp_c TIMESTAMP(0),\n    file_uuid BYTES,\n    bit_c BINARY(8),\n    text_c STRING,\n    tiny_blob_c BYTES,\n    blob_c BYTES,\n    medium_blob_c BYTES,\n    long_blob_c BYTES,\n    year_c INT,\n    enum_c STRING,\n    set_c ARRAY<STRING>,\n    json_c STRING,\n    point_c STRING,\n    geometry_c STRING,\n    linestring_c STRING,\n    polygon_c STRING,\n    multipoint_c STRING,\n    multiline_c STRING,\n    multipolygon_c STRING,\n    geometrycollection_c STRING,\n    primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')", mySqlContainer.getHost(), mySqlContainer.getDatabasePort(), database.getUsername(), database.getPassword(), database.getDatabaseName(), "full_types", this.incrementalSnapshot, 
this.getServerId(), this.getSplitSize());
        this.tEnv.executeSql(sourceDDL);
        // Every column is selected as-is except file_uuid, which is projected through
        // TO_BASE64(DECODE(file_uuid, 'UTF-8')).
        TableResult result = this.tEnv.executeSql("SELECT id,\ntiny_c,\ntiny_un_c,\ntiny_un_z_c,\nsmall_c,\nsmall_un_c,\nsmall_un_z_c,\nmedium_c, \nmedium_un_c, \nmedium_un_z_c, \nint_c,\nint_un_c,\nint_un_z_c,\nint11_c,\nbig_c,\nbig_un_c, \nbig_un_z_c, \nvarchar_c,\nchar_c,\nreal_c, \nfloat_c,\nfloat_un_c,\nfloat_un_z_c,\ndouble_c,\ndouble_un_c,\ndouble_un_z_c,\ndecimal_c,\ndecimal_un_c,\ndecimal_un_z_c,\nnumeric_c,\nbig_decimal_c,\nbit1_c,\ntiny1_c,\nboolean_c,\ndate_c,\ntime_c,\ndatetime3_c,\ndatetime6_c,\ntimestamp_c,\nTO_BASE64(DECODE(file_uuid, 'UTF-8')),\nbit_c,\ntext_c,\ntiny_blob_c,\nblob_c,\nmedium_blob_c,\nlong_blob_c,\nyear_c,\nenum_c,\nset_c,\njson_c, \npoint_c, \ngeometry_c, \nlinestring_c, \npolygon_c, \nmultipoint_c, \nmultiline_c, \nmultipolygon_c, \ngeometrycollection_c \n FROM full_types");
        CloseableIterator iterator = result.collect();
        MySqlConnectorITCase.waitForSnapshotStarted((CloseableIterator<Row>)iterator);
        // Single change applied after the snapshot has started: only timestamp_c of row 1 is modified.
        try (Connection connection = database.getJdbcConnection();
             Statement statement = connection.createStatement();){
            statement.execute("UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
        }
        // Expected JSON text for the spatial columns, shared by all three expected rows.
        String expectedPointJsonText = "{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}";
        String expectedGeometryJsonText = "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}";
        String expectLinestringJsonText = "{\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}";
        String expectPolygonJsonText = "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}";
        String expectMultipointJsonText = "{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}";
        String expectMultilineJsonText = "{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}";
        String expectMultipolygonJsonText = "{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}";
        String expectGeometryCollectionJsonText = "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}";
        // Three expected rows: +I, -U and +U. Note that json_c is written as {"key1": "value1"}
        // (with a space) in the +I row but as {"key1":"value1"} in the -U/+U rows, and that only
        // the +U row carries the updated timestamp_c value 2020-07-17T18:33:22.
        String[] expected = new String[]{"+I[1, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147483647, 4294967295, 4294967295, 2147483647, 9223372036854775807, 18446744073709551615, 18446744073709551615, Hello World, abc, 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445, 123.4567, 123.4568, 123.4569, 346, 34567892.1, false, true, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=, [4, 4, 4, 4, 4, 4, 4, 4], text, [16], [16], [16], [16], 2021, red, [a, b], {\"key1\": \"value1\"}, " + expectedPointJsonText + ", " + expectedGeometryJsonText + ", " + expectLinestringJsonText + ", " + expectPolygonJsonText + ", " + expectMultipointJsonText + ", " + expectMultilineJsonText + ", " + expectMultipolygonJsonText + ", " + expectGeometryCollectionJsonText + "]", "-U[1, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147483647, 4294967295, 4294967295, 2147483647, 9223372036854775807, 18446744073709551615, 18446744073709551615, Hello World, abc, 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445, 123.4567, 123.4568, 123.4569, 346, 34567892.1, false, true, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=, [4, 4, 4, 4, 4, 4, 4, 4], text, [16], [16], [16], [16], 2021, red, [a, b], {\"key1\":\"value1\"}, " + expectedPointJsonText + ", " + expectedGeometryJsonText + ", " + expectLinestringJsonText + ", " + expectPolygonJsonText + ", " + expectMultipointJsonText + ", " + expectMultilineJsonText + ", " + expectMultipolygonJsonText + ", " + expectGeometryCollectionJsonText + "]", "+U[1, 127, 255, 255, 32767, 65535, 65535, 8388607, 16777215, 16777215, 2147483647, 4294967295, 4294967295, 2147483647, 9223372036854775807, 18446744073709551615, 18446744073709551615, Hello World, abc, 123.102, 123.102, 123.103, 123.104, 404.4443, 404.4444, 404.4445, 
123.4567, 123.4568, 123.4569, 346, 34567892.1, false, true, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=, [4, 4, 4, 4, 4, 4, 4, 4], text, [16], [16], [16], [16], 2021, red, [a, b], {\"key1\":\"value1\"}, " + expectedPointJsonText + ", " + expectedGeometryJsonText + ", " + expectLinestringJsonText + ", " + expectPolygonJsonText + ", " + expectMultipointJsonText + ", " + expectMultilineJsonText + ", " + expectMultipolygonJsonText + ", " + expectGeometryCollectionJsonText + "]"};
        MySqlConnectorITCase.assertEqualsInAnyOrder(Arrays.asList(expected), MySqlConnectorITCase.fetchRows((Iterator<Row>)iterator, expected.length));
        // Stop the streaming job and wait for the cancellation to complete.
        ((JobClient)result.getJobClient().get()).cancel().get();
    }

    /**
     * Creates a table with {@code tableColumnCount} BIGINT columns, reads it through the
     * mysql-cdc connector, updates {@code col1} of its single row, and checks the resulting
     * +I, -U and +U rows (in any order).
     *
     * <p>The column count is taken from one local constant wherever it is needed, so the MySQL
     * DDL, the Flink DDL, the inserted row and the expected rows cannot drift apart.
     */
    @Test
    public void testWideTable() throws Exception {
        final int tableColumnCount = 500;
        this.fullTypesMySql57Database.createAndInitialize();

        // Create and populate the wide table directly in MySQL.
        try (Connection connection = this.fullTypesMySql57Database.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute(String.format("USE %s", this.fullTypesMySql57Database.getDatabaseName()));
            statement.execute("CREATE TABLE wide_table(" + buildColumnsDDL("col", 0, tableColumnCount, "BIGINT") + " PRIMARY KEY (col0) )");
            statement.execute("INSERT INTO wide_table values(" + getIntegerSeqString(0, tableColumnCount) + ")");
        }

        // Matching Flink table with the same generated column list.
        String sourceDDL = String.format("CREATE TABLE wide_table (\n" + buildColumnsDDL("col", 0, tableColumnCount, "BIGINT") + "    primary key (`col0`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), this.fullTypesMySql57Database.getUsername(), this.fullTypesMySql57Database.getPassword(), this.fullTypesMySql57Database.getDatabaseName(), "wide_table", this.incrementalSnapshot, this.getServerId(), this.getSplitSize());
        this.tEnv.executeSql(sourceDDL);

        TableResult result = this.tEnv.executeSql("SELECT * FROM wide_table");
        CloseableIterator<Row> iterator = result.collect();
        waitForSnapshotStarted(iterator);

        // Single change applied after the snapshot has started.
        try (Connection connection = this.fullTypesMySql57Database.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("UPDATE wide_table SET col1 = 1024 WHERE col0=0;");
        }

        // Columns 2..tableColumnCount-1 are identical in all three expected rows.
        String unchangedColumns = getIntegerSeqString(2, tableColumnCount);
        String[] expected = new String[]{
            "+I[0, 1, " + unchangedColumns + "]",
            "-U[0, 1, " + unchangedColumns + "]",
            "+U[0, 1024, " + unchangedColumns + "]"
        };
        assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));

        // Stop the streaming job and wait for the cancellation to complete.
        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads two tables matched by the {@code big_table.*} pattern with a chunk size of 2 and a
     * chunk-meta group size of 3, then applies four updates and checks the raw sink records.
     *
     * <p>Returns immediately when the parameterized flag is {@code false}. All row counts are
     * derived from {@code tableRowNumber}: each of the two tables holds that many rows, so the
     * sink is expected to hold {@code tableRowNumber * 2} records before the updates and four
     * more afterwards.
     */
    @Test
    public void testBigTableWithHugeSplits() throws Exception {
        if (!this.incrementalSnapshot) {
            return;
        }
        final int tableRowNumber = 10;
        this.fullTypesMySql57Database.createAndInitialize();

        // Create and populate the two tables directly in MySQL.
        try (Connection connection = this.fullTypesMySql57Database.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute(String.format("USE %s", this.fullTypesMySql57Database.getDatabaseName()));
            statement.execute("CREATE TABLE big_table1(id BIGINT, str VARCHAR(100), PRIMARY KEY (id))");
            statement.execute("CREATE TABLE big_table2(id BIGINT, str VARCHAR(100), PRIMARY KEY (id))");
            for (int i = 0; i < tableRowNumber; ++i) {
                statement.execute("INSERT INTO big_table1 values(" + i + "," + (i + 100000) + ")");
                statement.execute("INSERT INTO big_table2 values(" + i + "," + (i + 200000) + ")");
            }
        }

        String sourceDDL = String.format("CREATE TABLE big_table (\n    id BIGINT,    str STRING,    primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = 'big_table.*', 'scan.incremental.snapshot.chunk.size' = '2', 'chunk-meta.group.size' = '3', 'server-time-zone' = 'UTC', 'server-id' = '%s')", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), this.fullTypesMySql57Database.getUsername(), this.fullTypesMySql57Database.getPassword(), this.fullTypesMySql57Database.getDatabaseName(), this.getServerId());
        String sinkDDL = "CREATE TABLE sink ( `id` BIGINT NOT NULL, str STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM big_table");

        // Wait until every row of both tables has reached the sink.
        waitForSinkSize("sink", tableRowNumber * 2);

        try (Connection connection = this.fullTypesMySql57Database.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("UPDATE big_table1 SET str = '1024' WHERE id=0;");
            statement.execute("UPDATE big_table1 SET str = '1025' WHERE id=1;");
            statement.execute("UPDATE big_table2 SET str = '2048' WHERE id=2;");
            statement.execute("UPDATE big_table2 SET str = '2049' WHERE id=3;");
        }

        // Four updates were applied, so four more records are awaited.
        waitForSinkSize("sink", tableRowNumber * 2 + 4);

        List<String> expected = new ArrayList<>();
        for (int i = 0; i < tableRowNumber; ++i) {
            expected.add("+I[" + i + ", " + (i + 100000) + "]");
            expected.add("+I[" + i + ", " + (i + 200000) + "]");
        }
        // A String-typed list is required here: an Object-typed list cannot be passed to addAll
        // on a List<String>.
        expected.addAll(Arrays.asList("+U[0, 1024]", "+U[1, 1025]", "+U[2, 2048]", "+U[3, 2049]"));

        // Both sides are sorted so the comparison does not depend on arrival order.
        List<String> actual = TestValuesTableFactory.getRawResults("sink");
        Collections.sort(actual);
        Collections.sort(expected);
        Assert.assertEquals(expected, actual);

        // Stop the streaming job and wait for the cancellation to complete.
        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads the tables matching {@code user_table_.*} together with the {@code database_name},
     * {@code table_name} and {@code row_kind} metadata columns, applies a series of changes, and
     * compares the sorted raw sink records with the sorted expected records.
     */
    @Test
    public void testMetadataColumns() throws Exception {
        this.userDatabase1.createAndInitialize();

        String sourceDDL = String.format("CREATE TABLE mysql_users ( db_name STRING METADATA FROM 'database_name' VIRTUAL, table_name STRING METADATA VIRTUAL, row_kind STRING METADATA FROM 'row_kind' VIRTUAL, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, age INT, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), this.userDatabase1.getUsername(), this.userDatabase1.getPassword(), this.userDatabase1.getDatabaseName(), "user_table_.*", this.incrementalSnapshot, this.getServerId(), this.getSplitSize());
        String sinkDDL = "CREATE TABLE sink ( database_name STRING, table_name STRING, row_kind STRING, `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, age INT, primary key (database_name, table_name, id) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult insertJob = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM mysql_users");

        // Wait for the two initial records before changing the tables.
        waitForSinkSize("sink", 2);

        String[] changeStatements = {
            "INSERT INTO user_table_1_2 VALUES (200,'user_200','Wuhan',123567891234);",
            "INSERT INTO user_table_1_1 VALUES (300,'user_300','Hangzhou',123567891234, 'user_300@foo.com');",
            "UPDATE user_table_1_1 SET address='Beijing' WHERE id=300;",
            "UPDATE user_table_1_2 SET phone_number=88888888 WHERE id=121;",
            "DELETE FROM user_table_1_1 WHERE id=111;"
        };
        try (Connection jdbc = this.userDatabase1.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            for (String sql : changeStatements) {
                stmt.execute(sql);
            }
        }

        waitForSinkSize("sink", 7);

        // Each template's %s placeholder is filled with the database name.
        String[] expectedTemplates = {
            "+I[%s, user_table_1_1, +I, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
            "+I[%s, user_table_1_2, +I, 121, user_121, Shanghai, 123567891234, null, null]",
            "+I[%s, user_table_1_2, +I, 200, user_200, Wuhan, 123567891234, null, null]",
            "+I[%s, user_table_1_1, +I, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
            "+U[%s, user_table_1_1, +U, 300, user_300, Beijing, 123567891234, user_300@foo.com, null]",
            "+U[%s, user_table_1_2, +U, 121, user_121, Shanghai, 88888888, null, null]",
            "-D[%s, user_table_1_1, -D, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
            "-U[%s, user_table_1_1, -U, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
            "-U[%s, user_table_1_2, -U, 121, user_121, Shanghai, 123567891234, null, null]"
        };
        List<String> expected = new ArrayList<>();
        for (String template : expectedTemplates) {
            expected.add(String.format(template, this.userDatabase1.getDatabaseName()));
        }
        Collections.sort(expected);

        List<String> actual = TestValuesTableFactory.getRawResults("sink");
        Collections.sort(actual);
        Assert.assertEquals(expected, actual);

        // Stop the streaming job and wait for the cancellation to complete.
        insertJob.getJobClient().get().cancel().get();
    }

    /**
     * Starts a {@code mysql-cdc} source on the {@code products} table with
     * {@code scan.startup.mode = latest-offset}, modifies the table over JDBC once the job reports
     * {@code RUNNING}, and asserts that exactly the resulting change rows are collected.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testStartupFromLatestOffset() throws Exception {
        inventoryDatabase.createAndInitialize();
        String ddl = String.format(
                "CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key(id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.startup.mode' = 'latest-offset', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                TEST_USER,
                TEST_PASSWORD,
                inventoryDatabase.getDatabaseName(),
                "products",
                incrementalSnapshot,
                getServerId());
        tEnv.executeSql(ddl);
        TableResult tableResult = tEnv.executeSql("SELECT * FROM debezium_source");
        JobClient jobClient = tableResult.getJobClient().get();

        // Always pause once, then keep pausing until the job reports RUNNING.
        Thread.sleep(5000L);
        while (jobClient.getJobStatus().get() != JobStatus.RUNNING) {
            Thread.sleep(5000L);
        }

        CloseableIterator<Row> rows = tableResult.collect();
        String[] changes = {
            "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);",
            "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);",
            "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;",
            "UPDATE products SET weight='5.17' WHERE id=111;",
            "DELETE FROM products WHERE id=111;"
        };
        try (Connection jdbc = inventoryDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            for (String sql : changes) {
                stmt.execute(sql);
            }
        }

        List<String> expectedRows = Arrays.asList(
                "+I[110, jacket, water resistent white wind breaker, 0.200]",
                "+I[111, scooter, Big 2-wheel scooter , 5.180]",
                "-U[110, jacket, water resistent white wind breaker, 0.200]",
                "+U[110, jacket, new water resistent white wind breaker, 0.500]",
                "-U[111, scooter, Big 2-wheel scooter , 5.180]",
                "+U[111, scooter, Big 2-wheel scooter , 5.170]",
                "-D[111, scooter, Big 2-wheel scooter , 5.170]");
        MySqlConnectorITCase.assertEqualsInAnyOrder(expectedRows, MySqlConnectorITCase.fetchRows(rows, expectedRows.size()));
        jobClient.cancel().get();
    }

    /**
     * Reads {@code varbinary_pk_table}, whose primary key column is declared {@code VARBINARY(11)},
     * then inserts, updates and deletes rows over JDBC and asserts the collected rows.
     * Returns immediately when {@code incrementalSnapshot} is false.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testPrimaryKeyWithVarbinaryType() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        inventoryDatabase.createAndInitialize();
        String ddl = String.format(
                "CREATE TABLE varbinary_pk_table ( order_id VARBINARY(11), order_date DATE, quantity INT, product_id INT, purchaser STRING, PRIMARY KEY(order_id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                TEST_USER,
                TEST_PASSWORD,
                inventoryDatabase.getDatabaseName(),
                "varbinary_pk_table",
                getServerId(),
                getSplitSize());
        tEnv.executeSql(ddl);
        TableResult tableResult = tEnv.executeSql("SELECT * FROM varbinary_pk_table");
        JobClient jobClient = tableResult.getJobClient().get();

        // Always pause once, then keep pausing until the job reports RUNNING.
        Thread.sleep(5000L);
        while (jobClient.getJobStatus().get() != JobStatus.RUNNING) {
            Thread.sleep(5000L);
        }

        CloseableIterator<Row> rows = tableResult.collect();
        String[] changes = {
            "INSERT INTO varbinary_pk_table VALUES (b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', 30, 500, 'flink');",
            "INSERT INTO varbinary_pk_table VALUES (b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', 30, 500, 'flink-sql');",
            "UPDATE varbinary_pk_table SET quantity=50 WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000101';",
            "DELETE FROM varbinary_pk_table WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000110';"
        };
        try (Connection jdbc = inventoryDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            for (String sql : changes) {
                stmt.execute(sql);
            }
        }

        List<String> expectedRows = Arrays.asList(
                "+I[[4, 4, 4, 4, 4, 4, 4, 0], 2021-03-08, 0, 0, flink]",
                "+I[[4, 4, 4, 4, 4, 4, 4, 1], 2021-03-08, 10, 100, flink]",
                "+I[[4, 4, 4, 4, 4, 4, 4, 2], 2021-03-08, 20, 200, flink]",
                "+I[[4, 4, 4, 4, 4, 4, 4, 3], 2021-03-08, 30, 300, flink]",
                "+I[[4, 4, 4, 4, 4, 4, 4, 4], 2021-03-08, 40, 400, flink]",
                "+I[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 30, 500, flink]",
                "+I[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]",
                "-U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 30, 500, flink]",
                "+U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 50, 500, flink]",
                "-D[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]");
        MySqlConnectorITCase.assertEqualsInAnyOrder(expectedRows, MySqlConnectorITCase.fetchRows(rows, expectedRows.size()));
        jobClient.cancel().get();
    }

    /**
     * Reads the {@code address} table, whose ids are large values declared as
     * {@code DECIMAL(20, 0)}, then updates one row and inserts another over JDBC and asserts the
     * collected rows.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testPrimaryKeyWithSnowflakeAlgorithm() throws Exception {
        customerDatabase.createAndInitialize();
        String ddl = String.format(
                "CREATE TABLE address ( `id` DECIMAL(20, 0) NOT NULL, country STRING, city STRING, detail_address STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customerDatabase.getUsername(),
                customerDatabase.getPassword(),
                customerDatabase.getDatabaseName(),
                "address",
                incrementalSnapshot,
                getServerId(),
                getSplitSize());
        tEnv.executeSql(ddl);
        TableResult tableResult = tEnv.executeSql("SELECT id,\ncountry,\ncity,\ndetail_address FROM address");
        CloseableIterator<Row> rows = tableResult.collect();
        MySqlConnectorITCase.waitForSnapshotStarted(rows);

        try (Connection jdbc = customerDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            stmt.execute("UPDATE address SET city = 'Hangzhou' WHERE id=416927583791428523;");
            stmt.execute("INSERT INTO address VALUES(418257940021724075, 'Germany', 'Berlin', 'West Town address 3')");
        }

        List<String> expectedRows = Arrays.asList(
                "+I[417271541558096811, America, New York, East Town address 2]",
                "+I[417272886855938987, America, New York, East Town address 3]",
                "+I[417111867899200427, America, New York, East Town address 1]",
                "+I[417420106184475563, Germany, Berlin, West Town address 1]",
                "+I[418161258277847979, Germany, Berlin, West Town address 2]",
                "+I[416874195632735147, China, Beijing, West Town address 1]",
                "+I[416927583791428523, China, Beijing, West Town address 2]",
                "+I[417022095255614379, China, Beijing, West Town address 3]",
                "-U[416927583791428523, China, Beijing, West Town address 2]",
                "+U[416927583791428523, China, Hangzhou, West Town address 2]",
                "+I[418257940021724075, Germany, Berlin, West Town address 3]");
        MySqlConnectorITCase.assertEqualsInAnyOrder(expectedRows, MySqlConnectorITCase.fetchRows(rows, expectedRows.size()));
        tableResult.getJobClient().get().cancel().get();
    }

    /**
     * Reads the MySQL table {@code customers3.0}, whose name contains a dot, then updates and
     * inserts rows over JDBC and asserts the collected rows. The database is dropped at the end.
     * Returns immediately when {@code incrementalSnapshot} is false.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testReadingWithDotTableName() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        customer3_0Database.createAndInitialize();
        String ddl = String.format(
                "CREATE TABLE customers ( `id` INTEGER NOT NULL, name STRING, address STRING, phone_number STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customer3_0Database.getUsername(),
                customer3_0Database.getPassword(),
                customer3_0Database.getDatabaseName(),
                "customers3.0",
                incrementalSnapshot,
                getServerId(),
                getSplitSize());
        tEnv.executeSql(ddl);
        TableResult tableResult = tEnv.executeSql("SELECT id,\nname,\naddress,\nphone_number FROM customers");
        CloseableIterator<Row> rows = tableResult.collect();
        MySqlConnectorITCase.waitForSnapshotStarted(rows);

        // The dotted table name has to be back-quoted in the MySQL statements.
        try (Connection jdbc = customer3_0Database.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            stmt.execute("UPDATE `customers3.0` SET address = 'Hangzhou' WHERE id=103;");
            stmt.execute("INSERT INTO `customers3.0` VALUES(110, 'newCustomer', 'Berlin', '12345678')");
        }

        List<String> expectedRows = Arrays.asList(
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[104, user_4, Shanghai, 123567891234]",
                "-U[103, user_3, Shanghai, 123567891234]",
                "+U[103, user_3, Hangzhou, 123567891234]",
                "+I[110, newCustomer, Berlin, 12345678]");
        MySqlConnectorITCase.assertEqualsInAnyOrder(expectedRows, MySqlConnectorITCase.fetchRows(rows, expectedRows.size()));
        tableResult.getJobClient().get().cancel().get();
        customer3_0Database.dropDatabase();
    }

    /**
     * Reads the {@code customers} table using a {@code database-name} option built by appending
     * {@code .*} to the customer database name, and asserts the 21 collected rows. The restart
     * strategy is set to no-restart for this job.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testReadingWithRegexPattern() throws Exception {
        env.setRestartStrategy(RestartStrategies.noRestart());
        customerDatabase.createAndInitialize();
        String databasePattern = String.format("%s.*", customerDatabase.getDatabaseName());
        String ddl = String.format(
                "CREATE TABLE customers ( `id` INTEGER NOT NULL, name STRING, address STRING, phone_number STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customerDatabase.getUsername(),
                customerDatabase.getPassword(),
                databasePattern,
                "customers",
                incrementalSnapshot,
                getServerId(),
                getSplitSize());
        tEnv.executeSql(ddl);
        TableResult tableResult = tEnv.executeSql("SELECT * FROM customers");
        CloseableIterator<Row> rows = tableResult.collect();
        MySqlConnectorITCase.waitForSnapshotStarted(rows);

        List<String> expectedRows = Arrays.asList(
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[109, user_4, Shanghai, 123567891234]",
                "+I[110, user_5, Shanghai, 123567891234]",
                "+I[111, user_6, Shanghai, 123567891234]",
                "+I[118, user_7, Shanghai, 123567891234]",
                "+I[121, user_8, Shanghai, 123567891234]",
                "+I[123, user_9, Shanghai, 123567891234]",
                "+I[1009, user_10, Shanghai, 123567891234]",
                "+I[1010, user_11, Shanghai, 123567891234]",
                "+I[1011, user_12, Shanghai, 123567891234]",
                "+I[1012, user_13, Shanghai, 123567891234]",
                "+I[1013, user_14, Shanghai, 123567891234]",
                "+I[1014, user_15, Shanghai, 123567891234]",
                "+I[1015, user_16, Shanghai, 123567891234]",
                "+I[1016, user_17, Shanghai, 123567891234]",
                "+I[1017, user_18, Shanghai, 123567891234]",
                "+I[1018, user_19, Shanghai, 123567891234]",
                "+I[1019, user_20, Shanghai, 123567891234]",
                "+I[2000, user_21, Shanghai, 123567891234]");
        MySqlConnectorITCase.assertEqualsInAnyOrder(expectedRows, MySqlConnectorITCase.fetchRows(rows, expectedRows.size()));
        tableResult.getJobClient().get().cancel().get();
    }

    /**
     * Reads the tables matching {@code default_value_test.*}, deletes one row, and then runs
     * CREATE/ALTER TABLE statements whose numeric columns use quoted, space-padded default values
     * while the job is running. Asserts that the collected rows are the two initial rows plus the
     * delete. Returns immediately when {@code incrementalSnapshot} is false.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testDdlWithDefaultStringValue() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        env.setRestartStrategy(RestartStrategies.noRestart());
        customerDatabase.createAndInitialize();
        String ddl = String.format(
                "CREATE TABLE default_value_test ( id BIGINT NOT NULL, name STRING, address STRING, phone_number BIGINT, primary key (id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customerDatabase.getUsername(),
                customerDatabase.getPassword(),
                customerDatabase.getDatabaseName(),
                "default_value_test.*",
                incrementalSnapshot,
                getServerId(),
                getSplitSize());
        tEnv.executeSql(ddl);
        TableResult tableResult = tEnv.executeSql("SELECT * FROM default_value_test");
        JobClient runningJob = tableResult.getJobClient().get();
        MySqlTestUtils.waitForJobStatus(runningJob, Collections.singletonList(JobStatus.RUNNING), Deadline.fromNow(Duration.ofSeconds(10L)));
        CloseableIterator<Row> rows = tableResult.collect();
        MySqlConnectorITCase.waitForSnapshotStarted(rows);

        try (Connection jdbc = customerDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            stmt.execute("DELETE FROM default_value_test WHERE id=1;");
        }

        // Schema changes issued on a second connection while the job is still running.
        String[] schemaChanges = {
            " CREATE TABLE temp_default_value_test (\n     id INTEGER NOT NULL PRIMARY KEY, \n     tiny_c TINYINT DEFAULT ' 0 ', \n     boolean_c BOOLEAN DEFAULT ' 1 ', \n     tiny_un_z_c TINYINT UNSIGNED ZEROFILL DEFAULT ' 2 ', \n     small_c SMALLINT DEFAULT ' 3 ', \n     small_un_c SMALLINT UNSIGNED DEFAULT ' 4 ',\n     small_un_z_c SMALLINT UNSIGNED ZEROFILL DEFAULT ' 5 ', \n     medium_c MEDIUMINT DEFAULT ' 6 ', \n     medium_un_c MEDIUMINT UNSIGNED DEFAULT ' 7 ', \n     medium_un_z_c MEDIUMINT UNSIGNED ZEROFILL DEFAULT ' 8 ', \n     int_c INTEGER DEFAULT ' 9 ', \n     int_un_c INTEGER UNSIGNED DEFAULT ' 10 ', \n     int_un_z_c INTEGER UNSIGNED ZEROFILL DEFAULT ' 11 ',\n     int11_c INT(11) DEFAULT ' 12 ', \n     big_c BIGINT DEFAULT ' 13 ', \n     big_un_c BIGINT UNSIGNED DEFAULT ' 14 ', \n     big_un_z_c BIGINT UNSIGNED ZEROFILL DEFAULT ' 15 ', \n     decimal_c DECIMAL(8, 4) DEFAULT ' 16  ', \n     decimal_un_c DECIMAL(8, 4) UNSIGNED DEFAULT ' 17 ', \n     decimal_un_z_c DECIMAL(8, 4) UNSIGNED ZEROFILL DEFAULT ' 18 ', \n     numeric_c NUMERIC(6, 0) DEFAULT ' 19 ', \n     big_decimal_c DECIMAL(65, 1) DEFAULT ' 20 ',\n     real_c REAL DEFAULT ' 21.0',\n     float_c FLOAT DEFAULT ' 22.0',\n     float_un_c FLOAT UNSIGNED DEFAULT ' 23',\n     float_un_z_c FLOAT UNSIGNED ZEROFILL DEFAULT ' 24',\n     double_c DOUBLE DEFAULT ' 25',\n     double_un_c DOUBLE UNSIGNED DEFAULT ' 26',\n     double_un_z_c DOUBLE UNSIGNED ZEROFILL DEFAULT ' 27',\n     tiny_un_c TINYINT UNSIGNED DEFAULT ' 28 ' );",
            "alter table temp_default_value_test alter column `small_c` SET DEFAULT ' 29 ';",
            "alter table temp_default_value_test add column\n    `new_col` smallint(1) unsigned DEFAULT ' 30 ';",
            "alter table default_value_test add column\n    `new_col` smallint(1) unsigned DEFAULT ' 31 ';",
            " CREATE TABLE default_value_test_ignore (\n     id INTEGER NOT NULL PRIMARY KEY, \n     tiny_c TINYINT DEFAULT ' 0 ', \n     boolean_c BOOLEAN DEFAULT ' 1 ', \n     tiny_un_z_c TINYINT UNSIGNED ZEROFILL DEFAULT ' 2 ', \n     small_c SMALLINT DEFAULT ' 3 ', \n     small_un_c SMALLINT UNSIGNED DEFAULT ' 4 ',\n     small_un_z_c SMALLINT UNSIGNED ZEROFILL DEFAULT ' 5 ', \n     medium_c MEDIUMINT DEFAULT ' 6 ', \n     medium_un_c MEDIUMINT UNSIGNED DEFAULT ' 7 ', \n     medium_un_z_c MEDIUMINT UNSIGNED ZEROFILL DEFAULT ' 8 ', \n     int_c INTEGER DEFAULT ' 9 ', \n     int_un_c INTEGER UNSIGNED DEFAULT ' 10 ', \n     int_un_z_c INTEGER UNSIGNED ZEROFILL DEFAULT ' 11 ',\n     int11_c INT(11) DEFAULT ' 12 ', \n     big_c BIGINT DEFAULT ' 13 ', \n     big_un_c BIGINT UNSIGNED DEFAULT ' 14 ', \n     big_un_z_c BIGINT UNSIGNED ZEROFILL DEFAULT ' 15 ', \n     decimal_c DECIMAL(8, 4) DEFAULT ' 16  ', \n     decimal_un_c DECIMAL(8, 4) UNSIGNED DEFAULT ' 17 ', \n     decimal_un_z_c DECIMAL(8, 4) UNSIGNED ZEROFILL DEFAULT ' 18 ', \n     numeric_c NUMERIC(6, 0) DEFAULT ' 19 ', \n     big_decimal_c DECIMAL(65, 1) DEFAULT ' 20 ',\n     real_c REAL DEFAULT ' 21.0',\n     float_c FLOAT DEFAULT ' 22.0',\n     float_un_c FLOAT UNSIGNED DEFAULT ' 23',\n     float_un_z_c FLOAT UNSIGNED ZEROFILL DEFAULT ' 24',\n     double_c DOUBLE DEFAULT ' 25',\n     double_un_c DOUBLE UNSIGNED DEFAULT ' 26',\n     double_un_z_c DOUBLE UNSIGNED ZEROFILL DEFAULT ' 27',\n     tiny_un_c TINYINT UNSIGNED DEFAULT ' 28 ' );"
        };
        try (Connection jdbc = customerDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            for (String sql : schemaChanges) {
                stmt.execute(sql);
            }
        }

        List<String> expectedRows = Arrays.asList(
                "+I[1, user1, Shanghai, 123567]",
                "+I[2, user2, Shanghai, 123567]",
                "-D[1, user1, Shanghai, 123567]");
        MySqlConnectorITCase.assertEqualsInAnyOrder(expectedRows, MySqlConnectorITCase.fetchRows(rows, expectedRows.size()));
        runningJob.cancel().get();
    }

    /**
     * Reads {@code default_value_test}, deletes one row, and then adds two {@code INT} columns
     * with quoted, space-padded default values (one of them with a {@code COLLATE} clause) while
     * the job is running. Asserts that the collected rows are the two initial rows plus the
     * delete. Returns immediately when {@code incrementalSnapshot} is false.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testAlterWithDefaultStringValue() throws Exception {
        if (!incrementalSnapshot) {
            return;
        }
        env.setRestartStrategy(RestartStrategies.noRestart());
        customerDatabase.createAndInitialize();
        String ddl = String.format(
                "CREATE TABLE default_value_test ( id BIGINT NOT NULL, name STRING, address STRING, phone_number BIGINT, primary key (id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customerDatabase.getUsername(),
                customerDatabase.getPassword(),
                customerDatabase.getDatabaseName(),
                "default_value_test",
                incrementalSnapshot,
                getServerId(),
                getSplitSize());
        tEnv.executeSql(ddl);
        TableResult tableResult = tEnv.executeSql("SELECT * FROM default_value_test");
        JobClient runningJob = tableResult.getJobClient().get();
        MySqlTestUtils.waitForJobStatus(runningJob, Collections.singletonList(JobStatus.RUNNING), Deadline.fromNow(Duration.ofSeconds(10L)));
        CloseableIterator<Row> rows = tableResult.collect();
        MySqlConnectorITCase.waitForSnapshotStarted(rows);

        try (Connection jdbc = customerDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            stmt.execute("DELETE FROM default_value_test WHERE id=1;");
        }

        // Schema changes issued on a second connection while the job is still running.
        try (Connection jdbc = customerDatabase.getJdbcConnection();
             Statement stmt = jdbc.createStatement()) {
            stmt.execute("alter table default_value_test add column `collate_test` INT DEFAULT ' 29 ' COLLATE 'utf8_general_ci';");
            stmt.execute("alter table default_value_test add column `int_test` INT DEFAULT ' 30 ';");
        }

        List<String> expectedRows = Arrays.asList(
                "+I[1, user1, Shanghai, 123567]",
                "+I[2, user2, Shanghai, 123567]",
                "-D[1, user1, Shanghai, 123567]");
        MySqlConnectorITCase.assertEqualsInAnyOrder(expectedRows, MySqlConnectorITCase.fetchRows(rows, expectedRows.size()));
        runningJob.cancel().get();
    }

    /**
     * Reads the tables matching {@code user_table_.*} from both {@code userDatabase1} and
     * {@code userDatabase2} through a single source table, updates one row in each database over
     * JDBC, and asserts the collected rows. Columns that a row does not provide appear as
     * {@code null} in the expected output.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testShardingTablesWithInconsistentSchema() throws Exception {
        this.userDatabase1.createAndInitialize();
        this.userDatabase2.createAndInitialize();
        String sourceDDL = String.format("CREATE TABLE `user` ( `id` DECIMAL(20, 0) NOT NULL, name STRING, address STRING, phone_number STRING, email STRING, age INT, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), this.userDatabase1.getUsername(), this.userDatabase1.getPassword(), String.format("(%s|%s)", this.userDatabase1.getDatabaseName(), this.userDatabase2.getDatabaseName()), "user_table_.*", this.incrementalSnapshot, this.getServerId(), this.getSplitSize());
        this.tEnv.executeSql(sourceDDL);
        TableResult result = this.tEnv.executeSql("SELECT * FROM `user`");
        CloseableIterator<Row> iterator = result.collect();
        MySqlConnectorITCase.waitForSnapshotStarted(iterator);
        // Each database gets its own connection; try-with-resources closes the statement and
        // then the connection, keeping any close() failure as a suppressed exception.
        try (Connection connection = this.userDatabase1.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("UPDATE user_table_1_1 SET email = 'user_111@bar.org' WHERE id=111;");
        }
        try (Connection connection = this.userDatabase2.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("UPDATE user_table_2_2 SET age = 20 WHERE id=221;");
        }
        String[] expected = new String[]{"+I[111, user_111, Shanghai, 123567891234, user_111@foo.com, null]", "-U[111, user_111, Shanghai, 123567891234, user_111@foo.com, null]", "+U[111, user_111, Shanghai, 123567891234, user_111@bar.org, null]", "+I[121, user_121, Shanghai, 123567891234, null, null]", "+I[211, user_211, Shanghai, 123567891234, null, null]", "+I[221, user_221, Shanghai, 123567891234, null, 18]", "-U[221, user_221, Shanghai, 123567891234, null, 18]", "+U[221, user_221, Shanghai, 123567891234, null, 20]"};
        MySqlConnectorITCase.assertEqualsInAnyOrder(Arrays.asList(expected), MySqlConnectorITCase.fetchRows(iterator, expected.length));
        result.getJobClient().get().cancel().get();
    }

    /**
     * Starts a {@code mysql-cdc} source on {@code products} with
     * {@code scan.startup.mode = specific-offset}, using a binlog file and position obtained from
     * {@code LegacyMySqlSourceTest.currentMySqlLatestOffset} after two initial updates. Further
     * changes are applied both before and after the insert job is submitted, and the sink
     * contents are then asserted.
     *
     * @throws Exception if database setup, SQL execution or job cancellation fails
     */
    @Test
    public void testStartupFromSpecificBinlogFilePos() throws Exception {
        this.inventoryDatabase.createAndInitialize();
        // Changes made before the offset is captured.
        try (Connection connection = this.inventoryDatabase.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
            statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
        }
        Tuple2<String, Integer> offset = LegacyMySqlSourceTest.currentMySqlLatestOffset(MYSQL_CONTAINER, this.inventoryDatabase, "products", 9, false);
        String sourceDDL = String.format("CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'scan.startup.mode' = 'specific-offset', 'scan.startup.specific-offset.file' = '%s', 'scan.startup.specific-offset.pos' = '%s', 'scan.incremental.snapshot.enabled' = '%s')", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), TEST_USER, TEST_PASSWORD, this.inventoryDatabase.getDatabaseName(), "products", offset.f0, offset.f1, this.incrementalSnapshot);
        String sinkDDL = "CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        // A change made after the offset was captured but before the job is submitted.
        try (Connection connection = this.inventoryDatabase.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
        }
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
        // Changes made after the job has been submitted.
        try (Connection connection = this.inventoryDatabase.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
        }
        MySqlConnectorITCase.waitForSinkSize("sink", 5);
        String[] expected = new String[]{"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
        List<String> actual = TestValuesTableFactory.getResults("sink");
        MySqlConnectorITCase.assertEqualsInAnyOrder(Arrays.asList(expected), actual);
        result.getJobClient().get().cancel().get();
    }

    /**
     * Starts the source in {@code specific-offset} mode using the GTID set captured after two
     * preparatory updates, then checks the content of the {@code sink} table.
     *
     * <p>The test returns immediately when {@code incrementalSnapshot} is {@code false}.
     *
     * @throws Exception if any JDBC, SQL or job operation fails
     */
    @Test
    public void testStartupFromSpecificGtidSet() throws Exception {
        if (!this.incrementalSnapshot) {
            return;
        }
        this.inventoryDatabase.createAndInitialize();

        // Apply two changes first, then record the binlog offset that follows them.
        BinlogOffset offset;
        try (Connection connection = this.inventoryDatabase.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
            statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
            MySqlSourceConfig sourceConfig = new MySqlSourceConfigFactory()
                    .hostname(MYSQL_CONTAINER.getHost())
                    .port(MYSQL_CONTAINER.getDatabasePort())
                    .username(TEST_USER)
                    .password(TEST_PASSWORD)
                    .databaseList(new String[]{this.inventoryDatabase.getDatabaseName()})
                    .tableList(new String[]{"products"})
                    .createConfig(0);
            // Close the helper connection once the offset has been read so it is not leaked.
            try (JdbcConnection offsetConnection = DebeziumUtils.createMySqlConnection(sourceConfig)) {
                offset = DebeziumUtils.currentBinlogOffset(offsetConnection);
            }
        }

        String sourceDDL = String.format("CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'scan.startup.mode' = 'specific-offset', 'scan.startup.specific-offset.gtid-set' = '%s', 'scan.incremental.snapshot.enabled' = '%s')", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), TEST_USER, TEST_PASSWORD, this.inventoryDatabase.getDatabaseName(), "products", offset.getGtidSet(), this.incrementalSnapshot);
        String sinkDDL = "CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);

        // This insert happens after the recorded offset but before the job is submitted.
        try (Connection connection = this.inventoryDatabase.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);");
        }

        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        // These changes are issued after the job has been submitted.
        try (Connection connection = this.inventoryDatabase.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
            statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
            statement.execute("DELETE FROM products WHERE id=111;");
        }

        waitForSinkSize("sink", 5);
        String[] expected = new String[]{"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
        List<String> actual = TestValuesTableFactory.getResults("sink");
        assertEqualsInAnyOrder(Arrays.asList(expected), actual);
        result.getJobClient().get().cancel().get();
    }

    /**
     * Runs the source with {@code 'scan.startup.mode' = 'earliest-offset'}: all table changes are
     * applied before the job is submitted, and the {@code sink} table content is then compared
     * with the expected rows.
     *
     * @throws Exception if any JDBC, SQL or job operation fails
     */
    @Test
    public void testStartupFromEarliestOffset() throws Exception {
        this.inventoryDatabase.createAndInitialize();

        final String createSource = String.format(
                "CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'scan.startup.mode' = 'earliest-offset', 'scan.incremental.snapshot.enabled' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                TEST_USER,
                TEST_PASSWORD,
                this.inventoryDatabase.getDatabaseName(),
                "products",
                this.incrementalSnapshot);
        final String createSink = "CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)";
        this.tEnv.executeSql(createSource);
        this.tEnv.executeSql(createSink);

        // Changes made before the job starts, executed in the listed order.
        final String[] changes = {
            "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;",
            "UPDATE products SET weight='5.1' WHERE id=107;",
            "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);",
            "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);",
            "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;",
            "UPDATE products SET weight='5.17' WHERE id=111;",
            "DELETE FROM products WHERE id=111;"
        };
        try (Connection jdbc = this.inventoryDatabase.getJdbcConnection();
             Statement dml = jdbc.createStatement()) {
            for (String sql : changes) {
                dml.execute(sql);
            }
        }

        final TableResult job = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
        waitForSinkSize("sink", 16);

        final List<String> expectedRows = Arrays.asList(
                "+I[101, scooter, Small 2-wheel scooter, 3.140]",
                "+I[102, car battery, 12V car battery, 8.100]",
                "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
                "+I[104, hammer, 12oz carpenter's hammer, 0.750]",
                "+I[105, hammer, 14oz carpenter's hammer, 0.875]",
                "+I[108, jacket, water resistent black wind breaker, 0.100]",
                "+I[109, spare tire, 24 inch spare tire, 22.200]",
                "+I[106, hammer, 18oz carpenter hammer, 1.000]",
                "+I[107, rocks, box of assorted rocks, 5.100]",
                "+I[110, jacket, new water resistent white wind breaker, 0.500]");
        final List<String> sinkRows = TestValuesTableFactory.getResults("sink");
        assertEqualsInAnyOrder(expectedRows, sinkRows);
        job.getJobClient().get().cancel().get();
    }

    /**
     * Runs the source with {@code 'scan.startup.mode' = 'timestamp'}, passing the current wall
     * clock time as {@code scan.startup.timestamp-millis}, applies changes after the job has been
     * submitted and compares the {@code sink} table content with the expected row.
     *
     * @throws Exception if any JDBC, SQL or job operation fails
     */
    @Test
    public void testStartupFromTimestamp() throws Exception {
        this.inventoryDatabase.createAndInitialize();
        // Pause between database initialization and taking the startup timestamp.
        Thread.sleep(5000L);

        final String createSource = String.format(
                "CREATE TABLE debezium_source ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'scan.startup.mode' = 'timestamp', 'scan.startup.timestamp-millis' = '%s', 'scan.incremental.snapshot.enabled' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                TEST_USER,
                TEST_PASSWORD,
                this.inventoryDatabase.getDatabaseName(),
                "products",
                System.currentTimeMillis(),
                this.incrementalSnapshot);
        final String createSink = "CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)";
        this.tEnv.executeSql(createSource);
        this.tEnv.executeSql(createSink);

        final TableResult job = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
        // Pause after submitting the job before touching the table.
        Thread.sleep(5000L);

        final String[] changes = {
            "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);",
            "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);",
            "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;",
            "UPDATE products SET weight='5.17' WHERE id=111;",
            "DELETE FROM products WHERE id=111;"
        };
        try (Connection jdbc = this.inventoryDatabase.getJdbcConnection();
             Statement dml = jdbc.createStatement()) {
            for (String sql : changes) {
                dml.execute(sql);
            }
        }

        waitForSinkSize("sink", 5);
        final List<String> expectedRows = Arrays.asList("+I[110, jacket, new water resistent white wind breaker, 0.500]");
        final List<String> sinkRows = TestValuesTableFactory.getResults("sink");
        assertEqualsInAnyOrder(expectedRows, sinkRows);
        job.getJobClient().get().cancel().get();
    }

    /**
     * Reads the {@code shopping_cart_dec} table of the customer database through the CDC source
     * and compares the first four collected rows with the expected snapshot rows.
     *
     * @throws Exception if any SQL or job operation fails
     */
    @Test
    public void testColumnOptionalWithDefaultValue() throws Exception {
        this.customerDatabase.createAndInitialize();

        final String createSource = String.format(
                "CREATE TABLE debezium_source ( `product_no` DECIMAL(20, 4) NOT NULL, product_kind STRING, user_id STRING, description STRING, primary key (`product_no`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                this.customerDatabase.getUsername(),
                this.customerDatabase.getPassword(),
                this.customerDatabase.getDatabaseName(),
                "shopping_cart_dec",
                this.incrementalSnapshot,
                this.getServerId(),
                this.getSplitSize());
        this.tEnv.executeSql(createSource);

        final TableResult query = this.tEnv.executeSql("SELECT product_no,\nproduct_kind,\nuser_id,\ndescription FROM debezium_source");
        final CloseableIterator<Row> rows = query.collect();
        // Block until the first row is available.
        waitForSnapshotStarted(rows);

        final List<String> expectedRows = Arrays.asList(
                "+I[123456.1230, KIND_001, user_1, my shopping cart]",
                "+I[123457.4560, KIND_002, user_2, my shopping cart]",
                "+I[123458.6789, KIND_003, user_3, my shopping cart]",
                "+I[123459.1234, KIND_004, user_4, null]");
        assertEqualsInAnyOrder(expectedRows, fetchRows(rows, expectedRows.size()));
        query.getJobClient().get().cancel().get();
    }

    /**
     * Reads {@code multi_max_table}, which is declared with the composite primary key
     * {@code (order_id, index)}, and compares the first twelve collected rows with the expected
     * ones.
     *
     * <p>The test returns immediately when {@code incrementalSnapshot} is {@code false}.
     *
     * @throws Exception if any SQL or job operation fails
     */
    @Test
    public void testReadingWithMultiMaxValue() throws Exception {
        if (!this.incrementalSnapshot) {
            return;
        }
        this.inventoryDatabase.createAndInitialize();

        final String createSource = String.format(
                "CREATE TABLE multi_max_table ( order_id STRING, index INTEGER, desc STRING, PRIMARY KEY(order_id, index) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                TEST_USER,
                TEST_PASSWORD,
                this.inventoryDatabase.getDatabaseName(),
                "multi_max_table",
                this.getServerId(),
                this.getSplitSize());
        this.tEnv.executeSql(createSource);
        final TableResult query = this.tEnv.executeSql("SELECT * FROM multi_max_table");

        // Poll every five seconds (sleeping before the first check) until the job is RUNNING.
        boolean running = false;
        while (!running) {
            Thread.sleep(5000L);
            running = query.getJobClient().get().getJobStatus().get() == JobStatus.RUNNING;
        }

        final CloseableIterator<Row> rows = query.collect();
        final List<String> expectedRows = Arrays.asList(
                "+I[, 0, flink]",
                "+I[, 1, flink]",
                "+I[, 2, flink]",
                "+I[a, 0, flink]",
                "+I[b, 0, flink]",
                "+I[c, 0, flink]",
                "+I[d, 0, flink]",
                "+I[E, 0, flink]",
                "+I[E, 1, flink]",
                "+I[E, 2, flink]",
                "+I[E, 3, flink]",
                "+I[e, 4, flink]");
        assertEqualsInAnyOrder(expectedRows, fetchRows(rows, expectedRows.size()));
        query.getJobClient().get().cancel().get();
    }

    /**
     * Declares two CDC sources that are both configured with {@code getServerId(5400)}, runs them
     * in one statement set and expects the failure to carry the server-id conflict message.
     */
    @Test
    public void testServerIdConflict() {
        try {
            this.env.setRestartStrategy(RestartStrategies.noRestart());
            this.customerDatabase.createAndInitialize();

            final int sharedBase = 5400;
            int tableIndex = 0;
            while (tableIndex < 2) {
                // Both sources receive the same server id on purpose.
                final String createSource = String.format(
                        "CREATE TABLE debezium_source%d ( `id` INTEGER NOT NULL, `name` STRING, `address` STRING, `phone_name` STRING, primary key (`id`) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')",
                        tableIndex,
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        this.customerDatabase.getUsername(),
                        this.customerDatabase.getPassword(),
                        this.customerDatabase.getDatabaseName(),
                        "customers",
                        this.incrementalSnapshot,
                        this.getServerId(sharedBase),
                        this.getSplitSize());
                final String createSink = String.format(
                        "CREATE TABLE blackhole_table%d WITH ('connector' = 'blackhole')\n LIKE debezium_source%d (EXCLUDING ALL)",
                        tableIndex,
                        tableIndex);
                this.tEnv.executeSql(createSource);
                this.tEnv.executeSql(createSink);
                ++tableIndex;
            }

            final StreamStatementSet inserts = this.tEnv.createStatementSet();
            inserts.addInsertSql("Insert into blackhole_table0 select * from debezium_source0");
            inserts.addInsertSql("Insert into blackhole_table1 select * from debezium_source1");
            inserts.execute().await();
            // Reaching this point means no failure was raised.
            Assert.fail();
        }
        catch (Throwable failure) {
            MySqlTestUtils.assertContainsErrorMsg(failure, "The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\nThe server id conflict may happen in the following situations: \n1. The server id has been used by other mysql cdc table in the current job.\n2. The server id has been used by the mysql cdc table in other jobs.\n3. The server id has been used by other sync tools like canal, debezium and so on.\n");
        }
    }

    /**
     * Reads {@code binlog_metadata} from the MySQL 8 container, inserts two rows and deletes one
     * of them while the job is running, and compares the first two collected rows with the
     * expected ones.
     *
     * <p>The test returns immediately when {@code incrementalSnapshot} is {@code false}.
     *
     * @throws Exception if any JDBC, SQL or job operation fails
     */
    @Test
    public void testBinlogTableMetadataDeserialization() throws Exception {
        if (!this.incrementalSnapshot) {
            return;
        }
        this.binlogDatabase.createAndInitialize();

        final String createSource = String.format(
                "CREATE TABLE binlog_metadata (\n    id BIGINT NOT NULL,\n    tiny_c TINYINT,\n    tiny_un_c SMALLINT ,\n    tiny_un_z_c SMALLINT ,\n    small_c SMALLINT,\n    small_un_c INT,\n    small_un_z_c INT,\n    year_c INT,\n PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s')",
                MYSQL8_CONTAINER.getHost(),
                MYSQL8_CONTAINER.getDatabasePort(),
                TEST_USER,
                TEST_PASSWORD,
                this.binlogDatabase.getDatabaseName(),
                "binlog_metadata",
                this.getServerId(),
                this.getSplitSize());
        this.tEnv.executeSql(createSource);
        final TableResult query = this.tEnv.executeSql("SELECT * FROM binlog_metadata");

        // Poll every five seconds (sleeping before the first check) until the job is RUNNING.
        boolean running = false;
        while (!running) {
            Thread.sleep(5000L);
            running = query.getJobClient().get().getJobStatus().get() == JobStatus.RUNNING;
        }

        final CloseableIterator<Row> rows = query.collect();
        try (Connection jdbc = this.binlogDatabase.getJdbcConnection();
             Statement dml = jdbc.createStatement()) {
            dml.execute("INSERT INTO binlog_metadata VALUES (2, 127, 255, 255, 32767, 65535, 65535, 2024),(3, 127, 255, 255, 32767, 65535, 65535, 2024);");
            dml.execute("DELETE FROM binlog_metadata WHERE id=3;");
        }

        final List<String> expectedRows = Arrays.asList(
                "+I[1, 127, 255, 255, 32767, 65535, 65535, 2023]",
                "+I[2, 127, 255, 255, 32767, 65535, 65535, 2024]");
        assertEqualsInAnyOrder(expectedRows, fetchRows(rows, expectedRows.size()));
        query.getJobClient().get().cancel().get();
    }

    /**
     * Builds a {@code server-id} option value from a random base in {@code [5400, 5500)}.
     *
     * @return the range {@code "<id>-<id + env parallelism>"} when {@code incrementalSnapshot} is
     *     set, otherwise the single id as a string
     */
    private String getServerId() {
        final int lowerId = 5400 + new Random().nextInt(100);
        if (!this.incrementalSnapshot) {
            return String.valueOf(lowerId);
        }
        final int upperId = lowerId + this.env.getParallelism();
        return lowerId + "-" + upperId;
    }

    /**
     * Builds a {@code server-id} option value from a fixed base.
     *
     * @param base the first (or only) server id
     * @return {@code "<base>-<base + 4>"} when {@code incrementalSnapshot} is set, otherwise
     *     {@code base} as a string
     */
    protected String getServerId(int base) {
        return this.incrementalSnapshot
                ? base + "-" + (base + 4)
                : String.valueOf(base);
    }

    /**
     * Returns the value the tests pass as {@code scan.incremental.snapshot.chunk.size}.
     *
     * @return {@code 4} when {@code incrementalSnapshot} is set, otherwise {@code 0}
     */
    private int getSplitSize() {
        return this.incrementalSnapshot ? 4 : 0;
    }

    /**
     * Builds a column list fragment such as {@code "col0 INT,col1 INT,"}.
     *
     * @param columnPrefix text placed before each column index
     * @param start first column index (inclusive)
     * @param end last column index (exclusive)
     * @param dataType type text appended after each column name
     * @return one {@code "<prefix><index> <dataType>,"} entry per index, each with a trailing
     *     comma; empty when {@code start >= end}
     */
    private static String buildColumnsDDL(String columnPrefix, int start, int end, String dataType) {
        final StringBuilder ddl = new StringBuilder();
        int column = start;
        while (column < end) {
            ddl.append(columnPrefix);
            ddl.append(column);
            ddl.append(" ");
            ddl.append(dataType);
            ddl.append(",");
            column++;
        }
        return ddl.toString();
    }

    /**
     * Joins the integers of a half-open range with {@code ", "}, e.g. {@code (0, 3)} yields
     * {@code "0, 1, 2"}.
     *
     * @param start first value (inclusive)
     * @param end last value (exclusive)
     * @return the joined sequence; an empty string when the range is empty ({@code start >= end})
     */
    private static String getIntegerSeqString(int start, int end) {
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = start; i < end; ++i) {
            // Separator goes before every element except the first, so an empty range stays empty
            // instead of producing a stray "end - 1".
            if (i > start) {
                stringBuilder.append(", ");
            }
            stringBuilder.append(i);
        }
        return stringBuilder.toString();
    }

    /**
     * Blocks until {@code sinkSize(sinkName)} is non-zero, checking every 100 ms.
     *
     * @param sinkName name of the sink table to watch
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
        while (true) {
            if (sinkSize(sinkName) != 0) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Blocks until {@code sinkSize(sinkName)} reaches at least {@code expectedSize}, checking
     * every 100 ms.
     *
     * @param sinkName name of the sink table to watch
     * @param expectedSize minimum size to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void waitForSinkSize(String sinkName, int expectedSize) throws InterruptedException {
        while (true) {
            if (sinkSize(sinkName) >= expectedSize) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the number of raw results currently held by {@link TestValuesTableFactory} for the
     * given sink, or {@code 0} when the lookup throws {@link IllegalArgumentException}.
     *
     * <p>The lookup runs while holding the {@code TestValuesTableFactory.class} monitor.
     *
     * @param sinkName name of the sink table
     * @return the raw result count, or {@code 0} if the lookup was rejected
     */
    private static int sinkSize(String sinkName) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(sinkName).size();
            }
            catch (IllegalArgumentException e) {
                // NOTE(review): presumably thrown while the sink has no results registered yet;
                // confirm against TestValuesTableFactory. Treated as "empty" so callers keep polling.
                return 0;
            }
        }
    }

    /**
     * Pulls up to {@code size} rows from the iterator and returns their string forms.
     *
     * <p>The iterator is not consulted again once {@code size} rows have been collected.
     *
     * @param iter source of rows
     * @param size maximum number of rows to take
     * @return the {@code toString()} of each fetched row, in iteration order
     */
    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        final List<String> collected = new ArrayList<>(size);
        for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
            collected.add(iter.next().toString());
        }
        return collected;
    }

    /**
     * Blocks until {@code iterator.hasNext()} returns {@code true}, checking every 100 ms.
     *
     * @param iterator iterator over the collected rows
     * @throws Exception if the thread is interrupted while sleeping
     */
    private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
        while (true) {
            if (iterator.hasNext()) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Reads {@code varbinary_base64_table} with {@code 'debezium.binary.handling.mode' = 'base64'},
     * applies inserts, an update and a delete while the job is running, and compares the first ten
     * collected rows with the expected changelog entries.
     *
     * <p>The test returns immediately when {@code incrementalSnapshot} is {@code false}.
     *
     * @throws Exception if any JDBC, SQL or job operation fails
     */
    @Test
    public void testBinaryHandlingModeWithBase64() throws Exception {
        if (!this.incrementalSnapshot) {
            return;
        }
        this.inventoryDatabase.createAndInitialize();

        final String createSource = String.format(
                "CREATE TABLE varbinary_base64_table ( id INT, order_id STRING, order_date DATE, quantity INT, product_id INT, purchaser STRING, PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.incremental.snapshot.chunk.size' = '%s', 'debezium.binary.handling.mode' = 'base64')",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                TEST_USER,
                TEST_PASSWORD,
                this.inventoryDatabase.getDatabaseName(),
                "varbinary_base64_table",
                this.getServerId(),
                this.getSplitSize());
        this.tEnv.executeSql(createSource);
        final TableResult query = this.tEnv.executeSql("SELECT * FROM varbinary_base64_table");

        // Poll every five seconds (sleeping before the first check) until the job is RUNNING.
        boolean running = false;
        while (!running) {
            Thread.sleep(5000L);
            running = query.getJobClient().get().getJobStatus().get() == JobStatus.RUNNING;
        }

        final CloseableIterator<Row> rows = query.collect();
        final String[] changes = {
            "INSERT INTO varbinary_base64_table VALUES (6, b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', 30, 500, 'flink');",
            "INSERT INTO varbinary_base64_table VALUES (7, b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', 30, 500, 'flink-sql');",
            "UPDATE varbinary_base64_table SET quantity=50 WHERE id=6;",
            "DELETE FROM varbinary_base64_table WHERE id= 7;"
        };
        try (Connection jdbc = this.inventoryDatabase.getJdbcConnection();
             Statement dml = jdbc.createStatement()) {
            for (String sql : changes) {
                dml.execute(sql);
            }
        }

        final List<String> expectedRows = Arrays.asList(
                "+I[1, BAQEBAQEBAA=, 2021-03-08, 0, 0, flink]",
                "+I[2, BAQEBAQEBAE=, 2021-03-08, 10, 100, flink]",
                "+I[3, BAQEBAQEBAI=, 2021-03-08, 20, 200, flink]",
                "+I[4, BAQEBAQEBAM=, 2021-03-08, 30, 300, flink]",
                "+I[5, BAQEBAQEBAQ=, 2021-03-08, 40, 400, flink]",
                "+I[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]",
                "+I[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]",
                "-U[6, BAQEBAQEBAU=, 2021-03-08, 30, 500, flink]",
                "+U[6, BAQEBAQEBAU=, 2021-03-08, 50, 500, flink]",
                "-D[7, BAQEBAQEBAY=, 2021-03-08, 30, 500, flink-sql]");
        assertEqualsInAnyOrder(expectedRows, fetchRows(rows, expectedRows.size()));
        query.getJobClient().get().cancel().get();
    }
}

