/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MysqlPipelineNewlyAddedTableITCase
extends MySqlSourceTestBase {
    private final UniqueDatabase customDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
    private final ScheduledExecutorService mockBinlogExecutor = Executors.newScheduledThreadPool(1);

    @Before
    public void before() throws SQLException {
        TestValuesTableFactory.clearAllData();
        this.customDatabase.createAndInitialize();
        try (MySqlConnection connection = this.getConnection();){
            connection.setAutoCommit(false);
            String tableId = this.customDatabase.getDatabaseName() + ".produce_binlog_table";
            connection.execute(new String[]{String.format("CREATE TABLE %s ( id BIGINT PRIMARY KEY, cnt BIGINT);", tableId)});
            connection.execute(new String[]{String.format("INSERT INTO  %s VALUES (0, 100), (1, 101), (2, 102);", tableId)});
            connection.commit();
            this.mockBinlogExecutor.schedule(() -> {
                try {
                    connection.execute(new String[]{String.format("UPDATE  %s SET  cnt = cnt +1 WHERE id < 2;", tableId)});
                    connection.commit();
                }
                catch (SQLException e) {
                    e.printStackTrace();
                }
            }, 500L, TimeUnit.MICROSECONDS);
        }
    }

    @After
    public void after() {
        this.mockBinlogExecutor.shutdown();
    }

    private MySqlConnection getConnection() {
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("database.hostname", MYSQL_CONTAINER.getHost());
        properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
        properties.put("database.user", this.customDatabase.getUsername());
        properties.put("database.password", this.customDatabase.getPassword());
        properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
        Configuration configuration = Configuration.from(properties);
        return DebeziumUtils.createMySqlConnection((Configuration)configuration, (Properties)new Properties());
    }

    @Test
    public void testAddNewTableOneByOneSingleParallelism() throws Exception {
        TestParam testParam = TestParam.newBuilder(Collections.singletonList("address_hangzhou"), 4, Arrays.asList("address_hangzhou", "address_beijing"), 4).setFirstRoundInitTables(Arrays.asList("address_hangzhou", "address_beijing")).build();
        this.testAddNewTable(testParam, 1);
    }

    @Test
    public void testAddNewTableOneByOne() throws Exception {
        TestParam testParam = TestParam.newBuilder(Collections.singletonList("address_hangzhou"), 4, Arrays.asList("address_hangzhou", "address_beijing"), 4).setFirstRoundInitTables(Arrays.asList("address_hangzhou", "address_beijing")).build();
        this.testAddNewTable(testParam, 4);
    }

    @Test
    public void testAddNewTableByPatternSingleParallelism() throws Exception {
        TestParam testParam = TestParam.newBuilder(Collections.singletonList("address_\\.*"), 8, Collections.singletonList("address_\\.*"), 8).setFirstRoundInitTables(Arrays.asList("address_hangzhou", "address_beijing")).setSecondRoundInitTables(Arrays.asList("address_shanghai", "address_suzhou")).build();
        this.testAddNewTable(testParam, 1);
    }

    @Test
    public void testAddNewTableByPattern() throws Exception {
        TestParam testParam = TestParam.newBuilder(Collections.singletonList("address_\\.*"), 8, Collections.singletonList("address_\\.*"), 12).setFirstRoundInitTables(Arrays.asList("address_hangzhou", "address_beijing")).setSecondRoundInitTables(Arrays.asList("address_shanghai", "address_suzhou", "address_shenzhen")).build();
        this.testAddNewTable(testParam, 4);
    }

    private void testAddNewTable(TestParam testParam, int parallelism) throws Exception {
        if (CollectionUtils.isNotEmpty(testParam.getFirstRoundInitTables())) {
            this.initialAddressTables((JdbcConnection)this.getConnection(), testParam.getFirstRoundInitTables());
        }
        Path savepointDir = Files.createTempDirectory("add-new-table-test", new FileAttribute[0]);
        String savepointDirectory = savepointDir.toAbsolutePath().toString();
        String finishedSavePointPath = null;
        StreamExecutionEnvironment env = this.getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
        List<String> listenTablesFirstRound = testParam.getFirstRoundListenTables();
        FlinkSourceProvider sourceProvider = this.getFlinkSourceProvider(listenTablesFirstRound, parallelism);
        DataStreamSource source = env.fromSource(sourceProvider.getSource(), WatermarkStrategy.noWatermarks(), "mysql", (TypeInformation)new EventTypeInfo());
        TypeSerializer serializer = source.getTransformation().getOutputType().createSerializer(env.getConfig());
        CheckpointedCollectResultBuffer resultBuffer = new CheckpointedCollectResultBuffer(serializer);
        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
        CollectResultIterator iterator = this.addCollector(env, (DataStreamSource)source, (AbstractCollectResultBuffer)resultBuffer, (TypeSerializer)serializer, accumulatorName);
        JobClient jobClient = env.executeAsync("beforeAddNewTable");
        iterator.setJobClient(jobClient);
        List<Event> actual = MySqSourceTestUtils.fetchResults(iterator, testParam.getFirstRoundFetchSize());
        Optional<String> listenByPattern = listenTablesFirstRound.stream().filter(table -> StringUtils.contains((CharSequence)table, (CharSequence)"\\.*")).findAny();
        this.multiAssert(actual, listenByPattern.isPresent() ? testParam.getFirstRoundInitTables() : listenTablesFirstRound);
        if (CollectionUtils.isNotEmpty(testParam.getSecondRoundInitTables())) {
            this.initialAddressTables((JdbcConnection)this.getConnection(), testParam.getSecondRoundInitTables());
        }
        finishedSavePointPath = this.triggerSavepointWithRetry(jobClient, savepointDirectory);
        jobClient.cancel().get();
        iterator.close();
        StreamExecutionEnvironment restoredEnv = this.getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
        List<String> listenTablesSecondRound = testParam.getSecondRoundListenTables();
        FlinkSourceProvider restoredSourceProvider = this.getFlinkSourceProvider(listenTablesSecondRound, parallelism);
        DataStreamSource restoreSource = restoredEnv.fromSource(restoredSourceProvider.getSource(), WatermarkStrategy.noWatermarks(), "mysql", (TypeInformation)new EventTypeInfo());
        CollectResultIterator restoredIterator = this.addCollector(restoredEnv, (DataStreamSource)restoreSource, (AbstractCollectResultBuffer)resultBuffer, (TypeSerializer)serializer, accumulatorName);
        JobClient restoreClient = restoredEnv.executeAsync("AfterAddNewTable");
        List<String> newlyAddTables = listenTablesSecondRound.stream().filter(table -> !listenTablesFirstRound.contains(table)).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(newlyAddTables)) {
            newlyAddTables = testParam.getSecondRoundInitTables();
        }
        List<Event> newlyTableEvent = MySqSourceTestUtils.fetchResults(restoredIterator, testParam.getSecondRoundFetchSize());
        this.multiAssert(newlyTableEvent, newlyAddTables);
        restoreClient.cancel().get();
        restoredIterator.close();
    }

    private void multiAssert(List<Event> actualEvents, List<String> listenTables) {
        ArrayList<CreateTableEvent> expectedCreateTableEvents = new ArrayList<CreateTableEvent>();
        ArrayList<Event> expectedDataChangeEvents = new ArrayList<Event>();
        for (String table : listenTables) {
            expectedCreateTableEvents.add(this.getCreateTableEvent(TableId.tableId((String)this.customDatabase.getDatabaseName(), (String)table)));
            expectedDataChangeEvents.addAll(this.getSnapshotExpected(TableId.tableId((String)this.customDatabase.getDatabaseName(), (String)table)));
        }
        List actualCreateTableEvents = actualEvents.stream().filter(event -> event instanceof CreateTableEvent).collect(Collectors.toList());
        Assertions.assertThat(actualCreateTableEvents).containsExactlyInAnyOrder((Object[])expectedCreateTableEvents.toArray(new Event[0]));
        List actualDataChangeEvents = actualEvents.stream().filter(event -> event instanceof DataChangeEvent).collect(Collectors.toList());
        Assertions.assertThat(actualDataChangeEvents).containsExactlyInAnyOrder((Object[])expectedDataChangeEvents.toArray(new Event[0]));
    }

    private CreateTableEvent getCreateTableEvent(TableId tableId) {
        Schema schema = Schema.newBuilder().physicalColumn("id", DataTypes.BIGINT().notNull()).physicalColumn("country", DataTypes.VARCHAR((int)255).notNull()).physicalColumn("city", DataTypes.VARCHAR((int)255).notNull()).physicalColumn("detail_address", DataTypes.VARCHAR((int)1024)).primaryKey(Collections.singletonList("id")).build();
        return new CreateTableEvent(tableId, schema);
    }

    private List<Event> getSnapshotExpected(TableId tableId) {
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.BIGINT().notNull(), DataTypes.VARCHAR((int)255).notNull(), DataTypes.VARCHAR((int)255).notNull(), DataTypes.VARCHAR((int)1024)}, (String[])new String[]{"id", "country", "city", "detail_address"});
        BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
        String cityName = tableId.getTableName().split("_")[1];
        return Arrays.asList(DataChangeEvent.insertEvent((TableId)tableId, (RecordData)generator.generate(new Object[]{416874195632735147L, BinaryStringData.fromString((String)"China"), BinaryStringData.fromString((String)cityName), BinaryStringData.fromString((String)(cityName + " West Town address 1"))})), DataChangeEvent.insertEvent((TableId)tableId, (RecordData)generator.generate(new Object[]{416927583791428523L, BinaryStringData.fromString((String)"China"), BinaryStringData.fromString((String)cityName), BinaryStringData.fromString((String)(cityName + " West Town address 2"))})), DataChangeEvent.insertEvent((TableId)tableId, (RecordData)generator.generate(new Object[]{417022095255614379L, BinaryStringData.fromString((String)"China"), BinaryStringData.fromString((String)cityName), BinaryStringData.fromString((String)(cityName + " West Town address 3"))})));
    }

    private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory) throws ExecutionException, InterruptedException {
        for (int retryTimes = 0; retryTimes < 600; ++retryTimes) {
            try {
                return (String)jobClient.triggerSavepoint(savepointDirectory).get();
            }
            catch (Exception e) {
                Optional exception = ExceptionUtils.findThrowable((Throwable)e, CheckpointException.class);
                if (exception.isPresent() && ((CheckpointException)exception.get()).getMessage().contains("Checkpoint triggering task")) {
                    Thread.sleep(100L);
                    continue;
                }
                throw e;
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initialAddressTables(JdbcConnection connection, List<String> addressTables) throws SQLException {
        try {
            connection.setAutoCommit(false);
            for (String tableName : addressTables) {
                String tableId = this.customDatabase.getDatabaseName() + "." + tableName;
                String cityName = tableName.split("_")[1];
                connection.execute(new String[]{"CREATE TABLE IF NOT EXISTS " + tableId + "(  id BIGINT NOT NULL PRIMARY KEY,  country VARCHAR(255) NOT NULL,  city VARCHAR(255) NOT NULL,  detail_address VARCHAR(1024));"});
                connection.execute(new String[]{String.format("INSERT INTO  %s VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),       (416927583791428523, 'China', '%s', '%s West Town address 2'),       (417022095255614379, 'China', '%s', '%s West Town address 3');", tableId, cityName, cityName, cityName, cityName, cityName, cityName)});
            }
            connection.commit();
        }
        finally {
            connection.close();
        }
    }

    private FlinkSourceProvider getFlinkSourceProvider(List<String> tables, int parallelism) {
        List fullTableNames = tables.stream().map(table -> this.customDatabase.getDatabaseName() + "." + table).collect(Collectors.toList());
        HashMap<String, String> options = new HashMap<String, String>();
        options.put(MySqlDataSourceOptions.HOSTNAME.key(), MYSQL_CONTAINER.getHost());
        options.put(MySqlDataSourceOptions.PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
        options.put(MySqlDataSourceOptions.USERNAME.key(), "mysqluser");
        options.put(MySqlDataSourceOptions.PASSWORD.key(), "mysqlpw");
        options.put(MySqlDataSourceOptions.SERVER_TIME_ZONE.key(), "UTC");
        options.put(MySqlDataSourceOptions.TABLES.key(), StringUtils.join(fullTableNames, (String)","));
        options.put(MySqlDataSourceOptions.SERVER_ID.key(), MySqSourceTestUtils.getServerId(parallelism));
        options.put(MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
        FactoryHelper.DefaultContext context = new FactoryHelper.DefaultContext(org.apache.flink.cdc.common.configuration.Configuration.fromMap(options), null, ((Object)((Object)this)).getClass().getClassLoader());
        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
        MySqlDataSource dataSource = (MySqlDataSource)factory.createDataSource((Factory.Context)context);
        return (FlinkSourceProvider)dataSource.getEventSourceProvider();
    }

    private <T> CollectResultIterator<T> addCollector(StreamExecutionEnvironment env, DataStreamSource<T> source, AbstractCollectResultBuffer<T> buffer, TypeSerializer<T> serializer, String accumulatorName) {
        CollectSinkOperatorFactory sinkFactory = new CollectSinkOperatorFactory(serializer, accumulatorName);
        CollectSinkOperator operator = (CollectSinkOperator)sinkFactory.getOperator();
        CollectResultIterator iterator = new CollectResultIterator(buffer, operator.getOperatorIdFuture(), accumulatorName, 0);
        CollectStreamSink sink = new CollectStreamSink(source, sinkFactory);
        sink.name("Data stream collect sink");
        env.addOperator(sink.getTransformation());
        env.registerCollectIterator(iterator);
        return iterator;
    }

    private StreamExecutionEnvironment getStreamExecutionEnvironment(String finishedSavePointPath, int parallelism) {
        org.apache.flink.configuration.Configuration configuration = new org.apache.flink.configuration.Configuration();
        if (finishedSavePointPath != null) {
            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((org.apache.flink.configuration.Configuration)configuration);
        env.setParallelism(parallelism);
        env.enableCheckpointing(500L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)3, (long)1000L));
        return env;
    }

    private static class TestParam {
        private final List<String> firstRoundInitTables;
        private final List<String> firstRoundListenTables;
        private final Integer firstRoundFetchSize;
        private final List<String> secondRoundInitTables;
        private final List<String> secondRoundListenTables;
        private final Integer secondRoundFetchSize;

        private TestParam(Builder builder) {
            this.firstRoundInitTables = builder.firstRoundInitTables;
            this.firstRoundListenTables = builder.firstRoundListenTables;
            this.firstRoundFetchSize = builder.firstRoundFetchSize;
            this.secondRoundInitTables = builder.secondRoundInitTables;
            this.secondRoundListenTables = builder.secondRoundListenTables;
            this.secondRoundFetchSize = builder.secondRoundFetchSize;
        }

        public static Builder newBuilder(List<String> firstRoundListenTables, Integer firstRoundFetchSize, List<String> secondRoundListenTables, Integer secondRoundFetchSize) {
            return new Builder(firstRoundListenTables, firstRoundFetchSize, secondRoundListenTables, secondRoundFetchSize);
        }

        public List<String> getFirstRoundInitTables() {
            return this.firstRoundInitTables;
        }

        public List<String> getFirstRoundListenTables() {
            return this.firstRoundListenTables;
        }

        public Integer getFirstRoundFetchSize() {
            return this.firstRoundFetchSize;
        }

        public List<String> getSecondRoundInitTables() {
            return this.secondRoundInitTables;
        }

        public List<String> getSecondRoundListenTables() {
            return this.secondRoundListenTables;
        }

        public Integer getSecondRoundFetchSize() {
            return this.secondRoundFetchSize;
        }

        public static class Builder {
            private List<String> firstRoundInitTables;
            private final List<String> firstRoundListenTables;
            private final Integer firstRoundFetchSize;
            private List<String> secondRoundInitTables;
            private final List<String> secondRoundListenTables;
            private final Integer secondRoundFetchSize;

            public Builder(List<String> firstRoundListenTables, Integer firstRoundFetchSize, List<String> secondRoundListenTables, Integer secondRoundFetchSize) {
                this.firstRoundListenTables = firstRoundListenTables;
                this.firstRoundFetchSize = firstRoundFetchSize;
                this.secondRoundListenTables = secondRoundListenTables;
                this.secondRoundFetchSize = secondRoundFetchSize;
            }

            public TestParam build() {
                return new TestParam(this);
            }

            public Builder setFirstRoundInitTables(List<String> firstRoundInitTables) {
                this.firstRoundInitTables = firstRoundInitTables;
                return this;
            }

            public Builder setSecondRoundInitTables(List<String> secondRoundInitTables) {
                this.secondRoundInitTables = secondRoundInitTables;
                return this;
            }
        }
    }
}

