/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import javax.xml.bind.DatatypeConverter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.RecordDataTestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.lifecycle.Startables;

/**
 * Integration tests checking the field values produced by {@link MySqlDataSource} for tables
 * that contain a wide range of MySQL column types.
 *
 * <p>Every scenario follows the same shape: initialize a {@link UniqueDatabase}, read one record
 * from the source started with {@link StartupOptions#initial()}, compare its fields with the
 * expected snapshot values, run a single {@code UPDATE} and compare the next record with the
 * expected post-update values. Each scenario runs once against the inherited {@code
 * MYSQL_CONTAINER} (referred to as "MySQL 5.7" by the test names) and once against a MySQL 8
 * container owned by this class.
 */
public class MySqlFullTypesITCase extends MySqlSourceTestBase {

    /** MySQL 8 container; started in {@link #beforeClass()} and stopped in {@link #afterClass()}. */
    private static final MySqlContainer MYSQL8_CONTAINER =
            createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");

    /** Field types used to read records of the {@code common_types} table, in column order. */
    private static final RowType COMMON_TYPES =
            RowType.of(
                    DataTypes.DECIMAL(20, 0).notNull(),
                    DataTypes.TINYINT(),
                    DataTypes.SMALLINT(),
                    DataTypes.SMALLINT(),
                    DataTypes.SMALLINT(),
                    DataTypes.INT(),
                    DataTypes.INT(),
                    DataTypes.INT(),
                    DataTypes.INT(),
                    DataTypes.INT(),
                    DataTypes.INT(),
                    DataTypes.BIGINT(),
                    DataTypes.BIGINT(),
                    DataTypes.BIGINT(),
                    DataTypes.BIGINT(),
                    DataTypes.DECIMAL(20, 0),
                    DataTypes.DECIMAL(20, 0),
                    DataTypes.VARCHAR(255),
                    DataTypes.CHAR(3),
                    DataTypes.DOUBLE(),
                    DataTypes.FLOAT(),
                    DataTypes.FLOAT(),
                    DataTypes.FLOAT(),
                    DataTypes.DOUBLE(),
                    DataTypes.DOUBLE(),
                    DataTypes.DOUBLE(),
                    DataTypes.DECIMAL(8, 4),
                    DataTypes.DECIMAL(8, 4),
                    DataTypes.DECIMAL(8, 4),
                    DataTypes.DECIMAL(6, 0),
                    DataTypes.STRING(),
                    DataTypes.BOOLEAN(),
                    DataTypes.BINARY(1),
                    DataTypes.BOOLEAN(),
                    DataTypes.BOOLEAN(),
                    DataTypes.BINARY(16),
                    DataTypes.BINARY(8),
                    DataTypes.STRING(),
                    DataTypes.BYTES(),
                    DataTypes.BYTES(),
                    DataTypes.BYTES(),
                    DataTypes.BYTES(),
                    DataTypes.INT(),
                    DataTypes.STRING(),
                    DataTypes.STRING(),
                    DataTypes.STRING(),
                    DataTypes.STRING(),
                    DataTypes.STRING(),
                    DataTypes.STRING(),
                    DataTypes.STRING(),
                    DataTypes.STRING(),
                    DataTypes.STRING(),
                    DataTypes.STRING());

    /** Position in {@link #COMMON_TYPES} of the column nulled by the common-types UPDATE. */
    private static final int BIG_DECIMAL_FIELD_INDEX = 30;

    /** Position in {@link #COMMON_TYPES} of the JSON column. */
    private static final int JSON_FIELD_INDEX = 44;

    private final UniqueDatabase fullTypesMySql57Database =
            new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", "mysqluser", "mysqlpw");
    private final UniqueDatabase fullTypesMySql8Database =
            new UniqueDatabase(MYSQL8_CONTAINER, "column_type_test_mysql8", "mysqluser", "mysqlpw");
    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    /** Starts the MySQL 8 container and blocks until startup has completed. */
    @BeforeClass
    public static void beforeClass() {
        LOG.info("Starting MySql8 containers...");
        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
        LOG.info("Container MySql8 is started.");
    }

    /** Stops the MySQL 8 container started by {@link #beforeClass()}. */
    @AfterClass
    public static void afterClass() {
        LOG.info("Stopping MySql8 containers...");
        MYSQL8_CONTAINER.stop();
        LOG.info("Container MySql8 is stopped.");
    }

    /** Configures parallelism, checkpoint interval and restart strategy of the environment. */
    @Before
    public void before() {
        env.setParallelism(4);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(RestartStrategies.noRestart());
    }

