/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;

public class KafkaSyncDatabaseActionITCase
extends KafkaActionITCaseBase {
    protected void testSchemaEvolutionMultiTopic(String format) throws Exception {
        String topic1 = "schema_evolution_0";
        String topic2 = "schema_evolution_1";
        boolean writeOne = false;
        int fileCount = 2;
        List<String> topics = Arrays.asList("schema_evolution_0", "schema_evolution_1");
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        for (int i = 0; i < fileCount; ++i) {
            this.writeRecordsToKafka(topics.get(i), "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt", format, i, format);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionImpl(topics, writeOne, fileCount, format);
    }

    protected void testSchemaEvolutionOneTopic(String format) throws Exception {
        String topic = "schema_evolution";
        boolean writeOne = true;
        int fileCount = 2;
        List<String> topics = Collections.singletonList("schema_evolution");
        topics.forEach(t -> this.createTestTopic((String)t, 1, 1));
        for (int i = 0; i < fileCount; ++i) {
            this.writeRecordsToKafka(topics.get(0), "kafka/%s/database/schemaevolution/topic%s/%s-data-1.txt", format, i, format);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionImpl(topics, writeOne, fileCount, format);
    }

    private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int fileCount, String format) throws Exception {
        this.waitingTables("t1", "t2");
        FileStoreTable table1 = this.getFileStoreTable("t1");
        FileStoreTable table2 = this.getFileStoreTable("t2");
        RowType rowType1 = RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]");
        this.waitForResult(expected, table1, rowType1, this.getPrimaryKey(format));
        RowType rowType2 = RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> expected2 = Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        this.waitForResult(expected2, table2, rowType2, this.getPrimaryKey(format));
        for (int i = 0; i < fileCount; ++i) {
            this.writeRecordsToKafka(writeOne ? topics.get(0) : topics.get(i), "kafka/%s/database/schemaevolution/topic%s/%s-data-2.txt", format, i, format);
        }
        rowType1 = RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "age"});
        expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", "+I[102, car battery, 12V car battery, 8.1, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]");
        this.waitForResult(expected, table1, rowType1, this.getPrimaryKey(format));
        rowType2 = RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "address"});
        expected = format.equals("debezium") ? Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, NULL]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, Beijing]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, NULL]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, Shanghai]") : Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, Beijing]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, Shanghai]");
        this.waitForResult(expected, table2, rowType2, this.getPrimaryKey(format));
    }

    protected void testTopicIsEmpty(String format) {
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).build();
        Assertions.assertThatThrownBy(() -> ((KafkaSyncDatabaseAction)action).run()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, (String)"kafka_conf must and can only set one of the following options: topic,topic-pattern.")});
    }

    protected void testTableAffixMultiTopic(String format) throws Exception {
        this.createFileStoreTable("test_prefix_t1_test_suffix", RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"}), Collections.emptyList(), this.getPrimaryKey(format), this.getBucketKey(format), Collections.emptyMap());
        String topic1 = "prefix_suffix_0";
        String topic2 = "prefix_suffix_1";
        boolean writeOne = false;
        int fileCount = 2;
        List<String> topics = Arrays.asList("prefix_suffix_0", "prefix_suffix_1");
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        for (int i = 0; i < topics.size(); ++i) {
            this.writeRecordsToKafka(topics.get(i), "kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt", format, i, format);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").withTableConfig(this.getBasicTableConfig()).includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testTableAffixImpl(topics, writeOne, fileCount, format);
    }

    protected void testTableAffixOneTopic(String format) throws Exception {
        this.createFileStoreTable("test_prefix_t1_test_suffix", RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"}), Collections.emptyList(), this.getPrimaryKey(format), this.getBucketKey(format), Collections.emptyMap());
        String topic1 = "prefix_suffix";
        List<String> topics = Collections.singletonList("prefix_suffix");
        boolean writeOne = true;
        int fileCount = 2;
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        for (int i = 0; i < fileCount; ++i) {
            this.writeRecordsToKafka(topics.get(0), String.format("kafka/%s/database/prefixsuffix/topic%s/%s-data-1.txt", format, i, format), new Object[0]);
        }
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").withTableConfig(this.getBasicTableConfig()).includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testTableAffixImpl(topics, writeOne, fileCount, format);
    }

    private void testTableAffixImpl(List<String> topics, boolean writeOne, int fileCount, String format) throws Exception {
        this.waitingTables("test_prefix_t1_test_suffix", "test_prefix_t2_test_suffix");
        FileStoreTable table1 = this.getFileStoreTable("test_prefix_t1_test_suffix");
        FileStoreTable table2 = this.getFileStoreTable("test_prefix_t2_test_suffix");
        RowType rowType1 = RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]");
        this.waitForResult(expected, table1, rowType1, this.getPrimaryKey(format));
        RowType rowType2 = RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        expected = Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", "+I[104, hammer, 12oz carpenter's hammer, 0.75]");
        this.waitForResult(expected, table2, rowType2, this.getPrimaryKey(format));
        for (int i = 0; i < fileCount; ++i) {
            this.writeRecordsToKafka(writeOne ? topics.get(0) : topics.get(i), "kafka/%s/database/prefixsuffix/topic%s/%s-data-2.txt", format, i, format);
        }
        rowType1 = RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "address"});
        expected = format.equals("debezium") ? Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, Beijing]", "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", "+I[102, car battery, 12V car battery, 8.1, Shanghai]", "+I[102, car battery, 12V car battery, 8.1, NULL]") : Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14, Beijing]", "+I[102, car battery, 12V car battery, 8.1, Shanghai]");
        this.waitForResult(expected, table1, rowType1, this.getPrimaryKey(format));
        rowType2 = RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight", "age"});
        expected = format.equals("debezium") ? Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]", "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, NULL]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, NULL]") : Arrays.asList("+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]", "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]");
        this.waitForResult(expected, table2, rowType2, this.getPrimaryKey(format));
    }

    protected void testIncludingTables(String format) throws Exception {
        this.includingAndExcludingTablesImpl("flink|paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignore"), format);
    }

    protected void testExcludingTables(String format) throws Exception {
        this.includingAndExcludingTablesImpl(null, "flink|paimon.+", Collections.singletonList("ignore"), Arrays.asList("flink", "paimon_1", "paimon_2"), format);
    }

    protected void testIncludingAndExcludingTables(String format) throws Exception {
        this.includingAndExcludingTablesImpl("flink|paimon.+", "paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "ignore"), format);
    }

    private void includingAndExcludingTablesImpl(@Nullable String includingTables, @Nullable String excludingTables, List<String> existedTables, List<String> notExistedTables, String format) throws Exception {
        String topic1 = "include_exclude" + UUID.randomUUID();
        List<String> topics = Collections.singletonList(topic1);
        topics.forEach(topic -> this.createTestTopic((String)topic, 1, 1));
        this.writeRecordsToKafka(topics.get(0), "kafka/%s/database/include/topic0/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), String.join((CharSequence)";", topics));
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).includingTables(includingTables).excludingTables(excludingTables).withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.waitingTables(existedTables);
        this.assertTableNotExists(notExistedTables);
    }

    protected void testCaseInsensitive(String format) throws Exception {
        String topic = "case-insensitive";
        this.createTestTopic("case-insensitive", 1, 1);
        this.writeRecordsToKafka("case-insensitive", "kafka/%s/database/case-insensitive/%s-data-1.txt", format, format);
        Map<String, String> kafkaConfig = this.getBasicKafkaConfig();
        kafkaConfig.put(KafkaConnectorOptions.VALUE_FORMAT.key(), format + "-json");
        kafkaConfig.put(KafkaConnectorOptions.TOPIC.key(), "case-insensitive");
        KafkaSyncDatabaseAction action = (KafkaSyncDatabaseAction)this.syncDatabaseActionBuilder(kafkaConfig).withTableConfig(this.getBasicTableConfig()).withCatalogConfig(Collections.singletonMap(CatalogOptions.CASE_SENSITIVE.key(), "false")).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.waitingTables("t1");
        FileStoreTable table = this.getFileStoreTable("t1");
        RowType rowType = RowType.of((DataType[])new DataType[]{this.getDataType(format), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, (String[])new String[]{"id", "name", "description", "weight"});
        List<String> expected = Arrays.asList("+I[101, scooter, Small 2-wheel scooter, 3.14]", "+I[102, car battery, 12V car battery, 8.1]");
        this.waitForResult(expected, table, rowType, this.getPrimaryKey(format));
    }

    private DataType getDataType(String format) {
        return format.equals("debezium") ? DataTypes.STRING() : DataTypes.STRING().notNull();
    }

    private List<String> getPrimaryKey(String format) {
        return format.equals("debezium") ? Collections.emptyList() : Collections.singletonList("id");
    }

    private List<String> getBucketKey(String format) {
        return format.equals("debezium") ? Collections.singletonList("id") : Collections.emptyList();
    }
}

