/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.utils.BlockingIterator;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class LogSystemITCase
extends KafkaTableTestBase {
    @BeforeEach
    public void before() throws IOException {
        this.tEnv.executeSql(String.format("CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s')", this.getTempDirPath()));
        this.tEnv.useCatalog("PAIMON");
    }

    @Test
    public void testAppendOnlyWithEventual() throws Exception {
        LogSystemITCase.createTopicIfNotExists("T", 1);
        this.env.getCheckpointConfig().disableCheckpointing();
        this.env.setParallelism(1);
        this.tEnv.executeSql(String.format("CREATE TABLE T (i INT, j INT) WITH ('log.system'='kafka', 'bucket'='1', 'bucket-key'='i', 'log.consistency'='eventual', 'kafka.bootstrap.servers'='%s', 'kafka.topic'='T')", LogSystemITCase.getBootstrapServers()));
        this.tEnv.executeSql("CREATE TEMPORARY TABLE gen (i INT, j INT) WITH ('connector'='datagen')");
        TableResult write = this.tEnv.executeSql("INSERT INTO T SELECT * FROM gen");
        BlockingIterator read = BlockingIterator.of((Iterator)this.tEnv.executeSql("SELECT * FROM T").collect());
        List collect = read.collect(10);
        Assertions.assertThat((List)collect).hasSize(10);
        ((JobClient)write.getJobClient().get()).cancel();
        read.close();
    }

    @Test
    public void testReadFromFile() throws Exception {
        LogSystemITCase.createTopicIfNotExists("test-double-sink", 1);
        this.env.getCheckpointConfig().setCheckpointInterval(3000L);
        this.env.setParallelism(1);
        this.tEnv.executeSql(String.format("CREATE TABLE kafka_file_double_sink (\n word STRING ,\n    cnt BIGINT,\n      PRIMARY KEY (word) NOT ENFORCED\n)\nWITH (\n 'merge-engine' = 'aggregation',\n  'changelog-producer' = 'full-compaction',\n    'log.system' = 'kafka',\n    'bucket'='1',\n    'streaming-read-mode'='file',\n    'fields.cnt.aggregate-function' = 'sum',\n    'kafka.bootstrap.servers' = '%s',\n    'kafka.topic' = 'test-double-sink',\n    'kafka.transaction.timeout.ms'='30000'\n\n);", LogSystemITCase.getBootstrapServers()));
        TableResult write = this.tEnv.executeSql("INSERT INTO kafka_file_double_sink values('a',1),('b',2),('c',3);");
        BlockingIterator read = BlockingIterator.of((Iterator)this.tEnv.executeSql("SELECT * FROM kafka_file_double_sink").collect());
        Assertions.assertThat((List)read.collect(3)).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"a", 1L}), Row.of((Object[])new Object[]{"b", 2L}), Row.of((Object[])new Object[]{"c", 3L})});
        ((JobClient)write.getJobClient().get()).cancel();
        read.close();
    }

    @Test
    public void testReadFromLog() throws Exception {
        LogSystemITCase.createTopicIfNotExists("test-single-sink", 1);
        this.env.getCheckpointConfig().disableCheckpointing();
        this.env.setParallelism(1);
        this.tEnv.executeSql(String.format("CREATE TABLE kafka_file_single_sink (\n word STRING ,\n    cnt BIGINT,\n      PRIMARY KEY (word) NOT ENFORCED\n)\nWITH (\n 'merge-engine' = 'aggregation',\n    'changelog-producer' = 'full-compaction',\n    'log.consistency' = 'eventual',\n    'log.system' = 'kafka',\n    'bucket'='1',\n    'streaming-read-mode'='log',\n    'kafka.bootstrap.servers' = '%s',\n    'kafka.topic' = 'test-single-sink',\n    'kafka.transaction.timeout.ms'='30000'\n\n);", LogSystemITCase.getBootstrapServers()));
        this.tEnv.executeSql("CREATE TEMPORARY TABLE word_table (\n    word STRING\n) WITH (\n    'connector' = 'datagen',\n    'fields.word.length' = '1'\n);");
        TableResult write = this.tEnv.executeSql("INSERT INTO kafka_file_single_sink SELECT word, COUNT(*) FROM word_table GROUP BY word;");
        BlockingIterator read = BlockingIterator.of((Iterator)this.tEnv.executeSql("SELECT * FROM kafka_file_single_sink").collect());
        List collect = read.collect(10);
        Assertions.assertThat((List)collect).hasSize(10);
        ((JobClient)write.getJobClient().get()).cancel();
        read.close();
    }

    @Test
    public void testReadFromLogWithOutSteamingReadMode() throws Exception {
        LogSystemITCase.createTopicIfNotExists("test-single-sink", 1);
        this.env.setParallelism(1);
        this.tEnv.executeSql("CREATE TABLE kafka_file_single_sink (\n word STRING ,\n    cnt BIGINT,\n      PRIMARY KEY (word) NOT ENFORCED\n)\nWITH (\n    'merge-engine' = 'aggregation',\n    'bucket'='1',\n    'changelog-producer' = 'full-compaction',\n    'streaming-read-mode'='log'\n);");
        this.tEnv.executeSql("CREATE TEMPORARY TABLE word_table (\n    word STRING\n) WITH (\n    'connector' = 'datagen',\n    'fields.word.length' = '1'\n);");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv.executeSql("INSERT INTO kafka_file_single_sink SELECT word, COUNT(*) FROM word_table GROUP BY word;")).rootCause().isInstanceOf(ValidationException.class)).hasMessage("File store continuous reading does not support the log streaming read mode.");
    }

    @Test
    @Timeout(value=60L)
    public void testLogSystemAutoRegister() throws TableNotExistException {
        this.tEnv.executeSql(String.format("CREATE CATALOG PAIMON_REGISTER WITH ('type'='paimon', 'warehouse'='%s', 'log.system.auto-register'='true')", this.getTempDirPath()));
        this.tEnv.useCatalog("PAIMON_REGISTER");
        this.env.getCheckpointConfig().disableCheckpointing();
        this.env.setParallelism(1);
        this.tEnv.executeSql(String.format("CREATE TABLE T (i INT, j INT) WITH ('log.system'='kafka', 'bucket'='1', 'bucket-key'='i', 'log.system.partitions'='2', 'kafka.bootstrap.servers'='%s', 'kafka.topic'='Tt')", LogSystemITCase.getBootstrapServers()));
        this.checkTopicExists("Tt", 2, 1);
        this.tEnv.executeSql(String.format("CREATE TABLE T2 (i INT, j INT) WITH ('log.system'='kafka', 'bucket'='2', 'bucket-key'='i', 'kafka.bootstrap.servers'='%s', 'kafka.topic'='T2')", LogSystemITCase.getBootstrapServers()));
        this.checkTopicExists("T2", 2, 1);
        this.tEnv.executeSql(String.format("CREATE TABLE T1 (i INT, j INT) WITH ('log.system'='kafka', 'bucket'='1', 'bucket-key'='i', 'log.system.partitions'='2', 'kafka.bootstrap.servers'='%s')", LogSystemITCase.getBootstrapServers()));
        CatalogBaseTable table = ((Catalog)this.tEnv.getCatalog("PAIMON_REGISTER").get()).getTable(ObjectPath.fromString((String)"default.T"));
        this.checkTopicExists((String)table.getOptions().get("kafka.topic"), 2, 1);
        ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv.executeSql(String.format("CREATE TABLE T (i INT, j INT) WITH ('log.system'='kafka', 'bucket'='1', 'bucket-key'='i', 'log.system.partitions'='2', 'kafka.bootstrap.servers'='%s', 'kafka.topic'='T1')", LogSystemITCase.getBootstrapServers()))).isInstanceOf(ValidationException.class)).hasMessage("Could not execute CreateTable in path `PAIMON_REGISTER`.`default`.`T`").cause().isInstanceOf(TableAlreadyExistException.class)).hasMessage("Table (or view) default.T already exists in Catalog PAIMON_REGISTER.");
        this.checkTopicNotExist("T1");
        ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.tEnv.executeSql(String.format("CREATE TABLE NOT_EXIST.T (i INT, j INT) WITH ('log.system'='kafka', 'bucket'='1', 'bucket-key'='i', 'log.system.partitions'='2', 'kafka.bootstrap.servers'='%s', 'kafka.topic'='T1')", LogSystemITCase.getBootstrapServers()))).isInstanceOf(ValidationException.class)).hasMessage("Could not execute CreateTable in path `PAIMON_REGISTER`.`NOT_EXIST`.`T`").cause().isInstanceOf(DatabaseNotExistException.class)).hasMessage("Database NOT_EXIST does not exist in Catalog PAIMON_REGISTER.");
        this.checkTopicNotExist("T1");
        this.tEnv.executeSql("DROP TABLE T");
        this.checkTopicNotExist("T");
    }

    @Test
    @Timeout(value=60L)
    public void testLogSystemAutoRegisterWithDefaultOption() {
        this.tEnv.executeSql(String.format("CREATE CATALOG PAIMON_DEFAULT WITH ('type'='paimon', 'warehouse'='%s', 'log.system.auto-register'='true', 'table-default.kafka.bootstrap.servers'='%s','table-default.log.system.partitions'='2')", this.getTempDirPath(), LogSystemITCase.getBootstrapServers()));
        this.tEnv.useCatalog("PAIMON_DEFAULT");
        this.env.getCheckpointConfig().disableCheckpointing();
        this.env.setParallelism(1);
        this.tEnv.executeSql("CREATE TABLE T (i INT, j INT) WITH ('log.system'='kafka', 'kafka.topic'='T')");
        this.checkTopicExists("T", 2, 1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLogWriteRead() throws Exception {
        String topic = UUID.randomUUID().toString();
        try {
            this.tEnv.executeSql(String.format("CREATE TABLE T (a STRING, b STRING, c STRING) WITH ('log.system'='kafka', 'bucket'='1', 'bucket-key'='a', 'kafka.bootstrap.servers'='%s','kafka.topic'='%s')", LogSystemITCase.getBootstrapServers(), topic));
            this.tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
            BlockingIterator iterator = BlockingIterator.of((Iterator)this.tEnv.from("T").execute().collect());
            List result = iterator.collectAndClose(2);
            Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3"}), Row.of((Object[])new Object[]{"4", "5", "6"})});
        }
        finally {
            this.deleteTopicIfExists(topic);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLogWriteReadWithVirtual() throws Exception {
        String topic = UUID.randomUUID().toString();
        LogSystemITCase.createTopicIfNotExists(topic, 1);
        try {
            this.tEnv.executeSql(String.format("CREATE TABLE T (a STRING, b STRING, c STRING, d AS CAST(c as INT) + 1) WITH ('log.system'='kafka', 'bucket'='1', 'bucket-key'='a', 'kafka.bootstrap.servers'='%s','kafka.topic'='%s')", LogSystemITCase.getBootstrapServers(), topic));
            this.tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
            BlockingIterator iterator = BlockingIterator.of((Iterator)this.tEnv.from("T").execute().collect());
            List result = iterator.collectAndClose(2);
            Assertions.assertThat((List)result).containsExactlyInAnyOrder((Object[])new Row[]{Row.of((Object[])new Object[]{"1", "2", "3", 4}), Row.of((Object[])new Object[]{"4", "5", "6", 7})});
        }
        finally {
            this.deleteTopicIfExists(topic);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Timeout(value=120L)
    public void testAppendOnlyWithUnawareBucket() throws Exception {
        String topic = UUID.randomUUID().toString();
        LogSystemITCase.createTopicIfNotExists(topic, 2);
        try {
            this.env.getCheckpointConfig().disableCheckpointing();
            this.env.setParallelism(1);
            this.tEnv.executeSql(String.format("CREATE TABLE T (i INT, j INT) WITH ('log.system'='kafka', 'log.consistency'='eventual', 'bucket'='-1', 'kafka.bootstrap.servers'='%s', 'kafka.topic'='%s','kafka.batch.size'='20')", LogSystemITCase.getBootstrapServers(), topic));
            this.tEnv.executeSql("CREATE TEMPORARY TABLE gen (i INT, j INT) WITH ('connector'='datagen', 'rows-per-second'='2')");
            TableResult write = this.tEnv.executeSql("INSERT INTO T SELECT * FROM gen");
            BlockingIterator read = BlockingIterator.of((Iterator)this.tEnv.executeSql("SELECT * FROM T").collect());
            List collect = read.collect(10);
            Assertions.assertThat((List)collect).hasSize(10);
            ((JobClient)write.getJobClient().get()).cancel();
            read.close();
            try (AdminClient adminClient = AdminClient.create((Properties)LogSystemITCase.getStandardProps());){
                HashMap<TopicPartition, OffsetSpec> topicPartitionOffsets = new HashMap<TopicPartition, OffsetSpec>(4);
                for (int i = 0; i < 2; ++i) {
                    topicPartitionOffsets.put(new TopicPartition(topic, i), OffsetSpec.latest());
                }
                Map result = (Map)adminClient.listOffsets(topicPartitionOffsets).all().get();
                Assertions.assertThat(result.values()).allMatch(partitionOffsetInfo -> partitionOffsetInfo.offset() > 0L);
            }
        }
        finally {
            this.deleteTopicIfExists(topic);
        }
    }
}