    @Test
    public void testMysql57CommonDataTypes() throws Throwable {
        testCommonDataTypes(fullTypesMySql57Database);
    }

    @Test
    public void testMySql8CommonDataTypes() throws Throwable {
        testCommonDataTypes(fullTypesMySql8Database);
    }

    @Test
    public void testMysql57TimeDataTypes() throws Throwable {
        RowType recordType =
                RowType.of(
                        DataTypes.DECIMAL(20, 0).notNull(),
                        DataTypes.INT(),
                        DataTypes.DATE(),
                        DataTypes.TIME(0),
                        DataTypes.TIME(3),
                        DataTypes.TIME(6),
                        DataTypes.TIMESTAMP(0),
                        DataTypes.TIMESTAMP(3),
                        DataTypes.TIMESTAMP(6),
                        DataTypes.TIMESTAMP_LTZ(0),
                        DataTypes.TIMESTAMP_LTZ(0));

        Object[] expectedSnapshot = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            2021,
            18460,
            64822000,
            64822123,
            64822123,
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
            null
        };

        // After the UPDATE: time_6_c is null and timestamp_def_c carries its default value.
        Object[] expectedStreamRecord = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            2021,
            18460,
            64822000,
            64822123,
            null,
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
            LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
        };

        testTimeDataTypes(
                fullTypesMySql57Database, recordType, expectedSnapshot, expectedStreamRecord);
    }

    @Test
    public void testMysql8TimeDataTypes() throws Throwable {
        RowType recordType =
                RowType.of(
                        DataTypes.DECIMAL(20, 0).notNull(),
                        DataTypes.INT(),
                        DataTypes.DATE(),
                        DataTypes.TIME(0),
                        DataTypes.TIME(3),
                        DataTypes.TIME(6),
                        DataTypes.TIMESTAMP(0),
                        DataTypes.TIMESTAMP(3),
                        DataTypes.TIMESTAMP(6),
                        DataTypes.TIMESTAMP_LTZ(0),
                        DataTypes.TIMESTAMP_LTZ(3),
                        DataTypes.TIMESTAMP_LTZ(6),
                        DataTypes.TIMESTAMP_LTZ(0));

        Object[] expectedSnapshot = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            2021,
            18460,
            64822000,
            64822123,
            64822123,
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
            null
        };

        // After the UPDATE: time_6_c is null and timestamp_def_c carries its default value.
        Object[] expectedStreamRecord = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            2021,
            18460,
            64822000,
            64822123,
            null,
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
            LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
        };

        testTimeDataTypes(
                fullTypesMySql8Database, recordType, expectedSnapshot, expectedStreamRecord);
    }

    @Test
    public void testMysql57PrecisionTypes() throws Throwable {
        testMysqlPrecisionTypes(fullTypesMySql57Database);
    }

    @Test
    public void testMysql8PrecisionTypes() throws Throwable {
        testMysqlPrecisionTypes(fullTypesMySql8Database);
    }

    /**
     * Verifies the {@code precision_types} table of the given database: first the initial record,
     * then the record emitted after {@code time_6_c} is set to null.
     *
     * @param database database to initialize and read from
     * @throws Throwable if reading from the source, running the UPDATE or an assertion fails
     */
    public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
        // NOTE(review): this row type lists 14 field types while both expected arrays below hold
        // 13 values; confirm against the precision_types schema whether the trailing
        // TIMESTAMP_LTZ(0) entry is intentional.
        RowType recordType =
                RowType.of(
                        DataTypes.DECIMAL(20, 0).notNull(),
                        DataTypes.DECIMAL(6, 2),
                        DataTypes.DECIMAL(9, 4),
                        DataTypes.DECIMAL(20, 4),
                        DataTypes.TIME(0),
                        DataTypes.TIME(3),
                        DataTypes.TIME(6),
                        DataTypes.TIMESTAMP(0),
                        DataTypes.TIMESTAMP(3),
                        DataTypes.TIMESTAMP(6),
                        DataTypes.TIMESTAMP_LTZ(0),
                        DataTypes.TIMESTAMP_LTZ(3),
                        DataTypes.TIMESTAMP_LTZ(6),
                        DataTypes.TIMESTAMP_LTZ(0));

        Object[] expectedSnapshot = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
            DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
            DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 4),
            64800000,
            64822100,
            64822100,
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:00")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
        };

        // Same as the snapshot except for time_6_c, which the UPDATE sets to null.
        Object[] expectedStreamRecord = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2),
            DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4),
            DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 4),
            64800000,
            64822100,
            null,
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:00")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
            LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22"))
        };

        verifySnapshotThenUpdate(
                database,
                "precision_types",
                recordType,
                expectedSnapshot,
                "UPDATE precision_types SET time_6_c = null WHERE id = 1;",
                expectedStreamRecord);
    }

    /**
     * Verifies the {@code common_types} table of the given database: first the initial record,
     * then the record emitted after {@code big_decimal_c} is set to null.
     *
     * @param database database to initialize and read from
     * @throws Exception if reading from the source, running the UPDATE or an assertion fails
     */
    private void testCommonDataTypes(UniqueDatabase database) throws Exception {
        String expectedPointJsonText = "{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}";
        String expectedGeometryJsonText =
                "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}";
        String expectLinestringJsonText =
                "{\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}";
        String expectPolygonJsonText =
                "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}";
        String expectMultipointJsonText =
                "{\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}";
        String expectMultilineJsonText =
                "{\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}";
        String expectMultipolygonJsonText =
                "{\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}";
        String expectGeometryCollectionJsonText =
                "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}";

        // One value per COMMON_TYPES entry. The boxed Java type of each literal matters because
        // the comparison is element-wise equals(): BIGINT slots must hold Long values.
        Object[] expectedSnapshot = {
            DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
            (byte) 127,
            (short) 255,
            (short) 255,
            Short.MAX_VALUE,
            65535,
            65535,
            8388607,
            16777215,
            16777215,
            Integer.MAX_VALUE,
            4294967295L,
            4294967295L,
            // BIGINT slot: must be boxed as Long, an Integer would never equal the actual field.
            (long) Integer.MAX_VALUE,
            Long.MAX_VALUE,
            DecimalData.fromBigDecimal(new BigDecimal("18446744073709551615"), 20, 0),
            DecimalData.fromBigDecimal(new BigDecimal("18446744073709551615"), 20, 0),
            BinaryStringData.fromString("Hello World"),
            BinaryStringData.fromString("abc"),
            123.102,
            123.102f,
            123.103f,
            123.104f,
            404.4443,
            404.4444,
            404.4445,
            DecimalData.fromBigDecimal(new BigDecimal("123.4567"), 8, 4),
            DecimalData.fromBigDecimal(new BigDecimal("123.4568"), 8, 4),
            DecimalData.fromBigDecimal(new BigDecimal("123.4569"), 8, 4),
            DecimalData.fromBigDecimal(new BigDecimal("346"), 6, 0),
            BinaryStringData.fromString("34567892.1"),
            false,
            new byte[] {3},
            true,
            true,
            DatatypeConverter.parseHexBinary(
                    "651aed08-390f-4893-b2f1-36923e7b7400".replace("-", "")),
            new byte[] {4, 4, 4, 4, 4, 4, 4, 4},
            BinaryStringData.fromString("text"),
            new byte[] {16},
            new byte[] {16},
            new byte[] {16},
            new byte[] {16},
            2021,
            BinaryStringData.fromString("red"),
            BinaryStringData.fromString("{\"key1\": \"value1\"}"),
            BinaryStringData.fromString(expectedPointJsonText),
            BinaryStringData.fromString(expectedGeometryJsonText),
            BinaryStringData.fromString(expectLinestringJsonText),
            BinaryStringData.fromString(expectPolygonJsonText),
            BinaryStringData.fromString(expectMultipointJsonText),
            BinaryStringData.fromString(expectMultilineJsonText),
            BinaryStringData.fromString(expectMultipolygonJsonText),
            BinaryStringData.fromString(expectGeometryCollectionJsonText)
        };

        // The post-update record differs in two slots only: the nulled column, and the JSON
        // text, which is expected without the space after the colon.
        Object[] expectedStreamRecord = expectedSnapshot.clone();
        expectedStreamRecord[BIG_DECIMAL_FIELD_INDEX] = null;
        expectedStreamRecord[JSON_FIELD_INDEX] =
                BinaryStringData.fromString("{\"key1\":\"value1\"}");

        verifySnapshotThenUpdate(
                database,
                "common_types",
                COMMON_TYPES,
                expectedSnapshot,
                "UPDATE common_types SET big_decimal_c = null WHERE id = 1;",
                expectedStreamRecord);
    }

    /**
     * Interprets a {@code yyyy-[m]m-[d]d hh:mm:ss[.f...]} timestamp string as a UTC wall-clock
     * time and returns the corresponding instant.
     */
    private static Instant toInstant(String ts) {
        return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant();
    }

    /**
     * Verifies the {@code time_types} table of the given database: first the initial record, then
     * the record emitted after {@code time_6_c} is nulled and {@code timestamp_def_c} is reset to
     * its default.
     *
     * @param database database to initialize and read from
     * @param recordType field types used to read the record fields
     * @param expectedSnapshot expected field values of the initial record
     * @param expectedStreamRecord expected field values of the record emitted after the UPDATE
     * @throws Exception if reading from the source, running the UPDATE or an assertion fails
     */
    private void testTimeDataTypes(
            UniqueDatabase database,
            RowType recordType,
            Object[] expectedSnapshot,
            Object[] expectedStreamRecord)
            throws Exception {
        verifySnapshotThenUpdate(
                database,
                "time_types",
                recordType,
                expectedSnapshot,
                "UPDATE time_types SET time_6_c = null, timestamp_def_c = default WHERE id = 1;",
                expectedStreamRecord);
    }

    /**
     * Shared scenario driver: initializes the database, starts a source on a single table, checks
     * the first record, executes {@code updateSql} and checks the next record.
     *
     * <p>The result iterator is closed when the scenario ends, whether it passes or fails, so the
     * collecting job does not outlive the test method.
     *
     * @param database database to initialize and read from
     * @param tableName unqualified name of the captured table
     * @param recordType field types used to read the record fields
     * @param expectedSnapshot expected field values of the first record
     * @param updateSql statement executed after the first record has been verified
     * @param expectedStreamRecord expected field values of the record following the update
     * @throws Exception if reading from the source, running the UPDATE or an assertion fails
     */
    private void verifySnapshotThenUpdate(
            UniqueDatabase database,
            String tableName,
            RowType recordType,
            Object[] expectedSnapshot,
            String updateSql,
            Object[] expectedStreamRecord)
            throws Exception {
        database.createAndInitialize();

        // Raw iterator type kept on purpose: the element type is not visible in this file.
        try (CloseableIterator iterator =
                env.fromSource(
                                getFlinkSourceProvider(new String[] {tableName}, database)
                                        .getSource(),
                                WatermarkStrategy.noWatermarks(),
                                "Event-Source")
                        .executeAndCollect()) {
            assertNextRecord(iterator, recordType, expectedSnapshot);

            try (Connection connection = database.getJdbcConnection();
                    Statement statement = connection.createStatement()) {
                statement.execute(updateSql);
            }

            assertNextRecord(iterator, recordType, expectedStreamRecord);
        }
    }

    /**
     * Fetches one result from the iterator and asserts that the fields of its {@code after}
     * record equal {@code expectedFields}.
     */
    private static void assertNextRecord(
            CloseableIterator iterator, RowType recordType, Object[] expectedFields)
            throws Exception {
        List<?> results =
                (List<?>) MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
        RecordData record = ((DataChangeEvent) results.get(0)).after();
        Object[] actualFields = RecordDataTestUtils.recordFields(record, recordType);
        Assertions.assertThat(actualFields).isEqualTo(expectedFields);
    }

    /**
     * Builds the source provider capturing the given tables of {@code database}, starting from
     * an initial snapshot with schema-change events disabled and the server time zone set to UTC.
     *
     * @param captureTables unqualified table names; each is prefixed with the database name
     * @param database database supplying host, port, credentials and database name
     * @return the event source provider of a {@link MySqlDataSource} built from that config
     */
    private FlinkSourceProvider getFlinkSourceProvider(
            String[] captureTables, UniqueDatabase database) {
        String[] captureTableIds =
                Arrays.stream(captureTables)
                        .map(tableName -> database.getDatabaseName() + "." + tableName)
                        .toArray(String[]::new);

        MySqlSourceConfigFactory configFactory =
                new MySqlSourceConfigFactory()
                        .startupOptions(StartupOptions.initial())
                        .databaseList(new String[] {database.getDatabaseName()})
                        .tableList(captureTableIds)
                        .includeSchemaChanges(false)
                        .hostname(database.getHost())
                        .port(database.getDatabasePort())
                        .splitSize(10)
                        .fetchSize(2)
                        .username(database.getUsername())
                        .password(database.getPassword())
                        .serverTimeZone(ZoneId.of("UTC").toString())
                        .serverId(MySqSourceTestUtils.getServerId(env.getParallelism()));

        return (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
    }
}

