/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.table;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.flink.cdc.connectors.mysql.MySqlValidatorTest;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

/**
 * Integration test that reads the temporal columns of the {@code full_types} table through the
 * {@code mysql-cdc} table connector while the MySQL server runs with a non-default
 * {@code default-time_zone}.
 *
 * <p>The test is parameterized over {@code scan.incremental.snapshot.enabled}: every test method
 * runs once with the incremental snapshot enabled and once with it disabled.
 */
@RunWith(Parameterized.class)
public class MySqlTimezoneITCase {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlTimezoneITCase.class);

    /** Name of the table that is captured from the {@code column_type_test} database. */
    private static final String CAPTURED_TABLE = "full_types";

    /**
     * Flink DDL template of the CDC source table. Placeholders, in order: hostname, port,
     * username, password, database name, table name, incremental snapshot flag, server id,
     * snapshot chunk size, server time zone.
     */
    private static final String SOURCE_DDL_TEMPLATE =
            "CREATE TABLE full_types (\n"
                    + "    `id` INT NOT NULL,\n"
                    + "    tiny_c TINYINT,\n"
                    + "    tiny_un_c SMALLINT ,\n"
                    + "    small_c SMALLINT,\n"
                    + "    small_un_c INT,\n"
                    + "    int_c INT ,\n"
                    + "    int_un_c BIGINT,\n"
                    + "    int11_c BIGINT,\n"
                    + "    big_c BIGINT,\n"
                    + "    varchar_c STRING,\n"
                    + "    char_c STRING,\n"
                    + "    float_c FLOAT,\n"
                    + "    double_c DOUBLE,\n"
                    + "    decimal_c DECIMAL(8, 4),\n"
                    + "    numeric_c DECIMAL(6, 0),\n"
                    + "    boolean_c BOOLEAN,\n"
                    + "    date_c DATE,\n"
                    + "    time_c TIME(0),\n"
                    + "    datetime3_c TIMESTAMP(3),\n"
                    + "    datetime6_c TIMESTAMP(6),\n"
                    + "    timestamp_c TIMESTAMP(0),\n"
                    + "    file_uuid BYTES,\n"
                    + "    primary key (`id`) not enforced"
                    + ") WITH ("
                    + " 'connector' = 'mysql-cdc',"
                    + " 'hostname' = '%s',"
                    + " 'port' = '%s',"
                    + " 'username' = '%s',"
                    + " 'password' = '%s',"
                    + " 'database-name' = '%s',"
                    + " 'table-name' = '%s',"
                    + " 'scan.incremental.snapshot.enabled' = '%s',"
                    + " 'server-id' = '%s',"
                    + " 'scan.incremental.snapshot.chunk.size' = '%s',"
                    + " 'server-time-zone'='%s'"
                    + ")";

    // Per-test state: both fields are (re)assigned in setup(), so they are instance fields
    // rather than statics written from an instance method.
    private TemporaryFolder tempFolder;
    private File resourceFolder;

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().inStreamingMode().build());

    /** Whether the source is created with {@code scan.incremental.snapshot.enabled = true}. */
    @Parameterized.Parameter
    public Boolean incrementalSnapshot;

    /** Supplies the two values of {@link #incrementalSnapshot} the tests are run with. */
    @Parameterized.Parameters(name = "incrementalSnapshot: {0}")
    public static List<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    /**
     * Creates a temporary folder below the test resource root (the generated {@code my.cnf} is
     * later referenced relative to that root) and configures the execution environment:
     * parallelism 4 with 200 ms checkpointing for the incremental snapshot source, parallelism 1
     * otherwise.
     */
    @Before
    public void setup() throws Exception {
        resourceFolder =
                Paths.get(
                                Objects.requireNonNull(
                                                MySqlValidatorTest.class
                                                        .getClassLoader()
                                                        .getResource("."))
                                        .toURI())
                        .toFile();
        tempFolder = new TemporaryFolder(resourceFolder);
        tempFolder.create();
        if (incrementalSnapshot) {
            env.setParallelism(4);
            env.enableCheckpointing(200L);
        } else {
            env.setParallelism(1);
        }
    }

    @Test
    public void testMySqlServerInBerlin() throws Exception {
        testTemporalTypesWithMySqlServerTimezone("Europe/Berlin");
    }

    @Test
    public void testMySqlServerInShanghai() throws Exception {
        testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai");
    }

    /**
     * Starts a MySQL container configured with the given time zone, reads the temporal columns of
     * {@code full_types} via the CDC source declared with the same {@code server-time-zone}, and
     * verifies first the snapshot row and then the {@code -U}/{@code +U} pair produced by an
     * update of {@code timestamp_c}.
     *
     * <p>The container, the result iterator and the temporary configuration folder are released
     * even when an assertion fails.
     *
     * @param timezone zone id used both for the server's {@code default-time_zone} and for the
     *     connector's {@code server-time-zone} option
     */
    private void testTemporalTypesWithMySqlServerTimezone(String timezone) throws Exception {
        MySqlContainer mySqlContainer = null;
        try {
            mySqlContainer = createMySqlContainer(timezone);
            LOG.info("Starting containers...");
            Startables.deepStart(Stream.of(mySqlContainer)).join();
            LOG.info("Containers are started.");

            UniqueDatabase fullTypesDatabase =
                    new UniqueDatabase(mySqlContainer, "column_type_test", "mysqluser", "mysqlpw");
            fullTypesDatabase.createAndInitialize();

            String sourceDDL =
                    String.format(
                            SOURCE_DDL_TEMPLATE,
                            mySqlContainer.getHost(),
                            mySqlContainer.getDatabasePort(),
                            fullTypesDatabase.getUsername(),
                            fullTypesDatabase.getPassword(),
                            fullTypesDatabase.getDatabaseName(),
                            CAPTURED_TABLE,
                            incrementalSnapshot,
                            getServerId(),
                            getSplitSize(),
                            timezone);
            tEnv.executeSql(sourceDDL);

            TableResult result =
                    tEnv.executeSql(
                            "SELECT date_c, time_c, datetime3_c, datetime6_c, timestamp_c FROM full_types");

            // try-with-resources: the iterator is closed on both the success and failure paths.
            try (CloseableIterator<Row> iterator = result.collect()) {
                String[] expectedSnapshot =
                        new String[] {
                            "+I[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]"
                        };
                MySqlSourceTestBase.assertEqualsInAnyOrder(
                        Arrays.asList(expectedSnapshot),
                        fetchRows(iterator, expectedSnapshot.length));

                try (Connection connection = fullTypesDatabase.getJdbcConnection();
                        Statement statement = connection.createStatement()) {
                    statement.execute(
                            "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
                }

                String[] expectedBinlog =
                        new String[] {
                            "-U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]",
                            "+U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22]"
                        };
                MySqlSourceTestBase.assertEqualsInOrder(
                        Arrays.asList(expectedBinlog),
                        fetchRows(iterator, expectedBinlog.length));

                JobClient jobClient =
                        result.getJobClient()
                                .orElseThrow(
                                        () ->
                                                new IllegalStateException(
                                                        "The query result does not expose a job client."));
                jobClient.cancel().get();
            }
        } finally {
            try {
                if (mySqlContainer != null) {
                    mySqlContainer.stop();
                }
            } finally {
                // Remove the generated my.cnf folder so runs do not pile up files in the
                // test resource directory.
                tempFolder.delete();
            }
        }
    }

    /**
     * Builds (but does not start) the MySQL container whose configuration override points at a
     * freshly generated {@code my.cnf} for the given time zone.
     */
    private MySqlContainer createMySqlContainer(String timezone) {
        return (MySqlContainer)
                new MySqlContainer()
                        .withConfigurationOverride(buildMySqlConfigWithTimezone(timezone))
                        .withSetupSQL("docker/setup.sql")
                        .withDatabaseName("flink-test")
                        .withUsername("flinkuser")
                        .withPassword("flinkpw")
                        .withLogConsumer(new Slf4jLogConsumer(LOG));
    }

    /**
     * Picks a random server id in {@code [5400, 5500)}. For the incremental snapshot source a
     * range {@code "<id>-<id + parallelism>"} is returned instead of a single id.
     */
    private String getServerId() {
        int serverId = new Random().nextInt(100) + 5400;
        if (incrementalSnapshot) {
            return serverId + "-" + (serverId + env.getParallelism());
        }
        return String.valueOf(serverId);
    }

    /**
     * Returns the value for {@code scan.incremental.snapshot.chunk.size}: 4 when the incremental
     * snapshot is enabled, 0 otherwise.
     */
    private int getSplitSize() {
        return incrementalSnapshot ? 4 : 0;
    }

    /**
     * Pulls at most {@code size} rows from the iterator and returns their string forms in
     * arrival order. Fewer rows are returned if the iterator is exhausted first.
     */
    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        List<String> rows = new ArrayList<>(size);
        for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
            rows.add(iter.next().toString());
        }
        return rows;
    }

    /**
     * Writes a {@code my.cnf} with row-based binlog settings and the given
     * {@code default-time_zone} into a new uniquely named folder below the temporary folder.
     *
     * @return the path of the written file relative to the test resource root
     * @throws RuntimeException if the folder or file cannot be created or written
     */
    private String buildMySqlConfigWithTimezone(String timezone) {
        try {
            File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));
            Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf"));
            String mysqldConf =
                    "[mysqld]\n"
                            + "binlog_format = row\n"
                            + "log_bin = mysql-bin\n"
                            + "server-id = 223344\n"
                            + "binlog_row_image = FULL\n";
            String timezoneConf = "default-time_zone = '" + timezone + "'\n";
            Files.write(
                    cnf,
                    Collections.singleton(mysqldConf + timezoneConf),
                    StandardCharsets.UTF_8,
                    StandardOpenOption.APPEND);
            return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create my.cnf file.", e);
        }
    }
}

