/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import java.io.Serializable;
import java.sql.SQLException;
import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.TestTable;
import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class MySqlSourceITCase
extends MySqlSourceTestBase {
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds((long)300L);
    private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
    private final UniqueDatabase customDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
    private final List<String> initialChanges = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]");
    private final List<String> firstPartBinlogEvents = Arrays.asList("-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]");
    private final List<String> secondPartBinlogEvents = Arrays.asList("-U[1010, user_11, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]", "+U[1010, user_11, Hangzhou, 123567891234]");
    @Parameterized.Parameter
    public String tableName;
    @Parameterized.Parameter(value=1)
    public String chunkColumnName;
    private static final int USE_POST_LOWWATERMARK_HOOK = 1;
    private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
    private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

    @Parameterized.Parameters(name="table: {0}, chunkColumn: {1}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList({"customers", null}, {"customers", "id"}, {"customers_no_pk", "id"});
    }

    @Test
    public void testReadSingleTableWithSingleParallelism() throws Exception {
        this.testMySqlParallelSource(1, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, new String[]{this.tableName});
    }

    @Test
    public void testReadSingleTableWithSingleParallelismAndSkipBackFill() throws Exception {
        this.testMySqlParallelSource(1, DEFAULT_SCAN_STARTUP_MODE, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, new String[]{this.tableName}, RestartStrategies.fixedDelayRestart((int)1, (long)0L), true);
    }

    @Test
    public void testReadSingleTableWithMultipleParallelism() throws Exception {
        this.testMySqlParallelSource(4, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, new String[]{this.tableName});
    }

    @Test
    public void testReadMultipleTableWithSingleParallelism() throws Exception {
        this.testMySqlParallelSource(1, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, new String[]{this.tableName, "customers_1"});
    }

    @Test
    public void testReadMultipleTableWithMultipleParallelism() throws Exception {
        this.testMySqlParallelSource(4, MySqlSourceTestBase.FailoverType.NONE, MySqlSourceTestBase.FailoverPhase.NEVER, new String[]{this.tableName, "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        this.testMySqlParallelSource(MySqlSourceTestBase.FailoverType.TM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, new String[]{this.tableName, "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverInBinlogPhase() throws Exception {
        this.testMySqlParallelSource(MySqlSourceTestBase.FailoverType.TM, MySqlSourceTestBase.FailoverPhase.BINLOG, new String[]{this.tableName, "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverFromLatestOffset() throws Exception {
        this.testMySqlParallelSource(4, "latest-offset", MySqlSourceTestBase.FailoverType.TM, MySqlSourceTestBase.FailoverPhase.BINLOG, new String[]{this.tableName, "customers_1"}, RestartStrategies.fixedDelayRestart((int)1, (long)0L));
    }

    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        this.testMySqlParallelSource(MySqlSourceTestBase.FailoverType.JM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, new String[]{this.tableName, "customers_1"});
    }

    @Test
    public void testJobManagerFailoverInBinlogPhase() throws Exception {
        this.testMySqlParallelSource(MySqlSourceTestBase.FailoverType.JM, MySqlSourceTestBase.FailoverPhase.BINLOG, new String[]{this.tableName, "customers_1"});
    }

    @Test
    public void testJobManagerFailoverFromLatestOffset() throws Exception {
        this.testMySqlParallelSource(4, "latest-offset", MySqlSourceTestBase.FailoverType.JM, MySqlSourceTestBase.FailoverPhase.BINLOG, new String[]{this.tableName, "customers_1"}, RestartStrategies.fixedDelayRestart((int)1, (long)0L));
    }

    @Test
    public void testTaskManagerFailoverSingleParallelism() throws Exception {
        this.testMySqlParallelSource(1, MySqlSourceTestBase.FailoverType.TM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, new String[]{this.tableName});
    }

    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        this.testMySqlParallelSource(1, MySqlSourceTestBase.FailoverType.JM, MySqlSourceTestBase.FailoverPhase.SNAPSHOT, new String[]{this.tableName});
    }

    @Test
    public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception {
        this.customDatabase.createAndInitialize();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(5000L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        MySqlSource<RowData> sleepingSource = this.buildSleepingSource();
        DataStreamSource source = env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource");
        String[] expectedSnapshotData = new String[]{"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]"};
        TypeSerializer serializer = source.getTransformation().getOutputType().createSerializer(env.getConfig());
        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
        CollectSinkOperatorFactory factory = new CollectSinkOperatorFactory(serializer, accumulatorName);
        CollectSinkOperator operator = (CollectSinkOperator)factory.getOperator();
        CollectResultIterator iterator = new CollectResultIterator(operator.getOperatorIdFuture(), serializer, accumulatorName, env.getCheckpointConfig());
        CollectStreamSink sink = new CollectStreamSink((DataStream)source, factory);
        sink.name("Data stream collect sink");
        env.addOperator(sink.getTransformation());
        JobClient jobClient = env.executeAsync("snapshotSplitTest");
        iterator.setJobClient(jobClient);
        JobID jobId = jobClient.getJobID();
        if (iterator.hasNext()) {
            MySqlSourceITCase.triggerFailover(MySqlSourceTestBase.FailoverType.JM, jobId, this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
        }
        MySqlSourceITCase.assertEqualsInAnyOrder(Arrays.asList(expectedSnapshotData), MySqlSourceITCase.fetchRowData((Iterator<RowData>)iterator, expectedSnapshotData.length));
        Assert.assertTrue((!this.hasNextData((CloseableIterator<?>)iterator) ? 1 : 0) != 0);
        jobClient.cancel().get();
    }

    @Test
    public void testStartFromEarliestOffset() throws Exception {
        ArrayList<String> expected = new ArrayList<String>();
        expected.addAll(this.initialChanges);
        expected.addAll(this.firstPartBinlogEvents);
        this.testStartingOffset(StartupOptions.earliest(), expected);
    }

    @Test
    public void testStartFromLatestOffset() throws Exception {
        this.testStartingOffset(StartupOptions.latest(), Collections.emptyList());
    }

    @Test
    public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(false, 21, 3, StartupOptions.snapshot());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]");
        MySqlSourceITCase.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(false, 21, 2, StartupOptions.snapshot());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]");
        MySqlSourceITCase.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(false, 21, 2, StartupOptions.initial());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]");
        MySqlSourceITCase.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(false, 21, 1, StartupOptions.initial());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]");
        MySqlSourceITCase.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(true, 25, 2, StartupOptions.initial());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "-U[2000, user_21, Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[1019, user_20, Shanghai, 123567891234]");
        MySqlSourceITCase.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(true, 25, 1, StartupOptions.initial());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "-U[2000, user_21, Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[1019, user_20, Shanghai, 123567891234]");
        MySqlSourceITCase.assertEqualsInAnyOrder(expectedRecords, records);
    }

    private List<String> testBackfillWhenWritingEvents(boolean skipSnapshotBackfill, int fetchSize, int hookType, StartupOptions startupOptions) throws Exception {
        this.customDatabase.createAndInitialize();
        TestTable customerTable = new TestTable(this.customDatabase, "customers", TestTableSchemas.CUSTOMERS);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        MySqlSource source = MySqlSource.builder().hostname(this.customDatabase.getHost()).port(this.customDatabase.getDatabasePort()).username(this.customDatabase.getUsername()).password(this.customDatabase.getPassword()).serverTimeZone("UTC").databaseList(new String[]{this.customDatabase.getDatabaseName()}).tableList(new String[]{customerTable.getTableId()}).deserializer((DebeziumDeserializationSchema)customerTable.getDeserializer()).skipSnapshotBackfill(skipSnapshotBackfill).startupOptions(startupOptions).build();
        String[] statements = new String[]{String.format("INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')", customerTable.getTableId()), String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", customerTable.getTableId()), String.format("DELETE FROM %s WHERE id=1019", customerTable.getTableId())};
        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        SnapshotPhaseHook & Serializable snapshotPhaseHook = (SnapshotPhaseHook & Serializable)(connection, split) -> {
            connection.setAutoCommit(false);
            connection.execute(statements);
            connection.commit();
        };
        switch (hookType) {
            case 1: {
                hooks.setPostLowWatermarkAction((SnapshotPhaseHook)snapshotPhaseHook);
                break;
            }
            case 2: {
                hooks.setPreHighWatermarkAction((SnapshotPhaseHook)snapshotPhaseHook);
                break;
            }
            case 3: {
                hooks.setPostHighWatermarkAction((SnapshotPhaseHook)snapshotPhaseHook);
            }
        }
        source.setSnapshotHooks(hooks);
        try (CloseableIterator iterator = env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source").executeAndCollect();){
            List<String> records;
            List<String> list = records = this.fetchRowData((Iterator<RowData>)iterator, fetchSize, customerTable::stringify);
            return list;
        }
    }

    private void testStartingOffset(StartupOptions startupOptions, List<String> expectedChangelogAfterStart) throws Exception {
        this.customDatabase.createAndInitialize();
        String tableId = this.getTableId();
        this.makeFirstPartBinlogEvents((JdbcConnection)this.getConnection(), tableId);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataType dataType = DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())});
        LogicalType logicalType = TypeConversions.fromDataToLogicalType((DataType)dataType);
        InternalTypeInfo typeInfo = InternalTypeInfo.of((LogicalType)logicalType);
        RowDataDebeziumDeserializeSchema deserializer = RowDataDebeziumDeserializeSchema.newBuilder().setPhysicalRowType((RowType)dataType.getLogicalType()).setResultTypeInfo((TypeInformation)typeInfo).build();
        MySqlSource mySqlSource = MySqlSource.builder().hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).databaseList(new String[]{this.customDatabase.getDatabaseName()}).serverTimeZone("UTC").tableList(new String[]{tableId}).username(this.customDatabase.getUsername()).password(this.customDatabase.getPassword()).serverId("5401-5404").deserializer((DebeziumDeserializationSchema)deserializer).startupOptions(startupOptions).chunkKeyColumn(new ObjectPath(this.customDatabase.getDatabaseName(), this.tableName), this.chunkColumnName).build();
        DataStreamSource source = env.fromSource((Source)mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
        try (CloseableIterator iterator = source.executeAndCollect();){
            List<String> rows = MySqlSourceITCase.fetchRowData((Iterator<RowData>)iterator, expectedChangelogAfterStart.size());
            MySqlSourceITCase.assertEqualsInAnyOrder(expectedChangelogAfterStart, rows);
        }
    }

    private MySqlSource<RowData> buildSleepingSource() {
        ResolvedSchema physicalSchema = new ResolvedSchema(Arrays.asList(Column.physical((String)"id", (DataType)((DataType)DataTypes.BIGINT().notNull())), Column.physical((String)"name", (DataType)DataTypes.STRING()), Column.physical((String)"address", (DataType)DataTypes.STRING()), Column.physical((String)"phone_number", (DataType)DataTypes.STRING())), new ArrayList(), UniqueConstraint.primaryKey((String)"pk", Collections.singletonList("id")));
        RowType physicalDataType = (RowType)physicalSchema.toPhysicalRowDataType().getLogicalType();
        MetadataConverter[] metadataConverters = new MetadataConverter[]{};
        InternalTypeInfo typeInfo = InternalTypeInfo.of((RowType)physicalDataType);
        SleepingRowDataDebeziumDeserializeSchema deserializer = new SleepingRowDataDebeziumDeserializeSchema(RowDataDebeziumDeserializeSchema.newBuilder().setPhysicalRowType(physicalDataType).setMetadataConverters(metadataConverters).setResultTypeInfo((TypeInformation)typeInfo).setServerTimeZone(ZoneId.of("UTC")).setUserDefinedConverterFactory(MySqlDeserializationConverterFactory.instance()).build(), 1000L);
        return MySqlSource.builder().hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).databaseList(new String[]{this.customDatabase.getDatabaseName()}).tableList(new String[]{this.getTableId()}).username(this.customDatabase.getUsername()).password(this.customDatabase.getPassword()).serverTimeZone("UTC").serverId(this.getServerId()).splitSize(8096).splitMetaGroupSize(1000).distributionFactorUpper(1000.0).distributionFactorLower(0.05).fetchSize(1024).connectTimeout(Duration.ofSeconds(30L)).connectMaxRetries(3).connectionPoolSize(20).debeziumProperties(new Properties()).startupOptions(StartupOptions.initial()).deserializer((DebeziumDeserializationSchema)deserializer).scanNewlyAddedTableEnabled(false).jdbcProperties(new Properties()).heartbeatInterval(Duration.ofSeconds(30L)).chunkKeyColumn(new ObjectPath(this.customDatabase.getDatabaseName(), this.tableName), this.chunkColumnName).build();
    }

    private void testMySqlParallelSource(MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] captureCustomerTables) throws Exception {
        this.testMySqlParallelSource(4, failoverType, failoverPhase, captureCustomerTables);
    }

    private void testMySqlParallelSource(int parallelism, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] captureCustomerTables) throws Exception {
        this.testMySqlParallelSource(parallelism, DEFAULT_SCAN_STARTUP_MODE, failoverType, failoverPhase, captureCustomerTables, RestartStrategies.fixedDelayRestart((int)1, (long)0L));
    }

    private void testMySqlParallelSource(int parallelism, String scanStartupMode, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] captureCustomerTables, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) throws Exception {
        this.testMySqlParallelSource(parallelism, scanStartupMode, failoverType, failoverPhase, captureCustomerTables, restartStrategyConfiguration, false);
    }

    private void testMySqlParallelSource(int parallelism, String scanStartupMode, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] captureCustomerTables, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, boolean skipSnapshotBackfill) throws Exception {
        this.customDatabase.createAndInitialize();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env);
        env.setParallelism(parallelism);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(restartStrategyConfiguration);
        String sourceDDL = String.format("CREATE TABLE customers ( id BIGINT NOT NULL, name STRING, address STRING, phone_number STRING" + ("customers_no_pk".equals(this.tableName) ? "" : ", primary key (id) not enforced") + ") WITH ( 'connector' = 'mysql-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.startup.mode' = '%s', 'scan.incremental.snapshot.chunk.size' = '100', 'scan.incremental.snapshot.backfill.skip' = '%s', 'server-time-zone' = 'UTC', 'server-id' = '%s' %s)", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), this.customDatabase.getUsername(), this.customDatabase.getPassword(), this.customDatabase.getDatabaseName(), this.getTableNameRegex(captureCustomerTables), scanStartupMode, skipSnapshotBackfill, this.getServerId(), this.chunkColumnName == null ? "" : String.format(", 'scan.incremental.snapshot.chunk.key-column' = '%s'", this.chunkColumnName));
        tEnv.executeSql(sourceDDL);
        TableResult tableResult = tEnv.executeSql("select * from customers");
        if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
            this.checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
        }
        this.checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables);
        ((JobClient)tableResult.getJobClient().get()).cancel().get();
    }

    private void checkSnapshotData(TableResult tableResult, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] captureCustomerTables) throws Exception {
        String[] snapshotForSingleTable = new String[]{"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]"};
        ArrayList<String> expectedSnapshotData = new ArrayList<String>();
        for (int i = 0; i < captureCustomerTables.length; ++i) {
            expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
        }
        CloseableIterator iterator = tableResult.collect();
        JobID jobId = ((JobClient)tableResult.getJobClient().get()).getJobID();
        if (failoverPhase == MySqlSourceTestBase.FailoverPhase.SNAPSHOT && iterator.hasNext()) {
            MySqlSourceITCase.triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
        }
        MySqlSourceITCase.assertEqualsInAnyOrder(expectedSnapshotData, MySqlSourceITCase.fetchRows((Iterator<Row>)iterator, expectedSnapshotData.size()));
    }

    private void checkBinlogData(TableResult tableResult, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] captureCustomerTables) throws Exception {
        this.waitUntilJobRunning(tableResult);
        CloseableIterator iterator = tableResult.collect();
        JobID jobId = ((JobClient)tableResult.getJobClient().get()).getJobID();
        for (String tableId : captureCustomerTables) {
            this.makeFirstPartBinlogEvents((JdbcConnection)this.getConnection(), this.customDatabase.getDatabaseName() + '.' + tableId);
        }
        Thread.sleep(2000L);
        if (failoverPhase == MySqlSourceTestBase.FailoverPhase.BINLOG) {
            MySqlSourceITCase.triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(200L));
            this.waitUntilJobRunning(tableResult);
        }
        for (String tableId : captureCustomerTables) {
            this.makeSecondPartBinlogEvents((JdbcConnection)this.getConnection(), this.customDatabase.getDatabaseName() + '.' + tableId);
        }
        ArrayList<String> expectedBinlogData = new ArrayList<String>();
        for (int i = 0; i < captureCustomerTables.length; ++i) {
            expectedBinlogData.addAll(this.firstPartBinlogEvents);
            expectedBinlogData.addAll(this.secondPartBinlogEvents);
        }
        MySqlSourceITCase.assertEqualsInAnyOrder(expectedBinlogData, MySqlSourceITCase.fetchRows((Iterator<Row>)iterator, expectedBinlogData.size()));
        Assert.assertTrue((!this.hasNextData(iterator) ? 1 : 0) != 0);
    }

    private static List<String> convertRowDataToRowString(List<RowData> rows) {
        LinkedHashMap<String, Integer> map = new LinkedHashMap<String, Integer>();
        map.put("id", 0);
        map.put("name", 1);
        map.put("address", 2);
        map.put("phone_number", 3);
        return rows.stream().map(row -> RowUtils.createRowWithNamedPositions((RowKind)row.getRowKind(), (Object[])new Object[]{row.getLong(0), row.getString(1), row.getString(2), row.getString(3)}, (LinkedHashMap)map).toString()).collect(Collectors.toList());
    }

    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        ArrayList<String> rows = new ArrayList<String>(size);
        while (size > 0 && iter.hasNext()) {
            Row row = iter.next();
            rows.add(row.toString());
            --size;
        }
        return rows;
    }

    private List<String> fetchRowData(Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
        ArrayList<RowData> rows = new ArrayList<RowData>(size);
        while (size > 0 && iter.hasNext()) {
            RowData row = iter.next();
            rows.add(row);
            --size;
        }
        return rows.stream().map(stringifier).collect(Collectors.toList());
    }

    private static List<String> fetchRowData(Iterator<RowData> iter, int size) {
        ArrayList<RowData> rows = new ArrayList<RowData>(size);
        while (size > 0 && iter.hasNext()) {
            RowData row = iter.next();
            rows.add(row);
            --size;
        }
        return MySqlSourceITCase.convertRowDataToRowString(rows);
    }

    private String getTableNameRegex(String[] captureCustomerTables) {
        Preconditions.checkState((captureCustomerTables.length > 0 ? 1 : 0) != 0);
        if (captureCustomerTables.length == 1) {
            return captureCustomerTables[0];
        }
        return String.format("(%s)", StringUtils.join((Object[])captureCustomerTables, (String)"|"));
    }

    private String getServerId() {
        Random random = new Random();
        int serverId = random.nextInt(100) + 5400;
        return serverId + "-" + (serverId + 4);
    }

    private void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private void makeFirstPartBinlogEvents(JdbcConnection connection, String tableId) throws SQLException {
        try {
            connection.setAutoCommit(false);
            connection.execute(new String[]{"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", "DELETE FROM " + tableId + " where id = 102", "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')", "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"});
            connection.commit();
        }
        finally {
            connection.close();
        }
    }

    private void makeSecondPartBinlogEvents(JdbcConnection connection, String tableId) throws SQLException {
        try {
            connection.setAutoCommit(false);
            connection.execute(new String[]{"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010"});
            connection.commit();
            connection.execute(new String[]{"INSERT INTO " + tableId + " VALUES(2001, 'user_22','Shanghai','123567891234'), (2002, 'user_23','Shanghai','123567891234'),(2003, 'user_24','Shanghai','123567891234')"});
            connection.commit();
        }
        finally {
            connection.close();
        }
    }

    private MySqlConnection getConnection() {
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("database.hostname", MYSQL_CONTAINER.getHost());
        properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
        properties.put("database.user", this.customDatabase.getUsername());
        properties.put("database.password", this.customDatabase.getPassword());
        properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
        Configuration configuration = Configuration.from(properties);
        return DebeziumUtils.createMySqlConnection((Configuration)configuration, (Properties)new Properties());
    }

    private String getTableId() {
        return this.customDatabase.getDatabaseName() + "." + this.tableName;
    }

    private void waitUntilJobRunning(TableResult tableResult) throws InterruptedException, ExecutionException {
        do {
            Thread.sleep(5000L);
        } while (((JobClient)tableResult.getJobClient().get()).getJobStatus().get() != JobStatus.RUNNING);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean hasNextData(CloseableIterator<?> iterator) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            FutureTask<Object> future = new FutureTask<Object>(() -> iterator.hasNext());
            executor.execute(future);
            boolean bl = (Boolean)future.get(3L, TimeUnit.SECONDS);
            return bl;
        }
        catch (TimeoutException e) {
            boolean bl = false;
            return bl;
        }
        finally {
            executor.shutdown();
        }
    }

    static class SleepingRowDataDebeziumDeserializeSchema
    implements DebeziumDeserializationSchema<RowData> {
        private static final long serialVersionUID = 1L;
        private final RowDataDebeziumDeserializeSchema deserializeSchema;
        private final long sleepMs;

        public SleepingRowDataDebeziumDeserializeSchema(RowDataDebeziumDeserializeSchema deserializeSchema, long sleepMs) {
            this.deserializeSchema = deserializeSchema;
            this.sleepMs = sleepMs;
        }

        public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
            this.deserializeSchema.deserialize(record, out);
            Thread.sleep(this.sleepMs);
        }

        public TypeInformation<RowData> getProducedType() {
            return this.deserializeSchema.getProducedType();
        }
    }
}

