/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.pulsar;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionITCaseBase;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncTableAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class PulsarSyncTableActionITCase
extends PulsarActionITCaseBase {
    @Test
    @Timeout(value=120L)
    public void testSchemaEvolution() throws Exception {
        this.runSingleTableSchemaEvolution("schemaevolution");
    }

    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
        String topic = "schema_evolution";
        this.topics = Collections.singletonList("schema_evolution");
        this.createTopic("schema_evolution", 1);
        this.sendMessages("schema_evolution", this.getMessages(String.format("kafka/canal/table/%s/canal-data-1.txt", sourceDir)));
        Map<String, String> pulsarConfig = this.getBasicPulsarConfig();
        pulsarConfig.put(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
        if (ThreadLocalRandom.current().nextBoolean()) {
            pulsarConfig.put(PulsarActionUtils.TOPIC.key(), "schema_evolution");
        } else {
            pulsarConfig.put(PulsarActionUtils.TOPIC_PATTERN.key(), "schema_.*");
        }
        pulsarConfig.put(PulsarActionUtils.VALUE_FORMAT.key(), "canal-json");
        PulsarSyncTableAction action = (PulsarSyncTableAction)this.syncTableActionBuilder(pulsarConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(this.getBasicTableConfig()).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.testSchemaEvolutionImpl("schema_evolution", sourceDir);
    }

    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
        FileStoreTable table = this.getFileStoreTable(this.tableName);
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10)}, (String[])new String[]{"pt", "_id", "v1"});
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.sendMessages(topic, this.getMessages(String.format("kafka/canal/table/%s/canal-data-2.txt", sourceDir)));
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.INT()}, (String[])new String[]{"pt", "_id", "v1", "v2"});
        expected = Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30]", "+I[2, 4, four, NULL]", "+I[1, 5, five, 50]", "+I[1, 6, six, 60]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.sendMessages(topic, this.getMessages(String.format("kafka/canal/table/%s/canal-data-3.txt", sourceDir)));
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)10), DataTypes.BIGINT()}, (String[])new String[]{"pt", "_id", "v1", "v2"});
        expected = Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30000000000]", "+I[2, 4, four, NULL]", "+I[1, 6, six, 60]", "+I[2, 7, seven, 70000000000]", "+I[2, 8, eight, 80000000000]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.sendMessages(topic, this.getMessages(String.format("kafka/canal/table/%s/canal-data-4.txt", sourceDir)));
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.VARBINARY((int)10), DataTypes.FLOAT()}, (String[])new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        expected = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, NULL, NULL]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        this.sendMessages(topic, this.getMessages(String.format("kafka/canal/table/%s/canal-data-5.txt", sourceDir)));
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR((int)20), DataTypes.BIGINT(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.VARBINARY((int)20), DataTypes.DOUBLE()}, (String[])new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        expected = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=60L)
    public void testWaterMarkSyncTable() throws Exception {
        String topic = "watermark";
        this.topics = Collections.singletonList(topic);
        this.createTopic(topic, 1);
        this.sendMessages(topic, this.getMessages("kafka/canal/table/watermark/canal-data-1.txt"));
        Map<String, String> pulsarConfig = this.getBasicPulsarConfig();
        pulsarConfig.put(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
        pulsarConfig.put(PulsarActionUtils.TOPIC.key(), topic);
        pulsarConfig.put(PulsarActionUtils.VALUE_FORMAT.key(), "canal-json");
        Map<String, String> config = this.getBasicTableConfig();
        config.put("tag.automatic-creation", "watermark");
        config.put("tag.creation-period", "hourly");
        PulsarSyncTableAction action = (PulsarSyncTableAction)this.syncTableActionBuilder(pulsarConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").withTableConfig(config).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = (FileStoreTable)this.catalog.getTable(new Identifier(this.database, this.tableName));
        while (table.snapshotManager().snapshotCount() <= 0L || table.snapshotManager().latestSnapshot().watermark() == Long.MIN_VALUE) {
            Thread.sleep(1000L);
        }
        return;
    }
}

