/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.cdc.connectors.mysql.LegacyMySqlTestBase;
import org.apache.flink.cdc.connectors.mysql.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Test;

/**
 * Integration tests that read change events through the legacy {@link MySqlSource} using
 * {@link JsonDebeziumDeserializationSchema} and compare the emitted JSON records with the
 * expectations stored in classpath resources.
 */
public class LegacyMySqlSourceITCase
extends LegacyMySqlTestBase {
    private final UniqueDatabase fullTypesDatabase =
            new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", "mysqluser", "mysqlpw");

    @Test
    public void testConsumingAllEventsWithJsonFormatIncludeSchema() throws Exception {
        testConsumingAllEventsWithJsonFormat(true);
    }

    @Test
    public void testConsumingAllEventsWithJsonFormatExcludeSchema() throws Exception {
        testConsumingAllEventsWithJsonFormat(false);
    }

    @Test
    public void testConsumingAllEventsWithJsonFormatWithNumericDecimal() throws Exception {
        Map<String, Object> customConverterConfigs = new HashMap<>();
        customConverterConfigs.put("decimal.format", "numeric");
        testConsumingAllEventsWithJsonFormat(
                false,
                customConverterConfigs,
                "file/debezium-data-schema-exclude-with-numeric-decimal.json");
    }

    /**
     * Runs the source against {@code fullTypesDatabase}, checks the first emitted record against
     * the {@code expected_snapshot} entry of the expectation file, issues an UPDATE and checks the
     * next emitted record against the {@code expected_binlog} entry.
     *
     * @param includeSchema flag handed to {@link JsonDebeziumDeserializationSchema}
     * @param customConverterConfigs extra converter settings, or {@code null} to use the
     *     single-argument deserializer constructor
     * @param expectedFile classpath resource holding the expected JSON
     * @throws Exception if setup, the SQL statement, an assertion or the job cancellation fails
     */
    private void testConsumingAllEventsWithJsonFormat(
            boolean includeSchema, Map<String, Object> customConverterConfigs, String expectedFile)
            throws Exception {
        fullTypesDatabase.createAndInitialize();

        JsonDebeziumDeserializationSchema schema =
                customConverterConfigs == null
                        ? new JsonDebeziumDeserializationSchema(includeSchema)
                        : new JsonDebeziumDeserializationSchema(
                                includeSchema, customConverterConfigs);
        DebeziumSourceFunction<String> sourceFunction =
                MySqlSource.<String>builder()
                        .hostname(MYSQL_CONTAINER.getHost())
                        .port(MYSQL_CONTAINER.getDatabasePort())
                        .databaseList(fullTypesDatabase.getDatabaseName())
                        .username(fullTypesDatabase.getUsername())
                        .password(fullTypesDatabase.getPassword())
                        .deserializer(schema)
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000L);
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(
                        env, EnvironmentSettings.newInstance().inStreamingMode().build());

        JSONObject expected = JSONObject.parseObject(readLines(expectedFile), JSONObject.class);
        JSONObject expectSnapshot = expected.getJSONObject("expected_snapshot");

        DataStreamSource<String> source = env.addSource(sourceFunction);
        tEnv.createTemporaryView("full_types", source);
        TableResult result = tEnv.executeSql("SELECT * FROM full_types");

        // Tie the job cancellation to a try-with-resources so the job is stopped even when a
        // check below fails. The original failure stays the primary exception; a failure while
        // cancelling is attached to it as a suppressed exception.
        try (AutoCloseable ignored = () -> cancelJob(result)) {
            // Check the snapshot result.
            CloseableIterator<Row> snapshot = result.collect();
            waitForSnapshotStarted(snapshot);
            assertNextRecordMatches("snapshot", snapshot, expectSnapshot);

            try (Connection connection = fullTypesDatabase.getJdbcConnection();
                    Statement statement = connection.createStatement()) {
                statement.execute(
                        "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
            }

            // Check the binlog result.
            CloseableIterator<Row> binlog = result.collect();
            JSONObject expectBinlog = expected.getJSONObject("expected_binlog");
            assertNextRecordMatches("binlog", binlog, expectBinlog);
        }
    }

    /**
     * Picks the expectation file matching {@code includeSchema} and runs the test without custom
     * converter settings.
     */
    private void testConsumingAllEventsWithJsonFormat(boolean includeSchema) throws Exception {
        String expectedFile =
                includeSchema
                        ? "file/debezium-data-schema-include.json"
                        : "file/debezium-data-schema-exclude.json";
        testConsumingAllEventsWithJsonFormat(includeSchema, null, expectedFile);
    }

    /**
     * Fetches one record from {@code rows} and asserts that it matches {@code expectedRecord}
     * according to {@link #dataInJsonIsEquals(String, String)}.
     *
     * @param phase label used in failure messages
     * @param rows iterator to take the record from
     * @param expectedRecord expected change event
     */
    private static void assertNextRecordMatches(
            String phase, Iterator<Row> rows, JSONObject expectedRecord) {
        List<Object> fetched = fetchRows(rows, 1);
        Assert.assertFalse("No " + phase + " record was emitted", fetched.isEmpty());
        String actual = fetched.get(0).toString();
        Assert.assertTrue(
                "Unexpected " + phase + " record: " + actual,
                dataInJsonIsEquals(actual, expectedRecord.toString()));
    }

    /**
     * Cancels the job behind {@code result} and waits for the cancellation to complete.
     *
     * @throws IllegalStateException if the result exposes no job client
     * @throws Exception if waiting for the cancellation fails
     */
    private static void cancelJob(TableResult result) throws Exception {
        JobClient jobClient =
                result.getJobClient()
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "TableResult does not expose a JobClient"));
        jobClient.cancel().get();
    }

    /**
     * Collects field 0 of up to {@code size} rows from {@code iter}; returns fewer elements when
     * the iterator is exhausted first.
     */
    private static List<Object> fetchRows(Iterator<Row> iter, int size) {
        List<Object> rows = new ArrayList<>(size);
        while (size > 0 && iter.hasNext()) {
            Row row = iter.next();
            rows.add(row.getField(0));
            --size;
        }
        return rows;
    }

    /**
     * Polls {@code iterator} every 100 ms until {@code hasNext()} reports {@code true}. There is
     * no upper bound on the wait.
     */
    private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
        while (!iterator.hasNext()) {
            Thread.sleep(100L);
        }
    }

    /**
     * Reads the whole content of a classpath resource.
     *
     * @param resource resource name resolved through this class's class loader
     * @return the resource content as bytes
     * @throws NullPointerException if the resource cannot be found
     */
    private static byte[] readLines(String resource) throws IOException, URISyntaxException {
        Path path =
                Paths.get(
                        Objects.requireNonNull(
                                        LegacyMySqlSourceITCase.class
                                                .getClassLoader()
                                                .getResource(resource))
                                .toURI());
        return Files.readAllBytes(path);
    }

    /**
     * Compares two change events given as JSON text. When both documents carry a
     * {@code payload} object the comparison is done on the payloads; only the {@code after},
     * {@code before} and {@code op} entries are compared.
     */
    private static boolean dataInJsonIsEquals(String actual, String expect) {
        JSONObject actualJsonObject = JSONObject.parseObject(actual);
        JSONObject expectJsonObject = JSONObject.parseObject(expect);
        if (expectJsonObject.getJSONObject("payload") != null
                && actualJsonObject.getJSONObject("payload") != null) {
            expectJsonObject = expectJsonObject.getJSONObject("payload");
            actualJsonObject = actualJsonObject.getJSONObject("payload");
        }
        return jsonObjectEquals(
                        expectJsonObject.getJSONObject("after"),
                        actualJsonObject.getJSONObject("after"))
                && jsonObjectEquals(
                        expectJsonObject.getJSONObject("before"),
                        actualJsonObject.getJSONObject("before"))
                && Objects.equals(expectJsonObject.get("op"), actualJsonObject.get("op"));
    }

    /**
     * Null-safe comparison of two JSON objects by their string form: two {@code null}s are equal,
     * a {@code null} never equals a non-null object.
     */
    private static boolean jsonObjectEquals(JSONObject a, JSONObject b) {
        if (a == null || b == null) {
            // Previously a non-null 'a' with a null 'b' dereferenced 'b' and threw.
            return a == b;
        }
        return a.toString().equals(b.toString());
    }
}

