/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source;

import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

public class NewlyAddedTableITCase
extends MySqlSourceTestBase {
    /** Fails any single test that runs for more than 300 seconds. */
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300L);

    /**
     * Database fixture built on MYSQL_CONTAINER. NOTE(review): the string arguments look like
     * database name, username and password - confirm against UniqueDatabase.
     */
    private final UniqueDatabase customDatabase =
            new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");

    /** Scheduler with one thread; used by the test setup to run a delayed UPDATE statement. */
    private final ScheduledExecutorService mockBinlogExecutor =
            Executors.newScheduledThreadPool(1);

    /**
     * Clears the values-table test data, recreates the test database, seeds
     * {@code produce_binlog_table} and schedules one delayed UPDATE against that table.
     *
     * <p>The scheduled task opens (and closes) its own connection. The setup connection is
     * released by its try-with-resources block, so it must not be captured by a task that runs
     * later on the scheduler thread.
     *
     * @throws SQLException if creating or seeding the table fails
     */
    @Before
    public void before() throws SQLException {
        TestValuesTableFactory.clearAllData();
        this.customDatabase.createAndInitialize();
        final String tableId = this.customDatabase.getDatabaseName() + ".produce_binlog_table";
        try (MySqlConnection connection = this.getConnection()) {
            connection.setAutoCommit(false);
            connection.execute(new String[]{String.format("CREATE TABLE %s ( id BIGINT PRIMARY KEY, cnt BIGINT);", tableId)});
            connection.execute(new String[]{String.format("INSERT INTO  %s VALUES (0, 100), (1, 101), (2, 102);", tableId)});
            connection.commit();
        }
        // NOTE(review): the delay is 500 microseconds - confirm MILLISECONDS was not intended.
        this.mockBinlogExecutor.schedule(() -> {
            // Dedicated connection: it lives exactly as long as this task.
            try (MySqlConnection binlogConnection = this.getConnection()) {
                binlogConnection.setAutoCommit(false);
                binlogConnection.execute(new String[]{String.format("UPDATE  %s SET  cnt = cnt +1 WHERE id < 2;", tableId)});
                binlogConnection.commit();
            }
            catch (SQLException e) {
                e.printStackTrace();
            }
        }, 500L, TimeUnit.MICROSECONDS);
    }

    /** Initiates shutdown of the scheduler used for the delayed binlog-producing task. */
    @After
    public void after() {
        mockBinlogExecutor.shutdown();
    }

    /**
     * Adds {@code address_hangzhou} then {@code address_beijing} to the pipeline with
     * parallelism 1, no failover and no binlog written before capture.
     */
    @Test
    public void testNewlyAddedTableForExistsPipelineOnce() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing"};
        testNewlyAddedTableOneByOne(
                1,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                false,
                tables);
    }

    /**
     * Adds {@code address_hangzhou} then {@code address_beijing} with parallelism 1 and no
     * failover, writing binlog for each table before it is captured.
     */
    @Test
    public void testNewlyAddedTableForExistsPipelineOnceWithAheadBinlog() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing"};
        testNewlyAddedTableOneByOne(
                1,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                true,
                tables);
    }

    /**
     * Adds three address tables one per round with parallelism 4, no failover and no binlog
     * written before capture.
     */
    @Test
    public void testNewlyAddedTableForExistsPipelineTwice() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing", "address_shanghai"};
        testNewlyAddedTableOneByOne(
                4,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                false,
                tables);
    }

    /**
     * Adds three address tables one per round with parallelism 4 and no failover, writing
     * binlog for each table before it is captured.
     */
    @Test
    public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlog() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing", "address_shanghai"};
        testNewlyAddedTableOneByOne(
                4,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                true,
                tables);
    }

    /**
     * Same scenario as the three-table ahead-binlog run (parallelism 4, no failover), with the
     * extra source option {@code scan.incremental.close-idle-reader.enabled} set to
     * {@code true}.
     */
    @Test
    public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlogAndAutoCloseReader() throws Exception {
        final Map<String, String> readerOptions = new HashMap<>();
        readerOptions.put("scan.incremental.close-idle-reader.enabled", "true");
        final String[] tables = {"address_hangzhou", "address_beijing", "address_shanghai"};
        testNewlyAddedTableOneByOne(
                4,
                readerOptions,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                true,
                tables);
    }

    /**
     * Adds four address tables one per round with parallelism 4, no failover and no binlog
     * written before capture.
     */
    @Test
    public void testNewlyAddedTableForExistsPipelineThrice() throws Exception {
        final String[] tables = {
            "address_hangzhou", "address_beijing", "address_shanghai", "address_shenzhen"
        };
        testNewlyAddedTableOneByOne(
                4,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                false,
                tables);
    }

    /**
     * Adds four address tables one per round with parallelism 4 and no failover, writing
     * binlog for each table before it is captured.
     */
    @Test
    public void testNewlyAddedTableForExistsPipelineThriceWithAheadBinlog() throws Exception {
        final String[] tables = {
            "address_hangzhou", "address_beijing", "address_shanghai", "address_shenzhen"
        };
        testNewlyAddedTableOneByOne(
                4,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                true,
                tables);
    }

    /**
     * Adds {@code address_hangzhou} then {@code address_beijing} with parallelism 1, no
     * failover and no binlog written before capture.
     */
    @Test
    public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing"};
        testNewlyAddedTableOneByOne(
                1,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                false,
                tables);
    }

    /**
     * Adds {@code address_hangzhou} then {@code address_beijing} with parallelism 1 and no
     * failover, writing binlog for each table before it is captured.
     */
    @Test
    public void testNewlyAddedTableForExistsPipelineSingleParallelismWithAheadBinlog() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing"};
        testNewlyAddedTableOneByOne(
                1,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                true,
                tables);
    }

    /**
     * Adds two address tables with parallelism 4 and triggers a JM failover during the
     * snapshot phase of each round; no binlog is written before capture.
     */
    @Test
    public void testJobManagerFailoverForNewlyAddedTable() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing"};
        testNewlyAddedTableOneByOne(
                4,
                MySqlSourceTestBase.FailoverType.JM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                false,
                tables);
    }

    /**
     * Adds two address tables with parallelism 4 and triggers a JM failover during the
     * snapshot phase of each round, writing binlog for each table before it is captured.
     */
    @Test
    public void testJobManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing"};
        testNewlyAddedTableOneByOne(
                4,
                MySqlSourceTestBase.FailoverType.JM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                true,
                tables);
    }

    /**
     * Adds two address tables with parallelism 1 and triggers a TM failover during the binlog
     * phase of each round; no binlog is written before capture.
     */
    @Test
    public void testTaskManagerFailoverForNewlyAddedTable() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing"};
        testNewlyAddedTableOneByOne(
                1,
                MySqlSourceTestBase.FailoverType.TM,
                MySqlSourceTestBase.FailoverPhase.BINLOG,
                false,
                tables);
    }

    /**
     * Adds two address tables with parallelism 1 and triggers a TM failover during the binlog
     * phase of each round, writing binlog for each table before it is captured.
     *
     * <p>{@code makeBinlogBeforeCapture} is {@code true} here. With {@code false} this test was
     * an exact duplicate of {@code testTaskManagerFailoverForNewlyAddedTable} and the
     * ahead-binlog path was never exercised under TM failover.
     */
    @Test
    public void testTaskManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
        this.testNewlyAddedTableOneByOne(1, MySqlSourceTestBase.FailoverType.TM, MySqlSourceTestBase.FailoverPhase.BINLOG, true, "address_hangzhou", "address_beijing");
    }

    /**
     * Removes captured address tables one per round with parallelism 1, triggering a JM
     * failover during the initial snapshot phase.
     */
    @Test
    public void testJobManagerFailoverForRemoveTableSingleParallelism() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing", "address_shanghai"};
        testRemoveTablesOneByOne(
                1,
                MySqlSourceTestBase.FailoverType.JM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                tables);
    }

    /**
     * Removes captured address tables one per round with parallelism 4, triggering a JM
     * failover during the initial snapshot phase.
     */
    @Test
    public void testJobManagerFailoverForRemoveTable() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing", "address_shanghai"};
        testRemoveTablesOneByOne(
                4,
                MySqlSourceTestBase.FailoverType.JM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                tables);
    }

    /**
     * Removes captured address tables one per round with parallelism 1, triggering a TM
     * failover during the initial snapshot phase.
     */
    @Test
    public void testTaskManagerFailoverForRemoveTableSingleParallelism() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing", "address_shanghai"};
        testRemoveTablesOneByOne(
                1,
                MySqlSourceTestBase.FailoverType.TM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                tables);
    }

    /**
     * Removes captured address tables one per round with parallelism 4, triggering a TM
     * failover during the initial snapshot phase.
     */
    @Test
    public void testTaskManagerFailoverForRemoveTable() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing", "address_shanghai"};
        testRemoveTablesOneByOne(
                4,
                MySqlSourceTestBase.FailoverType.TM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                tables);
    }

    /** Removes captured address tables one per round with parallelism 1 and no failover. */
    @Test
    public void testRemoveTableSingleParallelism() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing", "address_shanghai"};
        testRemoveTablesOneByOne(
                1,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                tables);
    }

    /** Removes captured address tables one per round with parallelism 4 and no failover. */
    @Test
    public void testRemoveTable() throws Exception {
        final String[] tables = {"address_hangzhou", "address_beijing", "address_shanghai"};
        testRemoveTablesOneByOne(
                4,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                tables);
    }

    /**
     * Restarts a DataStream job from a savepoint after swapping one captured table for another.
     *
     * <p>Round 0 captures {@code customers_even_dist} and {@code customers}. Round 1 restores
     * from the round-0 savepoint with {@code customers} replaced by the pattern
     * {@code customers_\d+} and expects only the rows of {@code customers_1}. Each round asserts
     * the snapshot rows, applies four DML statements to the changed table, asserts the resulting
     * change events, takes a savepoint and cancels the job.
     *
     * <p>The temporary savepoint folder is deleted in a {@code finally} block so that it is
     * cleaned up even when an assertion or the job fails.
     */
    @Test
    public void testRemoveAndAddNewTable() throws Exception {
        String tableId0 = this.customDatabase.getDatabaseName() + ".customers_even_dist";
        String tableId1 = "customers";
        String tableId2 = "customers_\\d+";
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        try {
            String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
            String finishedSavePointPath = null;
            CollectResultIterator<RowData> iterator = null;
            for (int i = 0; i < 2; ++i) {
                // Table whose rows are expected (and modified) in this round.
                String changedTable = i == 0 ? tableId1 : "customers_1";
                StreamExecutionEnvironment env = this.getStreamExecutionEnvironment(finishedSavePointPath, 4);
                // Physical row has four columns; the produced row appends the table-name metadata column.
                RowDataDebeziumDeserializeSchema deserializer = RowDataDebeziumDeserializeSchema.newBuilder().setMetadataConverters(new MetadataConverter[]{MySqlReadableMetadata.TABLE_NAME.getConverter()}).setPhysicalRowType((RowType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING())}).getLogicalType()).setResultTypeInfo((TypeInformation)InternalTypeInfo.of((LogicalType)TypeConversions.fromDataToLogicalType((DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)DataTypes.BIGINT()), DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"address", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"phone_number", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"_table_name", (DataType)((DataType)DataTypes.STRING().notNull()))})))).build();
                MySqlSource mySqlSource = MySqlSource.builder().hostname(MYSQL_CONTAINER.getHost()).port(MYSQL_CONTAINER.getDatabasePort()).databaseList(new String[]{this.customDatabase.getDatabaseName()}).serverTimeZone("UTC").tableList(new String[]{tableId0, this.customDatabase.getDatabaseName() + "." + (i == 0 ? tableId1 : tableId2)}).username(this.customDatabase.getUsername()).password(this.customDatabase.getPassword()).serverId("5401-5404").deserializer((DebeziumDeserializationSchema)deserializer).scanNewlyAddedTableEnabled(true).build();
                DataStreamSource source = env.fromSource((Source)mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source" + i);
                // NOTE(review): the iterator created in round 0 is reused in round 1; the
                // iterator returned for the round-1 sink is discarded - confirm this is intended.
                if (iterator == null) {
                    iterator = this.addCollectSink((DataStream<RowData>)source);
                } else {
                    this.addCollectSink((DataStream<RowData>)source);
                }
                JobClient jobClient = env.executeAsync("Collect " + i);
                iterator.setJobClient(jobClient);
                List<String> expectedCustomersEvenDistResult = Arrays.asList("+I[103, user_3, Shanghai, 123567891234, customers_even_dist]", "+I[104, user_4, Shanghai, 123567891234, customers_even_dist]", "+I[101, user_1, Shanghai, 123567891234, customers_even_dist]", "+I[102, user_2, Shanghai, 123567891234, customers_even_dist]", "+I[107, user_7, Shanghai, 123567891234, customers_even_dist]", "+I[108, user_8, Shanghai, 123567891234, customers_even_dist]", "+I[105, user_5, Shanghai, 123567891234, customers_even_dist]", "+I[106, user_6, Shanghai, 123567891234, customers_even_dist]", "+I[109, user_9, Shanghai, 123567891234, customers_even_dist]", "+I[110, user_10, Shanghai, 123567891234, customers_even_dist]");
                List<String> expectedCustomersResult = Arrays.asList(String.format("+I[1011, user_12, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1012, user_13, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1009, user_10, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1010, user_11, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1015, user_16, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1016, user_17, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1013, user_14, Shanghai, 123567891234, %s]", changedTable), String.format("+I[118, user_7, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1014, user_15, Shanghai, 123567891234, %s]", changedTable), String.format("+I[111, user_6, Shanghai, 123567891234, %s]", changedTable), String.format("+I[2000, user_21, Shanghai, 123567891234, %s]", changedTable), String.format("+I[109, user_4, Shanghai, 123567891234, %s]", changedTable), String.format("+I[110, user_5, Shanghai, 123567891234, %s]", changedTable), String.format("+I[103, user_3, Shanghai, 123567891234, %s]", changedTable), String.format("+I[101, user_1, Shanghai, 123567891234, %s]", changedTable), String.format("+I[102, user_2, Shanghai, 123567891234, %s]", changedTable), String.format("+I[123, user_9, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1019, user_20, Shanghai, 123567891234, %s]", changedTable), String.format("+I[121, user_8, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1017, user_18, Shanghai, 123567891234, %s]", changedTable), String.format("+I[1018, user_19, Shanghai, 123567891234, %s]", changedTable));
                List<String> expectedBinlogResult = Arrays.asList(String.format("-U[103, user_3, Shanghai, 123567891234, %s]", changedTable), String.format("+U[103, user_3, Update1, 123567891234, %s]", changedTable), String.format("-D[102, user_2, Shanghai, 123567891234, %s]", changedTable), String.format("+I[102, user_2, Insert1, 123567891234, %s]", changedTable), String.format("-U[103, user_3, Update1, 123567891234, %s]", changedTable), String.format("+U[103, user_3, Update2, 123567891234, %s]", changedTable));
                // Round 0 snapshots both tables; round 1 only expects the newly matched table.
                List<String> expectedSnapshotResult = i == 0 ? Stream.concat(expectedCustomersEvenDistResult.stream(), expectedCustomersResult.stream()).collect(Collectors.toList()) : expectedCustomersResult;
                List<String> rows = this.fetchRowData((Iterator<RowData>)iterator, expectedSnapshotResult.size());
                NewlyAddedTableITCase.assertEqualsInAnyOrder(expectedSnapshotResult, rows);
                try (MySqlConnection connection = this.getConnection();){
                    connection.setAutoCommit(false);
                    String tableId = this.customDatabase.getDatabaseName() + "." + changedTable;
                    connection.execute(new String[]{"UPDATE " + tableId + " SET address = 'Update1' where id = 103", "DELETE FROM " + tableId + " where id = 102", "INSERT INTO " + tableId + " VALUES(102, 'user_2','Insert1','123567891234')", "UPDATE " + tableId + " SET address = 'Update2' where id = 103"});
                    connection.commit();
                }
                rows = this.fetchRowData((Iterator<RowData>)iterator, expectedBinlogResult.size());
                NewlyAddedTableITCase.assertEqualsInAnyOrder(expectedBinlogResult, rows);
                // The savepoint taken here is the restore point for the next round.
                finishedSavePointPath = this.triggerSavepointWithRetry(jobClient, savepointDirectory);
                jobClient.cancel().get();
            }
        } finally {
            temporaryFolder.delete();
        }
    }

    /**
     * Attaches a collect sink to {@code stream} and returns an iterator over its results.
     *
     * <p>The sink transformation is registered on the stream's execution environment under a
     * randomly generated accumulator name. The caller still has to hand the job client to the
     * returned iterator.
     *
     * @param stream the stream whose records should be collected
     * @return an iterator bound to the new sink's operator id and accumulator
     */
    protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) {
        TypeSerializer rowSerializer = stream.getType().createSerializer(stream.getExecutionConfig());
        String accumulator = "dataStreamCollect_" + UUID.randomUUID();

        CollectSinkOperatorFactory sinkFactory = new CollectSinkOperatorFactory(rowSerializer, accumulator);
        CollectSinkOperator sinkOperator = (CollectSinkOperator)sinkFactory.getOperator();

        CollectStreamSink collectSink = new CollectStreamSink(stream, sinkFactory);
        collectSink.name("Data stream collect sink");
        stream.getExecutionEnvironment().addOperator(collectSink.getTransformation());

        return new CollectResultIterator(
                sinkOperator.getOperatorIdFuture(),
                rowSerializer,
                accumulator,
                stream.getExecutionEnvironment().getCheckpointConfig());
    }

    /**
     * Pulls up to {@code size} rows from {@code iter} and renders them as strings.
     *
     * @param iter source of rows; reading stops early if it has no more elements
     * @param size maximum number of rows to read
     * @return the string form of each row read, in iteration order
     */
    private List<String> fetchRowData(Iterator<RowData> iter, int size) {
        List<RowData> collected = new ArrayList<>(size);
        for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
            collected.add(iter.next());
        }
        return convertRowDataToRowString(collected);
    }

    /**
     * Renders each row as the string form of a named-position row with the fields
     * {@code id}, {@code name}, {@code address}, {@code phone_number} and {@code _table_name}
     * (positions 0 to 4), keeping the row kind of the input row.
     *
     * @param rows rows whose field 0 is read as a long and fields 1 to 4 as strings
     * @return one string per input row, in the same order
     */
    private static List<String> convertRowDataToRowString(List<RowData> rows) {
        String[] fieldNames = {"id", "name", "address", "phone_number", "_table_name"};
        LinkedHashMap<String, Integer> positionByName = new LinkedHashMap<>();
        for (int position = 0; position < fieldNames.length; position++) {
            positionByName.put(fieldNames[position], position);
        }

        List<String> rendered = new ArrayList<>();
        for (RowData row : rows) {
            Object[] fields = {
                row.getLong(0), row.getString(1), row.getString(2), row.getString(3), row.getString(4)
            };
            rendered.add(RowUtils.createRowWithNamedPositions(row.getRowKind(), fields, positionByName).toString());
        }
        return rendered;
    }

    /**
     * Scenario driver: captures all given address tables, then restarts the job from a
     * savepoint once per round with the first {@code round + 1} tables removed from the capture
     * list.
     *
     * <p>The first job snapshots every table and is savepointed and cancelled. In each later
     * round, binlog is produced for every table, but only events of tables still captured
     * ({@code i > round}) are added to the expected sink contents.
     *
     * <p>The temporary savepoint folder is deleted in a {@code finally} block; it used to be
     * left behind after every run.
     *
     * @param parallelism job parallelism for every round
     * @param failoverType kind of failover to trigger
     * @param failoverPhase phase in which the failover is triggered (SNAPSHOT applies to the
     *     first job, BINLOG to the later rounds)
     * @param captureAddressTables table names of the form {@code <prefix>_<city>}
     * @throws Exception if setup, job execution, savepointing or an assertion fails
     */
    private void testRemoveTablesOneByOne(int parallelism, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String ... captureAddressTables) throws Exception {
        this.initialAddressTables((JdbcConnection)this.getConnection(), captureAddressTables);
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        try {
            String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
            // Accumulates everything the sink is expected to contain across all rounds.
            ArrayList<String> fetchedDataList = new ArrayList<String>();
            for (String table : captureAddressTables) {
                String cityName = table.split("_")[1];
                fetchedDataList.addAll(Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", table, cityName, cityName), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", table, cityName, cityName), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", table, cityName, cityName)));
            }
            String finishedSavePointPath = null;
            StreamExecutionEnvironment env = this.getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env);
            String createTableStatement = this.getCreateTableStatement(new HashMap<String, String>(), captureAddressTables);
            tEnv.executeSql(createTableStatement);
            tEnv.executeSql("CREATE TABLE sink ( table_name STRING, id BIGINT, country STRING, city STRING, detail_address STRING, primary key (city, id) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
            TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
            JobClient jobClient = (JobClient)tableResult.getJobClient().get();
            if (failoverPhase == MySqlSourceTestBase.FailoverPhase.SNAPSHOT) {
                NewlyAddedTableITCase.triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
            }
            NewlyAddedTableITCase.waitForSinkSize("sink", fetchedDataList.size());
            NewlyAddedTableITCase.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults((String)"sink"));
            finishedSavePointPath = this.triggerSavepointWithRetry(jobClient, savepointDirectory);
            jobClient.cancel().get();
            for (int round = 0; round < captureAddressTables.length - 1; ++round) {
                // Drop the first (round + 1) tables from the capture list for this restart.
                String[] captureTablesThisRound = Arrays.asList(captureAddressTables).subList(round + 1, captureAddressTables.length).toArray(new String[0]);
                StreamExecutionEnvironment env2 = this.getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
                StreamTableEnvironment tEnv2 = StreamTableEnvironment.create((StreamExecutionEnvironment)env2);
                String createTableStatement2 = this.getCreateTableStatement(new HashMap<String, String>(), captureTablesThisRound);
                tEnv2.executeSql(createTableStatement2);
                tEnv2.executeSql("CREATE TABLE sink ( table_name STRING, id BIGINT, country STRING, city STRING, detail_address STRING, primary key (city, id) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
                TableResult tableResult2 = tEnv2.executeSql("insert into sink select * from address");
                JobClient jobClient2 = (JobClient)tableResult2.getJobClient().get();
                NewlyAddedTableITCase.waitForSinkSize("sink", fetchedDataList.size());
                NewlyAddedTableITCase.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults((String)"sink"));
                ArrayList<String> expectedBinlogDataThisRound = new ArrayList<String>();
                int captureAddressTablesLength = captureAddressTables.length;
                for (int i = 0; i < captureAddressTablesLength; ++i) {
                    String tableName = captureAddressTables[i];
                    // Binlog is written for every table, including the removed ones.
                    this.makeBinlogForAddressTable((JdbcConnection)this.getConnection(), tableName, round);
                    // Only tables that are still captured contribute expected events.
                    if (i <= round) continue;
                    String cityName = tableName.split("_")[1];
                    expectedBinlogDataThisRound.addAll(Arrays.asList(String.format("+U[%s, 416874195632735147, CHINA_%s, %s, %s West Town address 1]", tableName, round, cityName, cityName), String.format("+I[%s, %d, China, %s, %s West Town address 4]", tableName, 417022095255614380L + (long)round, cityName, cityName)));
                }
                if (failoverPhase == MySqlSourceTestBase.FailoverPhase.BINLOG && TestValuesTableFactory.getRawResults((String)"sink").size() > fetchedDataList.size()) {
                    NewlyAddedTableITCase.triggerFailover(failoverType, jobClient2.getJobID(), this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
                }
                fetchedDataList.addAll(expectedBinlogDataThisRound);
                NewlyAddedTableITCase.waitForSinkSize("sink", fetchedDataList.size());
                NewlyAddedTableITCase.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults((String)"sink"));
                finishedSavePointPath = this.triggerSavepointWithRetry(jobClient2, savepointDirectory);
                jobClient2.cancel().get();
            }
        } finally {
            temporaryFolder.delete();
        }
    }

    /**
     * Convenience overload that runs the add-table scenario without any extra source options.
     *
     * @param parallelism job parallelism for every round
     * @param failoverType kind of failover to trigger
     * @param failoverPhase phase in which the failover is triggered
     * @param makeBinlogBeforeCapture whether binlog is written for a table before it is captured
     * @param captureAddressTables tables to add, one per round
     * @throws Exception if the delegated scenario fails
     */
    private void testNewlyAddedTableOneByOne(int parallelism, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, boolean makeBinlogBeforeCapture, String ... captureAddressTables) throws Exception {
        Map<String, String> noExtraOptions = new HashMap<>();
        testNewlyAddedTableOneByOne(
                parallelism,
                noExtraOptions,
                failoverType,
                failoverPhase,
                makeBinlogBeforeCapture,
                captureAddressTables);
    }

    /**
     * Scenario driver: starts one job per round, each capturing one more table than the
     * previous round and (from the second round on) restoring from the previous savepoint.
     *
     * <p>Per round it asserts the snapshot rows of the newly added table, produces binlog in
     * two parts (optionally with a failover in between), asserts the upserted sink contents,
     * then savepoints (except in the last round) and cancels the job.
     *
     * <p>The temporary savepoint folder is deleted in a {@code finally} block; it used to be
     * left behind after every run.
     *
     * @param parallelism job parallelism for every round
     * @param sourceOptions extra options appended to the source table's WITH clause
     * @param failoverType kind of failover to trigger
     * @param failoverPhase phase in which the failover is triggered
     * @param makeBinlogBeforeCapture whether binlog is written for a table before it is
     *     captured; when true the snapshot is expected to contain one additional row
     * @param captureAddressTables table names of the form {@code <prefix>_<city>}
     * @throws Exception if setup, job execution, savepointing or an assertion fails
     */
    private void testNewlyAddedTableOneByOne(int parallelism, Map<String, String> sourceOptions, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, boolean makeBinlogBeforeCapture, String ... captureAddressTables) throws Exception {
        this.initialAddressTables((JdbcConnection)this.getConnection(), captureAddressTables);
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        try {
            String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
            String finishedSavePointPath = null;
            // Accumulates the expected upsert-sink contents across rounds.
            List<String> fetchedDataList = new ArrayList<String>();
            for (int round = 0; round < captureAddressTables.length; ++round) {
                // Capture the first (round + 1) tables; the last of them is new this round.
                String[] captureTablesThisRound = Arrays.asList(captureAddressTables).subList(0, round + 1).toArray(new String[0]);
                String newlyAddedTable = captureAddressTables[round];
                if (makeBinlogBeforeCapture) {
                    this.makeBinlogBeforeCaptureForAddressTable((JdbcConnection)this.getConnection(), newlyAddedTable);
                }
                StreamExecutionEnvironment env = this.getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
                StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env);
                String createTableStatement = this.getCreateTableStatement(sourceOptions, captureTablesThisRound);
                tEnv.executeSql(createTableStatement);
                tEnv.executeSql("CREATE TABLE sink ( table_name STRING, id BIGINT, country STRING, city STRING, detail_address STRING, primary key (city, id) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
                TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
                JobClient jobClient = (JobClient)tableResult.getJobClient().get();
                String cityName = newlyAddedTable.split("_")[1];
                List<String> expectedSnapshotDataThisRound = Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", newlyAddedTable, cityName, cityName), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", newlyAddedTable, cityName, cityName), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", newlyAddedTable, cityName, cityName));
                if (makeBinlogBeforeCapture) {
                    // Binlog written before capture shows up as an extra snapshot row.
                    expectedSnapshotDataThisRound = Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", newlyAddedTable, cityName, cityName), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", newlyAddedTable, cityName, cityName), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", newlyAddedTable, cityName, cityName), String.format("+I[%s, 417022095255614381, China, %s, %s West Town address 5]", newlyAddedTable, cityName, cityName));
                }
                if (failoverPhase == MySqlSourceTestBase.FailoverPhase.SNAPSHOT) {
                    NewlyAddedTableITCase.triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
                }
                fetchedDataList.addAll(expectedSnapshotDataThisRound);
                NewlyAddedTableITCase.waitForUpsertSinkSize("sink", fetchedDataList.size());
                NewlyAddedTableITCase.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults((String)"sink"));
                this.makeFirstPartBinlogForAddressTable((JdbcConnection)this.getConnection(), newlyAddedTable);
                if (failoverPhase == MySqlSourceTestBase.FailoverPhase.BINLOG) {
                    NewlyAddedTableITCase.triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
                }
                this.makeSecondPartBinlogForAddressTable((JdbcConnection)this.getConnection(), newlyAddedTable);
                // The row with id 416874195632735147 is replaced by its updated version below.
                fetchedDataList = fetchedDataList.stream().filter(r -> !r.contains(String.format("%s, 416874195632735147", newlyAddedTable))).collect(Collectors.toList());
                List<String> expectedBinlogUpsertDataThisRound = Arrays.asList(String.format("+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]", newlyAddedTable, cityName, cityName), String.format("+I[%s, 417022095255614380, China, %s, %s West Town address 4]", newlyAddedTable, cityName, cityName));
                fetchedDataList.addAll(expectedBinlogUpsertDataThisRound);
                NewlyAddedTableITCase.waitForUpsertSinkSize("sink", fetchedDataList.size());
                Thread.sleep(1000L);
                NewlyAddedTableITCase.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults((String)"sink"));
                // No savepoint is needed after the last round; nothing restores from it.
                if (round != captureAddressTables.length - 1) {
                    finishedSavePointPath = this.triggerSavepointWithRetry(jobClient, savepointDirectory);
                }
                jobClient.cancel().get();
            }
        } finally {
            temporaryFolder.delete();
        }
    }

    /**
     * Builds the {@code CREATE TABLE address} DDL for a mysql-cdc source over the given tables.
     *
     * @param otherOptions extra WITH-clause options; each entry is appended as
     *     {@code ,'key'='value'}
     * @param captureTableNames tables to capture; must not be empty
     * @return the complete DDL statement
     */
    private String getCreateTableStatement(Map<String, String> otherOptions, String ... captureTableNames) {
        String ddlTemplate = "CREATE TABLE address ( table_name STRING METADATA VIRTUAL, id BIGINT NOT NULL, country STRING, city STRING, detail_address STRING, primary key (city, id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.chunk.size' = '2', 'chunk-meta.group.size' = '2', 'server-time-zone' = 'UTC', 'server-id' = '%s', 'scan.newly-added-table.enabled' = 'true' %s)";

        // Arguments are resolved in the same order as the placeholders appear.
        Object host = MYSQL_CONTAINER.getHost();
        Object port = MYSQL_CONTAINER.getDatabasePort();
        Object username = customDatabase.getUsername();
        Object password = customDatabase.getPassword();
        Object databaseName = customDatabase.getDatabaseName();
        String tableNameRegex = getTableNameRegex(captureTableNames);
        String serverId = getServerId();

        // Every extra option carries its own leading comma, so an empty map yields "".
        StringBuilder extraOptions = new StringBuilder();
        for (Map.Entry<String, String> option : otherOptions.entrySet()) {
            extraOptions.append(",").append(String.format("'%s'='%s'", option.getKey(), option.getValue()));
        }

        return String.format(
                ddlTemplate,
                host,
                port,
                username,
                password,
                databaseName,
                tableNameRegex,
                serverId,
                extraOptions.toString());
    }

    /**
     * Creates an execution environment with checkpointing every 200 ms and a fixed-delay
     * restart strategy (3 attempts, 100 ms delay).
     *
     * @param finishedSavePointPath savepoint to restore from, or {@code null} for a fresh start
     * @param parallelism parallelism to set on the environment
     * @return the configured environment
     * @throws Exception declared for callers; nothing in this method's visible body throws a
     *     checked exception
     */
    private StreamExecutionEnvironment getStreamExecutionEnvironment(String finishedSavePointPath, int parallelism) throws Exception {
        Configuration restoreConfig = new Configuration();
        boolean restoreFromSavepoint = finishedSavePointPath != null;
        if (restoreFromSavepoint) {
            restoreConfig.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
        }

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(restoreConfig);
        environment.setParallelism(parallelism);
        environment.enableCheckpointing(200L);
        environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
        return environment;
    }

    /**
     * Triggers a savepoint, retrying while the failure is a {@code CheckpointException} whose
     * message contains "Checkpoint triggering task".
     *
     * <p>Up to 600 attempts are made with a 100 ms pause between them. Any other failure is
     * rethrown immediately. When all attempts are used up the method throws instead of
     * returning {@code null}, because a {@code null} path would make the next job silently
     * start without a savepoint.
     *
     * @param jobClient client of the running job
     * @param savepointDirectory target directory for the savepoint
     * @return the path of the completed savepoint, never {@code null}
     * @throws ExecutionException if triggering fails for a non-retryable reason
     * @throws InterruptedException if interrupted while waiting or sleeping
     * @throws IllegalStateException if no savepoint could be taken within the retry budget
     */
    private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory) throws ExecutionException, InterruptedException {
        final int maxAttempts = 600;
        Exception lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; ++attempt) {
            try {
                return (String)jobClient.triggerSavepoint(savepointDirectory).get();
            }
            catch (Exception e) {
                Optional<CheckpointException> checkpointFailure = ExceptionUtils.findThrowable((Throwable)e, CheckpointException.class);
                // A missing message is treated as non-retryable rather than causing an NPE.
                String message = checkpointFailure.map(Throwable::getMessage).orElse(null);
                if (message == null || !message.contains("Checkpoint triggering task")) {
                    throw e;
                }
                lastFailure = e;
                Thread.sleep(100L);
            }
        }
        throw new IllegalStateException("Could not trigger a savepoint in " + savepointDirectory + " after " + maxAttempts + " attempts", lastFailure);
    }

    /**
     * Builds the value for the connector's {@code table-name} option.
     *
     * @param captureCustomerTables table names to capture; must contain at least one element
     * @return the single name as-is, or all names joined with {@code |} and wrapped in
     *     parentheses when there is more than one
     * @throws IllegalStateException if the array is empty (raised by {@code Preconditions.checkState})
     */
    private String getTableNameRegex(String[] captureCustomerTables) {
        final int tableCount = captureCustomerTables.length;
        Preconditions.checkState(tableCount > 0);
        return tableCount == 1
                ? captureCustomerTables[0]
                : String.format("(%s)", StringUtils.join(captureCustomerTables, "|"));
    }

    /**
     * Picks a random server-id range for the connector.
     *
     * @return a string {@code "<start>-<end>"} where {@code start} is drawn from 5400..5499
     *     and {@code end} is {@code start + 4}
     */
    private String getServerId() {
        final int rangeStart = 5400 + new Random().nextInt(100);
        final int rangeEnd = rangeStart + 4;
        return rangeStart + "-" + rangeEnd;
    }

    /**
     * Sleeps for the given duration without propagating {@link InterruptedException}.
     *
     * <p>If the sleep is interrupted the method returns early and re-sets the thread's
     * interrupt flag, so the interruption stays visible to the caller.
     *
     * @param millis time to sleep, in milliseconds
     */
    private void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; restore it instead of losing the signal.
            Thread.currentThread().interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates and seeds each given address table in one transaction, then closes the connection.
     *
     * <p>Every table is created in the {@code customDatabase} database and receives three rows
     * whose {@code city} is the second underscore-separated segment of the table name.
     *
     * @param connection connection to run the statements on; it is always closed by this method
     * @param addressTables table names containing at least one underscore
     * @throws SQLException if a statement, the commit, or closing the connection fails
     */
    private void initialAddressTables(JdbcConnection connection, String[] addressTables) throws SQLException {
        // try-with-resources closes the connection on every path and, unlike a finally block,
        // keeps the first failure as the primary exception if close() fails as well.
        try (JdbcConnection conn = connection) {
            conn.setAutoCommit(false);
            for (String tableName : addressTables) {
                String tableId = this.customDatabase.getDatabaseName() + "." + tableName;
                String cityName = tableName.split("_")[1];
                conn.execute(
                        "CREATE TABLE " + tableId + "("
                                + "  id BIGINT UNSIGNED NOT NULL PRIMARY KEY,"
                                + "  country VARCHAR(255) NOT NULL,"
                                + "  city VARCHAR(255) NOT NULL,"
                                + "  detail_address VARCHAR(1024)"
                                + ");");
                conn.execute(
                        String.format(
                                "INSERT INTO  %s VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),"
                                        + "       (416927583791428523, 'China', '%s', '%s West Town address 2'),"
                                        + "       (417022095255614379, 'China', '%s', '%s West Town address 3');",
                                tableId, cityName, cityName, cityName, cityName, cityName, cityName));
            }
            conn.commit();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Updates the {@code COUNTRY} of row {@code 416874195632735147} to {@code 'CHINA'} in the
     * given table, commits, and closes the connection.
     *
     * @param connection connection to run the statement on; it is always closed by this method
     * @param tableName table in the {@code customDatabase} database to update
     * @throws SQLException if the statement, the commit, or closing the connection fails
     */
    private void makeFirstPartBinlogForAddressTable(JdbcConnection connection, String tableName) throws SQLException {
        // try-with-resources keeps the first failure as the primary exception even if close()
        // fails too; a finally block would replace it with the close() failure.
        try (JdbcConnection conn = connection) {
            conn.setAutoCommit(false);
            String tableId = this.customDatabase.getDatabaseName() + "." + tableName;
            conn.execute(String.format("UPDATE %s SET COUNTRY = 'CHINA' where id = 416874195632735147", tableId));
            conn.commit();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Inserts row {@code 417022095255614380} ("address 4") into the given table, commits, and
     * closes the connection.
     *
     * <p>The inserted {@code city} is the second underscore-separated segment of the table name.
     *
     * @param connection connection to run the statement on; it is always closed by this method
     * @param tableName table name containing at least one underscore
     * @throws SQLException if the statement, the commit, or closing the connection fails
     */
    private void makeSecondPartBinlogForAddressTable(JdbcConnection connection, String tableName) throws SQLException {
        // try-with-resources keeps the first failure as the primary exception even if close()
        // fails too; a finally block would replace it with the close() failure.
        try (JdbcConnection conn = connection) {
            conn.setAutoCommit(false);
            String tableId = this.customDatabase.getDatabaseName() + "." + tableName;
            String cityName = tableName.split("_")[1];
            conn.execute(String.format("INSERT INTO %s VALUES(417022095255614380, 'China','%s','%s West Town address 4')", tableId, cityName, cityName));
            conn.commit();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Inserts row {@code 417022095255614381} ("address 5") into the given table, commits, and
     * closes the connection.
     *
     * <p>The inserted {@code city} is the second underscore-separated segment of the table name.
     *
     * @param connection connection to run the statement on; it is always closed by this method
     * @param tableName table name containing at least one underscore
     * @throws SQLException if the statement, the commit, or closing the connection fails
     */
    private void makeBinlogBeforeCaptureForAddressTable(JdbcConnection connection, String tableName) throws SQLException {
        // try-with-resources keeps the first failure as the primary exception even if close()
        // fails too; a finally block would replace it with the close() failure.
        try (JdbcConnection conn = connection) {
            conn.setAutoCommit(false);
            String tableId = this.customDatabase.getDatabaseName() + "." + tableName;
            String cityName = tableName.split("_")[1];
            conn.execute(String.format("INSERT INTO %s VALUES(417022095255614381, 'China','%s','%s West Town address 5')", tableId, cityName, cityName));
            conn.commit();
        }
    }

    /**
     * Opens a {@link MySqlConnection} to the test MySQL container.
     *
     * <p>Host and port come from {@code MYSQL_CONTAINER}, credentials from
     * {@code customDatabase}, and the server time zone is set to UTC.
     *
     * @return a connection created by {@code DebeziumUtils.createMySqlConnection}
     */
    private MySqlConnection getConnection() {
        final Map<String, String> connectionSettings = new HashMap<>();
        connectionSettings.put("database.hostname", MYSQL_CONTAINER.getHost());
        connectionSettings.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
        connectionSettings.put("database.user", this.customDatabase.getUsername());
        connectionSettings.put("database.password", this.customDatabase.getPassword());
        connectionSettings.put("database.serverTimezone", ZoneId.of("UTC").toString());

        return DebeziumUtils.createMySqlConnection(
                io.debezium.config.Configuration.from(connectionSettings), new Properties());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Produces one update and one insert on the given table for a test round, commits, and
     * closes the connection.
     *
     * <p>The update sets {@code COUNTRY} of row {@code 416874195632735147} to
     * {@code 'CHINA_<round>'}; the insert adds a row with id {@code 417022095255614380 + round}
     * whose {@code city} is the second underscore-separated segment of the table name.
     *
     * @param connection connection to run the statements on; it is always closed by this method
     * @param tableName table name containing at least one underscore
     * @param round round number used in the updated value and the inserted id
     * @throws SQLException if a statement, the commit, or closing the connection fails
     */
    private void makeBinlogForAddressTable(JdbcConnection connection, String tableName, int round) throws SQLException {
        // try-with-resources keeps the first failure as the primary exception even if close()
        // fails too; a finally block would replace it with the close() failure.
        try (JdbcConnection conn = connection) {
            conn.setAutoCommit(false);
            String tableId = this.customDatabase.getDatabaseName() + "." + tableName;
            String cityName = tableName.split("_")[1];
            conn.execute(String.format("UPDATE %s SET COUNTRY = 'CHINA_%s' where id = 416874195632735147", tableId, round));
            conn.execute(String.format("INSERT INTO %s VALUES(%d, 'China','%s','%s West Town address 4')", tableId, 417022095255614380L + (long) round, cityName, cityName));
            conn.commit();
        }
    }

    /**
     * Blocks until the named sink holds at least {@code expectedSize} results, polling
     * {@code sinkSize} every 100 ms.
     *
     * <p>There is no timeout: the method waits indefinitely if the size is never reached.
     *
     * @param sinkName name of the sink to poll
     * @param expectedSize minimum number of results to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private static void waitForSinkSize(String sinkName, int expectedSize) throws InterruptedException {
        for (;;) {
            if (NewlyAddedTableITCase.sinkSize(sinkName) >= expectedSize) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the number of raw results currently recorded for the named sink.
     *
     * <p>The lookup runs while holding the {@code TestValuesTableFactory} class monitor.
     *
     * @param sinkName name of the sink to inspect
     * @return the size of {@code TestValuesTableFactory.getRawResults(sinkName)}, or 0 if that
     *     call throws {@link IllegalArgumentException} (presumably because the sink is not
     *     known yet; confirm against the factory)
     */
    private static int sinkSize(String sinkName) {
        synchronized (TestValuesTableFactory.class) {
            int resultCount;
            try {
                resultCount = TestValuesTableFactory.getRawResults(sinkName).size();
            } catch (IllegalArgumentException unknownSink) {
                resultCount = 0;
            }
            return resultCount;
        }
    }
}

