/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.lifecycle.Startables;

public class MySqlPipelineITCase
extends MySqlSourceTestBase {
    // MySQL 8.0 container shared by every test in this class; it is started in
    // startContainers() and stopped in stopContainers().
    protected static final MySqlContainer MYSQL8_CONTAINER = MySqlPipelineITCase.createMySqlContainer((MySqlVersion)MySqlVersion.V8_0);
    // Handle for the "inventory" test database on the shared container, accessed with the
    // mysqluser/mysqlpw credentials. Each test calls createAndInitialize() on it before use.
    // NOTE(review): presumably UniqueDatabase derives a unique physical database name per
    // instance — confirm against UniqueDatabase.
    private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL8_CONTAINER, "inventory", "mysqluser", "mysqlpw");
    // Execution environment used to run the source under test; configured in before().
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /**
     * Starts the shared MySQL 8 container once before any test of this class runs and blocks
     * until the start-up future completes.
     */
    @BeforeClass
    public static void startContainers() {
        LOG.info("Starting containers...");
        Stream<MySqlContainer> containers = Stream.of(MYSQL8_CONTAINER);
        Startables.deepStart(containers).join();
        LOG.info("Containers are started.");
    }

    /**
     * Stops the shared MySQL 8 container once after all tests of this class have run.
     */
    @AfterClass
    public static void stopContainers() {
        LOG.info("Stopping containers...");
        MYSQL8_CONTAINER.stop();
        LOG.info("Containers are stopped.");
    }

    /**
     * Per-test setup: clears the data held by {@link TestValuesTableFactory} and configures the
     * execution environment (parallelism 4, checkpointing every 2000 ms, no restarts).
     */
    @Before
    public void before() {
        final int parallelism = 4;
        final long checkpointIntervalMs = 2000L;

        TestValuesTableFactory.clearAllData();
        env.setParallelism(parallelism);
        env.enableCheckpointing(checkpointIntervalMs);
        // A failing job should fail the test right away instead of being restarted.
        env.setRestartStrategy(RestartStrategies.noRestart());
    }

    /**
     * Runs the MySQL source in {@code initial} startup mode on the {@code products} table and
     * verifies the emitted event stream.
     *
     * <p>The expected stream is, in order:
     * <ol>
     *   <li>one {@link CreateTableEvent} for {@code products};</li>
     *   <li>the snapshot rows from {@link #getSnapshotExpected(TableId)}, in any order;</li>
     *   <li>the schema changes from {@link #executeAlterAndProvideExpected(TableId, Statement)}
     *       followed by two inserts, one update and one delete, in exactly that order.</li>
     * </ol>
     *
     * <p>The collecting iterator is closed when the test finishes (successfully or not) so the
     * job backing it does not keep running after the test.
     *
     * @throws Exception if the database cannot be prepared, the job fails, or a JDBC statement fails
     */
    @Test
    public void testInitialStartupMode() throws Exception {
        this.inventoryDatabase.createAndInitialize();
        final String databaseName = this.inventoryDatabase.getDatabaseName();
        MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory()
                .hostname(MYSQL8_CONTAINER.getHost())
                .port(MYSQL8_CONTAINER.getDatabasePort())
                .username("mysqluser")
                .password("mysqlpw")
                .databaseList(databaseName)
                .tableList(databaseName + "\\.products")
                .startupOptions(StartupOptions.initial())
                .serverId(MySqSourceTestUtils.getServerId(this.env.getParallelism()))
                .serverTimeZone("UTC")
                .includeSchemaChanges(MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED.defaultValue());
        FlinkSourceProvider sourceProvider =
                (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();

        // try-with-resources: closing the iterator releases the job started by executeAndCollect().
        try (CloseableIterator<Event> events = this.env
                .fromSource(
                        sourceProvider.getSource(),
                        WatermarkStrategy.noWatermarks(),
                        "mysql",
                        new EventTypeInfo())
                .executeAndCollect()) {
            // NOTE(review): presumably gives the job time to finish the snapshot phase before the
            // statements below are executed, so that they are read from the binlog — confirm.
            Thread.sleep(10000L);

            TableId tableId = TableId.tableId(databaseName, "products");
            CreateTableEvent createTableEvent = this.getProductsCreateTableEvent(tableId);
            List<Event> expectedSnapshot = this.getSnapshotExpected(tableId);
            List<Event> expectedBinlog = new ArrayList<>();

            try (Connection connection = this.inventoryDatabase.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
                expectedBinlog.addAll(this.executeAlterAndProvideExpected(tableId, statement));

                // Row layout of `products` after the ALTER statements above.
                RowType rowType = RowType.of(
                        new DataType[] {
                            DataTypes.INT().notNull(),
                            DataTypes.VARCHAR(255).notNull(),
                            DataTypes.FLOAT(),
                            DataTypes.VARCHAR(45),
                            DataTypes.VARCHAR(55)
                        },
                        new String[] {"id", "name", "weight", "col1", "col2"});
                BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);

                statement.execute(String.format("INSERT INTO `%s`.`products` VALUES (default,'scooter',5.5,'c-10','c-20');", databaseName));
                expectedBinlog.add(DataChangeEvent.insertEvent(
                        tableId,
                        generator.generate(new Object[] {
                            110,
                            BinaryStringData.fromString("scooter"),
                            5.5f,
                            BinaryStringData.fromString("c-10"),
                            BinaryStringData.fromString("c-20")
                        })));

                statement.execute(String.format("INSERT INTO `%s`.`products` VALUES (default,'football',6.6,'c-11','c-21');", databaseName));
                expectedBinlog.add(DataChangeEvent.insertEvent(
                        tableId,
                        generator.generate(new Object[] {
                            111,
                            BinaryStringData.fromString("football"),
                            6.6f,
                            BinaryStringData.fromString("c-11"),
                            BinaryStringData.fromString("c-21")
                        })));

                statement.execute(String.format("UPDATE `%s`.`products` SET `col1`='c-12', `col2`='c-22' WHERE id=110;", databaseName));
                expectedBinlog.add(DataChangeEvent.updateEvent(
                        tableId,
                        generator.generate(new Object[] {
                            110,
                            BinaryStringData.fromString("scooter"),
                            5.5f,
                            BinaryStringData.fromString("c-10"),
                            BinaryStringData.fromString("c-20")
                        }),
                        generator.generate(new Object[] {
                            110,
                            BinaryStringData.fromString("scooter"),
                            5.5f,
                            BinaryStringData.fromString("c-12"),
                            BinaryStringData.fromString("c-22")
                        })));

                statement.execute(String.format("DELETE FROM `%s`.`products` WHERE `id` = 111;", databaseName));
                expectedBinlog.add(DataChangeEvent.deleteEvent(
                        tableId,
                        generator.generate(new Object[] {
                            111,
                            BinaryStringData.fromString("football"),
                            6.6f,
                            BinaryStringData.fromString("c-11"),
                            BinaryStringData.fromString("c-21")
                        })));
            }

            // Index of the first binlog event: one CreateTableEvent plus all snapshot rows.
            int snapshotEnd = 1 + expectedSnapshot.size();
            List<Event> actual =
                    MySqSourceTestUtils.fetchResults(events, snapshotEnd + expectedBinlog.size());

            // Fail with a readable message instead of an IndexOutOfBoundsException from subList.
            Assertions.assertThat(actual).hasSize(snapshotEnd + expectedBinlog.size());
            Assertions.assertThat(actual.get(0)).isEqualTo(createTableEvent);
            // Snapshot rows are compared regardless of order; binlog events must keep their order.
            Assertions.assertThat(actual.subList(1, snapshotEnd))
                    .containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
            Assertions.assertThat(actual.subList(snapshotEnd, actual.size()))
                    .isEqualTo(expectedBinlog);
        }
    }

    /**
     * Runs the MySQL source in {@code latest} startup mode with parallelism 1 and verifies that a
     * series of {@code ALTER TABLE} statements on {@code products} is translated into the expected
     * schema change events, in order, after the initial {@link CreateTableEvent}.
     *
     * <p>Besides the statements issued by
     * {@link #executeAlterAndProvideExpected(TableId, Statement)}, this covers multi-column
     * {@code ADD COLUMN (...)} and the type mapping of {@code BIT}, {@code BINARY(0)},
     * {@code BINARY}, {@code CHAR(0)} and {@code CHAR} columns.
     *
     * <p>The collecting iterator is closed when the test finishes (successfully or not) so the
     * job backing it does not keep running after the test.
     *
     * @throws Exception if the database cannot be prepared, the job fails, or a JDBC statement fails
     */
    @Test
    public void testParseAlterStatement() throws Exception {
        this.env.setParallelism(1);
        this.inventoryDatabase.createAndInitialize();
        final String databaseName = this.inventoryDatabase.getDatabaseName();
        MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory()
                .hostname(MYSQL8_CONTAINER.getHost())
                .port(MYSQL8_CONTAINER.getDatabasePort())
                .username("mysqluser")
                .password("mysqlpw")
                .databaseList(databaseName)
                .tableList(databaseName + "\\.products")
                .startupOptions(StartupOptions.latest())
                .serverId(MySqSourceTestUtils.getServerId(this.env.getParallelism()))
                .serverTimeZone("UTC")
                .includeSchemaChanges(MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED.defaultValue());
        FlinkSourceProvider sourceProvider =
                (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();

        // try-with-resources: closing the iterator releases the job started by executeAndCollect().
        try (CloseableIterator<Event> events = this.env
                .fromSource(
                        sourceProvider.getSource(),
                        WatermarkStrategy.noWatermarks(),
                        "mysql",
                        new EventTypeInfo())
                .executeAndCollect()) {
            // NOTE(review): presumably gives the job time to start reading the binlog before the
            // ALTER statements below are executed — confirm.
            Thread.sleep(5000L);

            TableId tableId = TableId.tableId(databaseName, "products");
            List<Event> expected = new ArrayList<>();
            expected.add(this.getProductsCreateTableEvent(tableId));

            try (Connection connection = this.inventoryDatabase.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
                expected.addAll(this.executeAlterAndProvideExpected(tableId, statement));

                // Two columns added by one statement are expected as a single AddColumnEvent.
                statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN (`cols1` VARCHAR(45), `cols2` VARCHAR(55));", databaseName));
                expected.add(new AddColumnEvent(
                        tableId,
                        Arrays.asList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("cols1", DataTypes.VARCHAR(45))),
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("cols2", DataTypes.VARCHAR(55))))));

                statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN (`cols3` VARCHAR(45), `cols4` VARCHAR(55));", databaseName));
                expected.add(new AddColumnEvent(
                        tableId,
                        Arrays.asList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("cols3", DataTypes.VARCHAR(45))),
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("cols4", DataTypes.VARCHAR(55))))));

                // BIT is expected to map to BOOLEAN.
                statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols5` BIT NULL;", databaseName));
                expected.add(new AddColumnEvent(
                        tableId,
                        Collections.singletonList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("cols5", DataTypes.BOOLEAN())))));

                // BINARY(0) is expected to map to the empty-literal binary type.
                statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols6` BINARY(0) NULL;", databaseName));
                expected.add(new AddColumnEvent(
                        tableId,
                        Collections.singletonList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("cols6", BinaryType.ofEmptyLiteral())))));

                // BINARY without a length is expected to map to BINARY(1).
                statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols7` BINARY NULL;", databaseName));
                expected.add(new AddColumnEvent(
                        tableId,
                        Collections.singletonList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("cols7", DataTypes.BINARY(1))))));

                // CHAR(0) is expected to map to the empty-literal char type.
                statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols8` CHAR(0) NULL;", databaseName));
                expected.add(new AddColumnEvent(
                        tableId,
                        Collections.singletonList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("cols8", CharType.ofEmptyLiteral())))));

                // CHAR without a length is expected to map to CHAR(1).
                statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols9` CHAR NULL;", databaseName));
                expected.add(new AddColumnEvent(
                        tableId,
                        Collections.singletonList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("cols9", DataTypes.CHAR(1))))));
            }

            List<Event> actual = MySqSourceTestUtils.fetchResults(events, expected.size());
            Assertions.assertThat(actual).isEqualTo(expected);
        }
    }

    /**
     * Builds the {@link CreateTableEvent} expected for the {@code products} table in its initial
     * layout: {@code id INT NOT NULL} (primary key), {@code name VARCHAR(255) NOT NULL},
     * {@code description VARCHAR(512)} and {@code weight FLOAT}.
     *
     * @param tableId identifier of the {@code products} table
     * @return the expected create-table event for {@code tableId}
     */
    private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
        Schema productsSchema = Schema.newBuilder()
                .physicalColumn("id", DataTypes.INT().notNull())
                .physicalColumn("name", DataTypes.VARCHAR(255).notNull())
                .physicalColumn("description", DataTypes.VARCHAR(512))
                .physicalColumn("weight", DataTypes.FLOAT())
                .primaryKey(Collections.singletonList("id"))
                .build();
        return new CreateTableEvent(tableId, productsSchema);
    }

    /**
     * Builds the insert events expected for the nine rows (ids 101-109) of the {@code products}
     * table in its initial layout {@code (id, name, description, weight)}.
     *
     * @param tableId identifier of the {@code products} table
     * @return one insert {@link DataChangeEvent} per row, ordered by id
     */
    private List<Event> getSnapshotExpected(TableId tableId) {
        DataType[] fieldTypes = {
            DataTypes.INT().notNull(),
            DataTypes.VARCHAR(255).notNull(),
            DataTypes.VARCHAR(512),
            DataTypes.FLOAT()
        };
        String[] fieldNames = {"id", "name", "description", "weight"};
        BinaryRecordDataGenerator generator =
                new BinaryRecordDataGenerator(RowType.of(fieldTypes, fieldNames));

        // One entry per row: id, name, description, weight.
        Object[][] rows = {
            {101, "scooter", "Small 2-wheel scooter", 3.14f},
            {102, "car battery", "12V car battery", 8.1f},
            {103, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8f},
            {104, "hammer", "12oz carpenter's hammer", 0.75f},
            {105, "hammer", "14oz carpenter's hammer", 0.875f},
            {106, "hammer", "16oz carpenter's hammer", 1.0f},
            {107, "rocks", "box of assorted rocks", 5.3f},
            {108, "jacket", "water resistent black wind breaker", 0.1f},
            {109, "spare tire", "24 inch spare tire", 22.2f}
        };

        List<Event> snapshotExpected = new ArrayList<>();
        for (Object[] row : rows) {
            // String columns must be handed to the generator as BinaryStringData.
            Object[] fields = {
                row[0],
                BinaryStringData.fromString((String) row[1]),
                BinaryStringData.fromString((String) row[2]),
                row[3]
            };
            snapshotExpected.add(DataChangeEvent.insertEvent(tableId, generator.generate(fields)));
        }
        return snapshotExpected;
    }

    /**
     * Executes a fixed sequence of {@code ALTER TABLE} statements through {@code statement} and
     * returns the schema change events that sequence is expected to produce for {@code tableId},
     * in order.
     *
     * <p>After this method the {@code products} table has the columns
     * {@code id, name, weight, col1, col2}.
     *
     * @param tableId identifier of the {@code products} table, used in the expected events
     * @param statement open JDBC statement on the inventory database
     * @return the expected schema change events, in emission order
     * @throws SQLException if any of the statements fails
     */
    private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement statement) throws SQLException {
        final String db = this.inventoryDatabase.getDatabaseName();
        List<Event> events = new ArrayList<>();

        // CHANGE COLUMN with a new name and type: expected as a type change followed by a rename.
        statement.execute(String.format("ALTER TABLE `%s`.`products` CHANGE COLUMN `description` `desc` VARCHAR(255) NULL DEFAULT NULL;", db));
        events.add(new AlterColumnTypeEvent(tableId, Collections.singletonMap("description", DataTypes.VARCHAR(255))));
        events.add(new RenameColumnEvent(tableId, Collections.singletonMap("description", "desc")));

        statement.execute(String.format("ALTER TABLE `%s`.`products` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;", db));
        events.add(new AlterColumnTypeEvent(tableId, Collections.singletonMap("desc", DataTypes.VARCHAR(400))));
        events.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc", "desc2")));

        // ADD COLUMN ... AFTER: the position is carried in the expected event.
        statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `desc1` VARCHAR(45) NULL AFTER `weight`;", db));
        events.add(new AddColumnEvent(
                tableId,
                Collections.singletonList(
                        new AddColumnEvent.ColumnWithPosition(
                                Column.physicalColumn("desc1", DataTypes.VARCHAR(45)),
                                AddColumnEvent.ColumnPosition.AFTER,
                                "weight"))));

        // Two positioned ADD COLUMN clauses in one statement: expected as two separate events.
        statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `col2` VARCHAR(55) NULL AFTER `desc1`;", db));
        events.add(new AddColumnEvent(
                tableId,
                Collections.singletonList(
                        new AddColumnEvent.ColumnWithPosition(
                                Column.physicalColumn("col1", DataTypes.VARCHAR(45)),
                                AddColumnEvent.ColumnPosition.AFTER,
                                "weight"))));
        events.add(new AddColumnEvent(
                tableId,
                Collections.singletonList(
                        new AddColumnEvent.ColumnWithPosition(
                                Column.physicalColumn("col2", DataTypes.VARCHAR(55)),
                                AddColumnEvent.ColumnPosition.AFTER,
                                "desc1"))));

        // DROP plus a type-only CHANGE in one statement: expected as a drop and a type change.
        statement.execute(String.format("ALTER TABLE `%s`.`products` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;", db));
        events.add(new DropColumnEvent(tableId, Collections.singletonList("desc2")));
        events.add(new AlterColumnTypeEvent(tableId, Collections.singletonMap("desc1", DataTypes.VARCHAR(65))));

        statement.execute(String.format("ALTER TABLE `%s`.`products` RENAME COLUMN `desc1` TO `desc3`;", db));
        events.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc1", "desc3")));

        // The column is dropped using an upper-case name; the expected event keeps that spelling.
        statement.execute(String.format("ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;", db));
        events.add(new DropColumnEvent(tableId, Collections.singletonList("DESC3")));

        // No event is expected for this change to `orders`; the tests in this class only
        // capture the `products` table.
        statement.execute(String.format("ALTER TABLE `%s`.`orders` ADD COLUMN `desc1` VARCHAR(45) NULL;", db));
        return events;
    }
}

