/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.table;

import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class MysqlConnectorCharsetITCase
extends MySqlSourceTestBase {
    private static final String TEST_USER = "mysqluser";
    private static final String TEST_PASSWORD = "mysqlpw";
    private static final UniqueDatabase charsetTestDatabase = new UniqueDatabase(MYSQL_CONTAINER, "charset_test", "mysqluser", "mysqlpw");
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env, (EnvironmentSettings)EnvironmentSettings.newInstance().inStreamingMode().build());
    private final String testName;
    private final String[] snapshotExpected;
    private final String[] binlogExpected;

    public MysqlConnectorCharsetITCase(String testName, String[] snapshotExpected, String[] binlogExpected) {
        this.testName = testName;
        this.snapshotExpected = snapshotExpected;
        this.binlogExpected = binlogExpected;
    }

    @Parameterized.Parameters(name="Test column charset: {0}")
    public static Object[] parameters() {
        return new Object[][]{{"ucs2_test", new String[]{"+I[1, \u6d4b\u8bd5\u6570\u636e]", "+I[2, Craig Marshall]", "+I[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}, new String[]{"-D[1, \u6d4b\u8bd5\u6570\u636e]", "-D[2, Craig Marshall]", "-D[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]", "+I[11, \u6d4b\u8bd5\u6570\u636e]", "+I[12, Craig Marshall]", "+I[13, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}}, {"utf8_test", new String[]{"+I[1, \u6d4b\u8bd5\u6570\u636e]", "+I[2, Craig Marshall]", "+I[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}, new String[]{"-D[1, \u6d4b\u8bd5\u6570\u636e]", "-D[2, Craig Marshall]", "-D[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]", "+I[11, \u6d4b\u8bd5\u6570\u636e]", "+I[12, Craig Marshall]", "+I[13, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}}, {"ascii_test", new String[]{"+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"}, new String[]{"-D[1, ascii test!?]", "-D[2, Craig Marshall]", "-D[3, {test}]", "+I[11, ascii test!?]", "+I[12, Craig Marshall]", "+I[13, {test}]"}}, {"sjis_test", new String[]{"+I[1, \u3072\u3073\u3074]", "+I[2, Craig Marshall]", "+I[3, \u30d5\u30d6\u30d7]"}, new String[]{"-D[1, \u3072\u3073\u3074]", "-D[2, Craig Marshall]", "-D[3, \u30d5\u30d6\u30d7]", "+I[11, \u3072\u3073\u3074]", "+I[12, Craig Marshall]", "+I[13, \u30d5\u30d6\u30d7]"}}, {"gbk_test", new String[]{"+I[1, \u6d4b\u8bd5\u6570\u636e]", "+I[2, Craig Marshall]", "+I[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}, new String[]{"-D[1, \u6d4b\u8bd5\u6570\u636e]", "-D[2, Craig Marshall]", "-D[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]", "+I[11, \u6d4b\u8bd5\u6570\u636e]", "+I[12, Craig Marshall]", "+I[13, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}}, {"cp932_test", new String[]{"+I[1, \u3072\u3073\u3074]", "+I[2, Craig Marshall]", "+I[3, \u30d5\u30d6\u30d7]"}, new String[]{"-D[1, \u3072\u3073\u3074]", "-D[2, Craig Marshall]", "-D[3, \u30d5\u30d6\u30d7]", "+I[11, \u3072\u3073\u3074]", "+I[12, Craig Marshall]", "+I[13, \u30d5\u30d6\u30d7]"}}, {"gb2312_test", new String[]{"+I[1, \u6d4b\u8bd5\u6570\u636e]", "+I[2, Craig Marshall]", "+I[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}, new String[]{"-D[1, \u6d4b\u8bd5\u6570\u636e]", "-D[2, Craig Marshall]", "-D[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]", "+I[11, \u6d4b\u8bd5\u6570\u636e]", "+I[12, Craig Marshall]", "+I[13, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}}, {"ujis_test", new String[]{"+I[1, \u3072\u3073\u3074]", "+I[2, Craig Marshall]", "+I[3, \u30d5\u30d6\u30d7]"}, new String[]{"-D[1, \u3072\u3073\u3074]", "-D[2, Craig Marshall]", "-D[3, \u30d5\u30d6\u30d7]", "+I[11, \u3072\u3073\u3074]", "+I[12, Craig Marshall]", "+I[13, \u30d5\u30d6\u30d7]"}}, {"euckr_test", new String[]{"+I[1, \uc8e0\uc8fc\uc96c]", "+I[2, Craig Marshall]", "+I[3, \ud55c\uad6d\uc5b4]"}, new String[]{"-D[1, \uc8e0\uc8fc\uc96c]", "-D[2, Craig Marshall]", "-D[3, \ud55c\uad6d\uc5b4]", "+I[11, \uc8e0\uc8fc\uc96c]", "+I[12, Craig Marshall]", "+I[13, \ud55c\uad6d\uc5b4]"}}, {"latin1_test", new String[]{"+I[1, \u00c0\u00c6\u00c9]", "+I[2, Craig Marshall]", "+I[3, \u00dc\u00e6\u00fb]"}, new String[]{"-D[1, \u00c0\u00c6\u00c9]", "-D[2, Craig Marshall]", "-D[3, \u00dc\u00e6\u00fb]", "+I[11, \u00c0\u00c6\u00c9]", "+I[12, Craig Marshall]", "+I[13, \u00dc\u00e6\u00fb]"}}, {"latin2_test", new String[]{"+I[1, \u00d3\u00d4\u0150\u00d6]", "+I[2, Craig Marshall]", "+I[3, \u0160\u015e\u0164\u0179]"}, new String[]{"-D[1, \u00d3\u00d4\u0150\u00d6]", "-D[2, Craig Marshall]", "-D[3, \u0160\u015e\u0164\u0179]", "+I[11, \u00d3\u00d4\u0150\u00d6]", "+I[12, Craig Marshall]", "+I[13, \u0160\u015e\u0164\u0179]"}}, {"greek_test", new String[]{"+I[1, \u03b1\u03b2\u03b3\u03b4\u03b5]", "+I[2, Craig Marshall]", "+I[3, \u03b8\u03b9\u03ba\u03bb]"}, new String[]{"-D[1, \u03b1\u03b2\u03b3\u03b4\u03b5]", "-D[2, Craig Marshall]", "-D[3, \u03b8\u03b9\u03ba\u03bb]", "+I[11, \u03b1\u03b2\u03b3\u03b4\u03b5]", "+I[12, Craig Marshall]", "+I[13, \u03b8\u03b9\u03ba\u03bb]"}}, {"hebrew_test", new String[]{"+I[1, \u05d1\u05d1\u05e7\u05e9\u05d4]", "+I[2, Craig Marshall]", "+I[3, \u05e9\u05e8\u05e4\u05d4]"}, new String[]{"-D[1, \u05d1\u05d1\u05e7\u05e9\u05d4]", "-D[2, Craig Marshall]", "-D[3, \u05e9\u05e8\u05e4\u05d4]", "+I[11, \u05d1\u05d1\u05e7\u05e9\u05d4]", "+I[12, Craig Marshall]", "+I[13, \u05e9\u05e8\u05e4\u05d4]"}}, {"cp866_test", new String[]{"+I[1, \u0442\u0432\u043e\u0439]", "+I[2, Craig Marshall]", "+I[3, \u043b\u044e\u0431\u043e\u0439]"}, new String[]{"-D[1, \u0442\u0432\u043e\u0439]", "-D[2, Craig Marshall]", "-D[3, \u043b\u044e\u0431\u043e\u0439]", "+I[11, \u0442\u0432\u043e\u0439]", "+I[12, Craig Marshall]", "+I[13, \u043b\u044e\u0431\u043e\u0439]"}}, {"tis620_test", new String[]{"+I[1, \u0e20\u0e32\u0e29\u0e32\u0e44\u0e17\u0e22]", "+I[2, Craig Marshall]", "+I[3, \u0e06\u0e07\u0e08\u0e09]"}, new String[]{"-D[1, \u0e20\u0e32\u0e29\u0e32\u0e44\u0e17\u0e22]", "-D[2, Craig Marshall]", "-D[3, \u0e06\u0e07\u0e08\u0e09]", "+I[11, \u0e20\u0e32\u0e29\u0e32\u0e44\u0e17\u0e22]", "+I[12, Craig Marshall]", "+I[13, \u0e06\u0e07\u0e08\u0e09]"}}, {"cp1250_test", new String[]{"+I[1, \u00d3\u00d4\u0150\u00d6]", "+I[2, Craig Marshall]", "+I[3, \u0160\u015e\u0164\u0179]"}, new String[]{"-D[1, \u00d3\u00d4\u0150\u00d6]", "-D[2, Craig Marshall]", "-D[3, \u0160\u015e\u0164\u0179]", "+I[11, \u00d3\u00d4\u0150\u00d6]", "+I[12, Craig Marshall]", "+I[13, \u0160\u015e\u0164\u0179]"}}, {"cp1251_test", new String[]{"+I[1, \u0442\u0432\u043e\u0439]", "+I[2, Craig Marshall]", "+I[3, \u043b\u044e\u0431\u043e\u0439]"}, new String[]{"-D[1, \u0442\u0432\u043e\u0439]", "-D[2, Craig Marshall]", "-D[3, \u043b\u044e\u0431\u043e\u0439]", "+I[11, \u0442\u0432\u043e\u0439]", "+I[12, Craig Marshall]", "+I[13, \u043b\u044e\u0431\u043e\u0439]"}}, {"cp1257_test", new String[]{"+I[1, piedzimst br\u012bvi]", "+I[2, Craig Marshall]", "+I[3, apvelt\u012bti ar sapr\u0101tu]"}, new String[]{"-D[1, piedzimst br\u012bvi]", "-D[2, Craig Marshall]", "-D[3, apvelt\u012bti ar sapr\u0101tu]", "+I[11, piedzimst br\u012bvi]", "+I[12, Craig Marshall]", "+I[13, apvelt\u012bti ar sapr\u0101tu]"}}, {"macroman_test", new String[]{"+I[1, \u00c0\u00c6\u00c9]", "+I[2, Craig Marshall]", "+I[3, \u00dc\u00e6\u00fb]"}, new String[]{"-D[1, \u00c0\u00c6\u00c9]", "-D[2, Craig Marshall]", "-D[3, \u00dc\u00e6\u00fb]", "+I[11, \u00c0\u00c6\u00c9]", "+I[12, Craig Marshall]", "+I[13, \u00dc\u00e6\u00fb]"}}, {"macce_test", new String[]{"+I[1, \u00d3\u00d4\u0150\u00d6]", "+I[2, Craig Marshall]", "+I[3, \u016e\u00da\u0170\u00dc]"}, new String[]{"-D[1, \u00d3\u00d4\u0150\u00d6]", "-D[2, Craig Marshall]", "-D[3, \u016e\u00da\u0170\u00dc]", "+I[11, \u00d3\u00d4\u0150\u00d6]", "+I[12, Craig Marshall]", "+I[13, \u016e\u00da\u0170\u00dc]"}}, {"big5_test", new String[]{"+I[1, \u5927\u4e94]", "+I[2, Craig Marshall]", "+I[3, \u4e39\u5e97]"}, new String[]{"-D[1, \u5927\u4e94]", "-D[2, Craig Marshall]", "-D[3, \u4e39\u5e97]", "+I[11, \u5927\u4e94]", "+I[12, Craig Marshall]", "+I[13, \u4e39\u5e97]"}}};
    }

    @BeforeClass
    public static void beforeClass() {
        charsetTestDatabase.createAndInitialize();
    }

    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        this.env.setParallelism(4);
        this.env.enableCheckpointing(200L);
    }

    @Test
    public void testCharset() throws Exception {
        String sourceDDL = String.format("CREATE TABLE %s (\n  table_id BIGINT,\n  table_name STRING,\n  primary key(table_id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')", this.testName, MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), charsetTestDatabase.getUsername(), charsetTestDatabase.getPassword(), charsetTestDatabase.getDatabaseName(), this.testName, true, this.getServerId(), 4);
        this.tEnv.executeSql(sourceDDL);
        TableResult result = this.tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", this.testName));
        CloseableIterator iterator = result.collect();
        MysqlConnectorCharsetITCase.waitForSnapshotStarted((CloseableIterator<Row>)iterator);
        MysqlConnectorCharsetITCase.assertEqualsInAnyOrder(Arrays.asList(this.snapshotExpected), MysqlConnectorCharsetITCase.fetchRows((Iterator<Row>)iterator, this.snapshotExpected.length));
        try (Connection connection = charsetTestDatabase.getJdbcConnection();
             Statement statement = connection.createStatement();){
            statement.execute(String.format("UPDATE %s SET table_id = table_id + 10;", this.testName));
        }
        MysqlConnectorCharsetITCase.assertEqualsInAnyOrder(Arrays.asList(this.binlogExpected), MysqlConnectorCharsetITCase.fetchRows((Iterator<Row>)iterator, this.binlogExpected.length));
        ((JobClient)result.getJobClient().get()).cancel().get();
    }

    private String getServerId() {
        Random random = new Random();
        int serverId = random.nextInt(100) + 5400;
        return serverId + "-" + (serverId + this.env.getParallelism());
    }

    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        ArrayList<String> rows = new ArrayList<String>(size);
        while (size > 0 && iter.hasNext()) {
            Row row = iter.next();
            rows.add(row.toString());
            --size;
        }
        return rows;
    }

    private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
        while (!iterator.hasNext()) {
            Thread.sleep(100L);
        }
    }
}

