/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.polardbx;

import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.flink.cdc.connectors.polardbx.PolardbxSourceTestBase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.StringUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class PolardbxCharsetITCase
extends PolardbxSourceTestBase {
    private static final String DATABASE = "charset_test";
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env, (EnvironmentSettings)EnvironmentSettings.newInstance().inStreamingMode().build());
    private final String testName;
    private final String[] snapshotExpected;
    private final String[] binlogExpected;

    public PolardbxCharsetITCase(String testName, String[] snapshotExpected, String[] binlogExpected) {
        this.testName = testName;
        this.snapshotExpected = snapshotExpected;
        this.binlogExpected = binlogExpected;
    }

    @Parameterized.Parameters(name="Test column charset: {0}")
    public static Object[] parameters() {
        return new Object[][]{{"utf8_test", new String[]{"+I[1, \u6d4b\u8bd5\u6570\u636e]", "+I[2, Craig Marshall]", "+I[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}, new String[]{"-D[1, \u6d4b\u8bd5\u6570\u636e]", "-D[2, Craig Marshall]", "-D[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]", "+I[11, \u6d4b\u8bd5\u6570\u636e]", "+I[12, Craig Marshall]", "+I[13, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}}, {"ascii_test", new String[]{"+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"}, new String[]{"-D[1, ascii test!?]", "-D[2, Craig Marshall]", "-D[3, {test}]", "+I[11, ascii test!?]", "+I[12, Craig Marshall]", "+I[13, {test}]"}}, {"gbk_test", new String[]{"+I[1, \u6d4b\u8bd5\u6570\u636e]", "+I[2, Craig Marshall]", "+I[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}, new String[]{"-D[1, \u6d4b\u8bd5\u6570\u636e]", "-D[2, Craig Marshall]", "-D[3, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]", "+I[11, \u6d4b\u8bd5\u6570\u636e]", "+I[12, Craig Marshall]", "+I[13, \u53e6\u4e00\u4e2a\u6d4b\u8bd5\u6570\u636e]"}}, {"latin1_test", new String[]{"+I[1, \u00c0\u00c6\u00c9]", "+I[2, Craig Marshall]", "+I[3, \u00dc\u00e6\u00fb]"}, new String[]{"-D[1, \u00c0\u00c6\u00c9]", "-D[2, Craig Marshall]", "-D[3, \u00dc\u00e6\u00fb]", "+I[11, \u00c0\u00c6\u00c9]", "+I[12, Craig Marshall]", "+I[13, \u00dc\u00e6\u00fb]"}}, {"big5_test", new String[]{"+I[1, \u5927\u4e94]", "+I[2, Craig Marshall]", "+I[3, \u4e39\u5e97]"}, new String[]{"-D[1, \u5927\u4e94]", "-D[2, Craig Marshall]", "-D[3, \u4e39\u5e97]", "+I[11, \u5927\u4e94]", "+I[12, Craig Marshall]", "+I[13, \u4e39\u5e97]"}}};
    }

    @BeforeClass
    public static void beforeClass() throws InterruptedException {
        PolardbxCharsetITCase.initializePolardbxTables(DATABASE, s -> !StringUtils.isNullOrWhitespaceOnly((String)s) && (s.contains("utf8_test") || s.contains("latin1_test") || s.contains("gbk_test") || s.contains("big5_test") || s.contains("ascii_test")));
    }

    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        this.env.setParallelism(4);
        this.env.enableCheckpointing(200L);
    }

    @Test
    public void testCharset() throws Exception {
        String sourceDDL = String.format("CREATE TABLE %s (\n  table_id BIGINT,\n  table_name STRING,\n  primary key(table_id) not enforced) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'server-id' = '%s', 'server-time-zone' = 'UTC', 'scan.incremental.snapshot.chunk.size' = '%s')", this.testName, "127.0.0.1", PORT, "polardbx_root", "123456", DATABASE, this.testName, true, this.getServerId(), 4);
        this.tEnv.executeSql(sourceDDL);
        TableResult result = this.tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", this.testName));
        CloseableIterator iterator = result.collect();
        PolardbxCharsetITCase.waitForSnapshotStarted((CloseableIterator<Row>)iterator);
        PolardbxCharsetITCase.assertEqualsInAnyOrder(Arrays.asList(this.snapshotExpected), PolardbxCharsetITCase.fetchRows((Iterator<Row>)iterator, this.snapshotExpected.length));
        try (Connection connection = PolardbxCharsetITCase.getJdbcConnection();
             Statement statement = connection.createStatement();){
            statement.execute(String.format("/*TDDL:FORBID_EXECUTE_DML_ALL=FALSE*/UPDATE %s.%s SET table_id = table_id + 10;", DATABASE, this.testName));
        }
        PolardbxCharsetITCase.assertEqualsInAnyOrder(Arrays.asList(this.binlogExpected), PolardbxCharsetITCase.fetchRows((Iterator<Row>)iterator, this.binlogExpected.length));
        ((JobClient)result.getJobClient().get()).cancel().get();
    }

    private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
        while (!iterator.hasNext()) {
            Thread.sleep(100L);
        }
    }
}

