/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.spark;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.spark.SparkReadTestBase;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class SparkSchemaEvolutionITCase
extends SparkReadTestBase {
    /**
     * Sets a custom table property on {@code t1}, checks that it is reported by a query against
     * {@code t1$options}, unsets it and checks that it is gone, and finally checks that setting
     * {@code primary-key} through {@code TBLPROPERTIES} is rejected.
     */
    @Test
    public void testSetAndRemoveOption() {
        String optionsQuery = "SELECT * FROM `t1$options`";

        spark.sql("ALTER TABLE t1 SET TBLPROPERTIES('xyc' 'unknown1')");
        Map<String, String> afterSet = rowsToMap(spark.sql(optionsQuery).collectAsList());
        Assertions.assertThat(afterSet).containsEntry("xyc", "unknown1");

        spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES('xyc')");
        Map<String, String> afterUnset = rowsToMap(spark.sql(optionsQuery).collectAsList());
        Assertions.assertThat(afterUnset).doesNotContainKey("xyc");

        // Altering the primary key via table properties is expected to fail.
        Assertions.assertThatThrownBy(
                        () -> spark.sql("ALTER TABLE t1 SET TBLPROPERTIES('primary-key' = 'a')"))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Alter primary key is not supported");
    }

    /**
     * Builds a map from the first two string columns of each row.
     *
     * @param rows rows whose column 0 supplies the key and column 1 supplies the value
     * @return a new mutable map; when a key occurs in several rows the value of the last such row
     *     is kept
     */
    private Map<String, String> rowsToMap(List<Row> rows) {
        Map<String, String> keyToValue = new HashMap<>();
        for (Row row : rows) {
            keyToValue.put(row.getString(0), row.getString(1));
        }
        return keyToValue;
    }

    /**
     * Adds a nullable column to a table that already holds two rows and checks that the
     * {@code SHOW CREATE TABLE} output lists the new column and that existing rows read
     * {@code null} for it.
     */
    @Test
    public void testAddColumn() {
        createTable("testAddColumn");
        writeTable("testAddColumn", "(1, 2L, '1')", "(5, 6L, '3')");

        List<Row> beforeAdd = spark.sql("SHOW CREATE TABLE testAddColumn").collectAsList();
        Assertions.assertThat(beforeAdd.toString())
                .contains(defaultShowCreateString("testAddColumn"));

        spark.sql("ALTER TABLE testAddColumn ADD COLUMN d STRING");

        List<Row> afterAdd = spark.sql("SHOW CREATE TABLE testAddColumn").collectAsList();
        Assertions.assertThat(afterAdd.toString())
                .contains(
                        showCreateString(
                                "testAddColumn",
                                "a INT NOT NULL",
                                "b BIGINT",
                                "c STRING",
                                "d STRING"));

        // Rows written before the ALTER have no value for the new column.
        Assertions.assertThat(spark.table("testAddColumn").collectAsList().toString())
                .isEqualTo("[[1,2,1,null], [5,6,3,null]]");
    }

    /**
     * Checks that adding a {@code NOT NULL} column to an existing table fails with an
     * {@link IllegalArgumentException} somewhere in the cause chain.
     */
    @Test
    public void testAddNotNullColumn() {
        createTable("testAddNotNullColumn");

        List<Row> beforeAdd = spark.sql("SHOW CREATE TABLE testAddNotNullColumn").collectAsList();
        Assertions.assertThat(beforeAdd.toString())
                .contains(defaultShowCreateString("testAddNotNullColumn"));

        Assertions.assertThatThrownBy(
                        () ->
                                spark.sql(
                                        "ALTER TABLE testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)"))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                IllegalArgumentException.class,
                                "Column d cannot specify NOT NULL in the default.testAddNotNullColumn table."));
    }

    /**
     * Checks that {@code ADD COLUMN ... FIRST} and {@code ADD COLUMN ... AFTER <col>} place the
     * new column at the requested position in the {@code SHOW CREATE TABLE} output.
     */
    @Test
    public void testAddColumnPosition() {
        // FIRST: the new column becomes the leading column.
        createTable("testAddColumnPositionFirst");
        spark.sql("ALTER TABLE testAddColumnPositionFirst ADD COLUMN d INT FIRST");
        List<Row> firstResult =
                spark.sql("SHOW CREATE TABLE testAddColumnPositionFirst").collectAsList();
        Assertions.assertThat(firstResult.toString())
                .contains(
                        showCreateString(
                                "testAddColumnPositionFirst",
                                "d INT",
                                "a INT NOT NULL",
                                "b BIGINT",
                                "c STRING"));

        // AFTER b: the new column is inserted directly behind column b.
        createTable("testAddColumnPositionAfter");
        spark.sql("ALTER TABLE testAddColumnPositionAfter ADD COLUMN d INT AFTER b");
        List<Row> afterResult =
                spark.sql("SHOW CREATE TABLE testAddColumnPositionAfter").collectAsList();
        Assertions.assertThat(afterResult.toString())
                .contains(
                        showCreateString(
                                "testAddColumnPositionAfter",
                                "a INT NOT NULL",
                                "b BIGINT",
                                "d INT",
                                "c STRING"));
    }

    /**
     * Checks table renaming: renaming a missing table and renaming onto an existing table both
     * fail with {@link AnalysisException}, while renaming {@code t1} to {@code t3} succeeds and
     * keeps the table definition and data readable under the new name.
     */
    @Test
    public void testRenameTable() {
        // Source table does not exist.
        Assertions.assertThatThrownBy(() -> spark.sql("ALTER TABLE t3 RENAME TO t4"))
                .isInstanceOf(AnalysisException.class)
                .hasMessageContaining("The table or view `t3` cannot be found");

        // Target table already exists.
        Assertions.assertThatThrownBy(() -> spark.sql("ALTER TABLE t1 RENAME TO t2"))
                .isInstanceOf(AnalysisException.class)
                .hasMessageContaining(
                        "Cannot create table or view default.t2 because it already exists");

        spark.sql("ALTER TABLE t1 RENAME TO t3");

        // Typed as List<Row> so that the Row::toString method reference resolves.
        List<Row> tables = spark.sql("SHOW TABLES").collectAsList();
        Assertions.assertThat(tables.stream().map(Row::toString))
                .containsExactlyInAnyOrder("[default,t2,false]", "[default,t3,false]");

        List<Row> afterRename = spark.sql("SHOW CREATE TABLE t3").collectAsList();
        Assertions.assertThat(afterRename.toString()).contains(defaultShowCreateString("t3"));

        List<Row> data = spark.sql("SELECT * FROM t3").collectAsList();
        Assertions.assertThat(data.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
    }

    /**
     * Renames column {@code b} to {@code bb} and checks that the new name appears in
     * {@code SHOW CREATE TABLE}, that data is readable through the new name, and that selecting
     * the old name fails with {@link AnalysisException}.
     */
    @Test
    public void testRenameColumn() {
        createTable("testRenameColumn");
        writeTable("testRenameColumn", "(1, 2L, '1')", "(5, 6L, '3')");

        List<Row> beforeRename = spark.sql("SHOW CREATE TABLE testRenameColumn").collectAsList();
        Assertions.assertThat(beforeRename.toString())
                .contains(defaultShowCreateString("testRenameColumn"));

        List<Row> resultsBefore = spark.table("testRenameColumn").select("a", "c").collectAsList();
        Assertions.assertThat(resultsBefore.toString()).isEqualTo("[[1,1], [5,3]]");

        spark.sql("ALTER TABLE testRenameColumn RENAME COLUMN b to bb");

        List<Row> afterRename = spark.sql("SHOW CREATE TABLE testRenameColumn").collectAsList();
        Assertions.assertThat(afterRename.toString())
                .contains(
                        showCreateString(
                                "testRenameColumn", "a INT NOT NULL", "bb BIGINT", "c STRING"));

        Dataset<Row> table = spark.table("testRenameColumn");
        List<Row> resultsAfter = table.select("bb", "c").collectAsList();
        Assertions.assertThat(resultsAfter.toString()).isEqualTo("[[2,1], [6,3]]");

        // The old column name must no longer resolve.
        Assertions.assertThatThrownBy(() -> table.select("b", "c"))
                .isInstanceOf(AnalysisException.class)
                .hasMessageContaining(
                        "A column or function parameter with name `b` cannot be resolved. Did you mean one of the following?");
    }

    /**
     * Checks that renaming a partition column fails with an
     * {@link UnsupportedOperationException} somewhere in the cause chain.
     */
    @Test
    public void testRenamePartitionKey() {
        spark.sql(
                "CREATE TABLE testRenamePartitionKey (\n"
                        + "a BIGINT,\n"
                        + "b STRING)\n"
                        + "PARTITIONED BY (a)\n");

        List<Row> beforeRename =
                spark.sql("SHOW CREATE TABLE testRenamePartitionKey").collectAsList();
        Assertions.assertThat(beforeRename.toString())
                .contains(showCreateString("testRenamePartitionKey", "a BIGINT", "b STRING"));

        Assertions.assertThatThrownBy(
                        () ->
                                spark.sql(
                                        "ALTER TABLE testRenamePartitionKey RENAME COLUMN a to aa"))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                UnsupportedOperationException.class,
                                "Cannot rename partition column: [a]"));
    }

    /**
     * Drops column {@code b} from a populated table and checks that both the
     * {@code SHOW CREATE TABLE} output and the data read back no longer include it.
     */
    @Test
    public void testDropSingleColumn() {
        createTable("testDropSingleColumn");
        writeTable("testDropSingleColumn", "(1, 2L, '1')", "(5, 6L, '3')");

        List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropSingleColumn").collectAsList();
        Assertions.assertThat(beforeDrop.toString())
                .contains(defaultShowCreateString("testDropSingleColumn"));

        spark.sql("ALTER TABLE testDropSingleColumn DROP COLUMN b");

        List<Row> afterDrop = spark.sql("SHOW CREATE TABLE testDropSingleColumn").collectAsList();
        Assertions.assertThat(afterDrop.toString())
                .contains(showCreateString("testDropSingleColumn", "a INT NOT NULL", "c STRING"));

        List<Row> results = spark.table("testDropSingleColumn").collectAsList();
        Assertions.assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
    }

    /**
     * Drops two columns in one statement and checks that only column {@code a} remains in the
     * {@code SHOW CREATE TABLE} output.
     */
    @Test
    public void testDropColumns() {
        createTable("testDropColumns");

        List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropColumns").collectAsList();
        Assertions.assertThat(beforeDrop.toString())
                .contains(defaultShowCreateString("testDropColumns"));

        spark.sql("ALTER TABLE testDropColumns DROP COLUMNS b,c");

        List<Row> afterDrop = spark.sql("SHOW CREATE TABLE testDropColumns").collectAsList();
        Assertions.assertThat(afterDrop.toString())
                .contains(showCreateString("testDropColumns", "a INT NOT NULL"));
    }

    /**
     * Checks that dropping a partition column fails with an
     * {@link UnsupportedOperationException} somewhere in the cause chain.
     */
    @Test
    public void testDropPartitionKey() {
        spark.sql(
                "CREATE TABLE testDropPartitionKey (\n"
                        + "a BIGINT,\n"
                        + "b STRING) \n"
                        + "PARTITIONED BY (a)");

        List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropPartitionKey").collectAsList();
        Assertions.assertThat(beforeDrop.toString())
                .contains(showCreateString("testDropPartitionKey", "a BIGINT", "b STRING"));

        Assertions.assertThatThrownBy(
                        () -> spark.sql("ALTER TABLE testDropPartitionKey DROP COLUMN a"))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                UnsupportedOperationException.class,
                                "Cannot drop partition key or primary key: [a]"));
    }

    /**
     * Checks that dropping a primary-key column fails with an
     * {@link UnsupportedOperationException} somewhere in the cause chain.
     */
    @Test
    public void testDropPrimaryKey() {
        spark.sql(
                "CREATE TABLE testDropPrimaryKey (\n"
                        + "a BIGINT,\n"
                        + "b STRING)\n"
                        + "PARTITIONED BY (a)\n"
                        + "TBLPROPERTIES ('primary-key' = 'a, b')");

        // Both declared primary-key columns are expected to be reported as NOT NULL.
        List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE testDropPrimaryKey").collectAsList();
        Assertions.assertThat(beforeDrop.toString())
                .contains(
                        showCreateString(
                                "testDropPrimaryKey", "a BIGINT NOT NULL", "b STRING NOT NULL"));

        Assertions.assertThatThrownBy(
                        () -> spark.sql("ALTER TABLE testDropPrimaryKey DROP COLUMN b"))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                UnsupportedOperationException.class,
                                "Cannot drop partition key or primary key: [b]"));
    }

    /**
     * Renames the primary-key column and checks that the {@code primary-key} property follows the
     * rename, that existing rows stay readable, and that rows inserted afterwards with the same
     * keys replace the earlier values.
     */
    @Test
    public void testRenamePrimaryKey() {
        spark.sql(
                "CREATE TABLE test_rename_primary_key_table (\n"
                        + "a BIGINT NOT NULL,\n"
                        + "b STRING)\n"
                        + "TBLPROPERTIES ('primary-key' = 'a')");
        spark.sql("INSERT INTO test_rename_primary_key_table VALUES(1, 'aaa'), (2, 'bbb')");

        spark.sql("ALTER TABLE test_rename_primary_key_table RENAME COLUMN a to a_");

        List<Row> result =
                spark.sql("SHOW CREATE TABLE test_rename_primary_key_table").collectAsList();
        Assertions.assertThat(result.toString())
                .contains(
                        showCreateString(
                                "test_rename_primary_key_table", "a_ BIGINT NOT NULL", "b STRING"))
                .contains("'primary-key' = 'a_'");

        List<String> actual =
                spark.sql("SELECT * FROM test_rename_primary_key_table").collectAsList().stream()
                        .map(Row::toString)
                        .collect(Collectors.toList());
        Assertions.assertThat(actual).containsExactlyInAnyOrder("[1,aaa]", "[2,bbb]");

        // Same keys again: the expected result below contains only the new values.
        spark.sql("INSERT INTO test_rename_primary_key_table VALUES(1, 'AAA'), (2, 'BBB')");

        actual =
                spark.sql("SELECT * FROM test_rename_primary_key_table").collectAsList().stream()
                        .map(Row::toString)
                        .collect(Collectors.toList());
        Assertions.assertThat(actual).containsExactlyInAnyOrder("[1,AAA]", "[2,BBB]");
    }

    /**
     * Renames a bucket-key column and checks that the {@code bucket-key} property follows the
     * rename and that filtering on the renamed column still returns the matching row.
     */
    @Test
    public void testRenameBucketKey() {
        spark.sql(
                "CREATE TABLE test_rename_bucket_key_table (\n"
                        + "a BIGINT NOT NULL,\n"
                        + "b STRING)\n"
                        + "TBLPROPERTIES ('bucket-key' = 'a,b', 'bucket'='16')");
        spark.sql("INSERT INTO test_rename_bucket_key_table VALUES(1, 'aaa'), (2, 'bbb')");

        spark.sql("ALTER TABLE test_rename_bucket_key_table RENAME COLUMN b to b_");

        List<Row> result =
                spark.sql("SHOW CREATE TABLE test_rename_bucket_key_table").collectAsList();
        Assertions.assertThat(result.toString())
                .contains(
                        showCreateString(
                                "test_rename_bucket_key_table", "a BIGINT NOT NULL", "b_ STRING"))
                .contains("'bucket-key' = 'a,b_'");

        List<String> actual =
                spark.sql("SELECT * FROM test_rename_bucket_key_table where b_ = 'bbb'")
                        .collectAsList().stream()
                        .map(Row::toString)
                        .collect(Collectors.toList());
        Assertions.assertThat(actual).containsExactlyInAnyOrder("[2,bbb]");
    }

    /**
     * Checks {@code ALTER COLUMN ... FIRST} and {@code ALTER COLUMN ... AFTER <col>}: valid moves
     * reorder the columns in {@code SHOW CREATE TABLE}, moving a column relative to itself is
     * rejected, and referencing a missing column is rejected.
     */
    @Test
    public void testUpdateColumnPosition() {
        // Move b to the front.
        createTable("tableFirst");
        spark.sql("ALTER TABLE tableFirst ALTER COLUMN b FIRST");
        List<Row> result = spark.sql("SHOW CREATE TABLE tableFirst").collectAsList();
        Assertions.assertThat(result.toString())
                .contains(showCreateString("tableFirst", "b BIGINT", "a INT NOT NULL", "c STRING"));

        // Move c directly behind a.
        createTable("tableAfter");
        spark.sql("ALTER TABLE tableAfter ALTER COLUMN c AFTER a");
        result = spark.sql("SHOW CREATE TABLE tableAfter").collectAsList();
        Assertions.assertThat(result.toString())
                .contains(showCreateString("tableAfter", "a INT NOT NULL", "c STRING", "b BIGINT"));

        // Move b behind c in a four-column table; d keeps its trailing position.
        spark.sql("CREATE TABLE tableAfter1 (a INT, b BIGINT, c STRING, d DOUBLE)");
        spark.sql("ALTER TABLE tableAfter1 ALTER COLUMN b AFTER c");
        result = spark.sql("SHOW CREATE TABLE tableAfter1").collectAsList();
        Assertions.assertThat(result.toString())
                .contains(
                        showCreateString(
                                "tableAfter1", "a INT", "c STRING", "b BIGINT", "d DOUBLE"));

        // Moving the first column to FIRST is rejected.
        createTable("tableFirstSelf");
        Assertions.assertThatThrownBy(
                        () -> spark.sql("ALTER TABLE tableFirstSelf ALTER COLUMN a FIRST"))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                UnsupportedOperationException.class,
                                "Cannot move itself for column a"));

        // Moving a column after itself is rejected.
        createTable("tableAfterSelf");
        Assertions.assertThatThrownBy(
                        () -> spark.sql("ALTER TABLE tableAfterSelf ALTER COLUMN b AFTER b"))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                UnsupportedOperationException.class,
                                "Cannot move itself for column b"));

        // The column being moved does not exist.
        createTable("tableMissing");
        Assertions.assertThatThrownBy(
                        () -> spark.sql("ALTER TABLE tableMissing ALTER COLUMN d FIRST"))
                .hasMessageContaining("Missing field d in table paimon.default.tableMissing");

        // The reference column does not exist.
        createTable("tableMissingAfter");
        Assertions.assertThatThrownBy(
                        () -> spark.sql("ALTER TABLE tableMissingAfter ALTER COLUMN a AFTER d"))
                .hasMessageContaining("Missing field d in table paimon.default.tableMissingAfter");
    }

    /**
     * Changes the type of non-null column {@code b} to {@code DOUBLE} and checks that existing
     * data is read with the new type, that the column stays {@code NOT NULL} in
     * {@code SHOW CREATE TABLE}, and that writing {@code null} to it is rejected both before and
     * after the change.
     */
    @Test
    public void testAlterColumnType() {
        createTableWithNonNullColumn("testAlterColumnType");
        writeTable("testAlterColumnType", "(1, 2L, '1')", "(5, 6L, '3')");

        Assertions.assertThatThrownBy(() -> writeTable("testAlterColumnType", "(1, null, 'a')"))
                .hasMessageContaining("Cannot write null to non-null column(b)");

        List<Row> beforeAlter = spark.sql("SHOW CREATE TABLE testAlterColumnType").collectAsList();
        Assertions.assertThat(beforeAlter.toString())
                .contains(defaultShowCreateStringWithNonNullColumn("testAlterColumnType"));

        spark.sql("ALTER TABLE testAlterColumnType ALTER COLUMN b TYPE DOUBLE");

        Assertions.assertThat(spark.table("testAlterColumnType").collectAsList().toString())
                .isEqualTo("[[1,2.0,1], [5,6.0,3]]");

        List<Row> afterAlter = spark.sql("SHOW CREATE TABLE testAlterColumnType").collectAsList();
        Assertions.assertThat(afterAlter.toString())
                .contains(
                        showCreateString(
                                "testAlterColumnType",
                                "a INT NOT NULL",
                                "b DOUBLE NOT NULL",
                                "c STRING"));

        // Nullability must be unaffected by the type change.
        Assertions.assertThatThrownBy(() -> writeTable("testAlterColumnType", "(1, null, 'a')"))
                .hasMessageContaining("Cannot write null to non-null column(b)");
    }

    /**
     * Checks the initial nullability of the top-level and nested fields of the schema returned by
     * {@code schema2()}, then issues {@code DROP NOT NULL} against table {@code t2} for each
     * non-null field in turn and checks that a freshly fetched schema reports it as nullable.
     */
    @Test
    public void testAlterTableColumnNullability() {
        // Initial state: top-level fields 0..2.
        Assertions.assertThat(fieldIsNullable(getField(schema2(), 0))).isFalse();
        Assertions.assertThat(fieldIsNullable(getField(schema2(), 1))).isFalse();
        Assertions.assertThat(fieldIsNullable(getField(schema2(), 2))).isFalse();
        // Initial state: children of field 2.
        Assertions.assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0)))
                .isFalse();
        Assertions.assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 1)))
                .isTrue();
        // Initial state: grandchildren below the first child of field 2.
        Assertions.assertThat(
                        fieldIsNullable(
                                getNestedField(getNestedField(getField(schema2(), 2), 0), 0)))
                .isTrue();
        Assertions.assertThat(
                        fieldIsNullable(
                                getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
                .isFalse();

        // schema2() is invoked again after every ALTER so each check sees the current schema.
        spark.sql("ALTER TABLE t2 ALTER COLUMN a DROP NOT NULL");
        Assertions.assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue();

        spark.sql("ALTER TABLE t2 ALTER COLUMN b DROP NOT NULL");
        Assertions.assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue();

        spark.sql("ALTER TABLE t2 ALTER COLUMN c DROP NOT NULL");
        Assertions.assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue();

        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1 DROP NOT NULL");
        Assertions.assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0)))
                .isTrue();

        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
        Assertions.assertThat(
                        fieldIsNullable(
                                getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
                .isTrue();
    }

    /**
     * Checks that {@code DROP NOT NULL} on a primary-key column fails with an
     * {@link UnsupportedOperationException} somewhere in the cause chain.
     */
    @Test
    public void testAlterPrimaryKeyNullability() {
        spark.sql(
                "CREATE TABLE testAlterPkNullability (\n"
                        + "a BIGINT,\n"
                        + "b STRING)\n"
                        + "TBLPROPERTIES ('primary-key' = 'a')");

        Assertions.assertThatThrownBy(
                        () ->
                                spark.sql(
                                        "ALTER TABLE testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
                .satisfies(
                        PaimonAssertions.anyCauseMatches(
                                UnsupportedOperationException.class,
                                "Cannot change nullability of primary key"));
    }

    /**
     * Checks that {@code ALTER COLUMN ... COMMENT} updates field descriptions, both for a
     * top-level column of {@code t1} and for top-level and nested fields of {@code t2}, by
     * re-reading the schemas through {@code schema1()} and {@code schema2()}.
     */
    @Test
    public void testAlterTableColumnComment() {
        // NOTE(review): this table is created but every statement below targets t1 or t2 —
        // confirm whether the call is still needed.
        createTable("testAlterTableColumnComment");

        // Top-level column of t1: no comment at first, then set and overwritten.
        Assertions.assertThat(getField(schema1(), 0).description()).isNull();
        spark.sql("ALTER TABLE t1 ALTER COLUMN a COMMENT 'a new comment'");
        Assertions.assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment");
        spark.sql("ALTER TABLE t1 ALTER COLUMN a COMMENT 'yet another comment'");
        Assertions.assertThat(getField(schema1(), 0).description())
                .isEqualTo("yet another comment");

        // Descriptions of field 2 of schema2() and its nested fields before any change.
        Assertions.assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c");
        Assertions.assertThat(getNestedField(getField(schema2(), 2), 0).description()).isNull();
        Assertions.assertThat(getNestedField(getField(schema2(), 2), 1).description())
                .isEqualTo("comment about c2");
        Assertions.assertThat(
                        getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
                .isNull();
        Assertions.assertThat(
                        getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
                .isNull();

        spark.sql("ALTER TABLE t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
        spark.sql("ALTER TABLE t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");

        // Every altered field now carries its new description.
        Assertions.assertThat(getField(schema2(), 2).description())
                .isEqualTo("yet another comment about c");
        Assertions.assertThat(getNestedField(getField(schema2(), 2), 0).description())
                .isEqualTo("a nested type");
        Assertions.assertThat(getNestedField(getField(schema2(), 2), 1).description())
                .isEqualTo("a bigint type");
        Assertions.assertThat(
                        getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
                .isEqualTo("a double type");
        Assertions.assertThat(
                        getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
                .isEqualTo("a boolean array");
    }

    /**
     * Runs a sequence of renames, type changes, drops and additions against one avro-format table
     * and, after each stage, checks full reads, projections and filters over rows written under
     * every earlier schema version.
     */
    @Test
    public void testSchemaEvolution() {
        spark.sql(
                "CREATE TABLE testSchemaEvolution(\n"
                        + "a INT, \n"
                        + "b BIGINT, \n"
                        + "c VARCHAR(10), \n"
                        + "d INT, \n"
                        + "e INT, \n"
                        + "f INT) \n"
                        + "TBLPROPERTIES ('file.format'='avro')");
        writeTable("testSchemaEvolution", "(1, 2L, '3', 4, 5, 6)", "(7, 8L, '9', 10, 11, 12)");

        // Stage 0: original schema.
        Dataset<Row> table = spark.table("testSchemaEvolution");
        Assertions.assertThat(collectRowStrings(table))
                .containsExactlyInAnyOrder("[1,2,3,4,5,6]", "[7,8,9,10,11,12]");
        Assertions.assertThat(collectRowStrings(table.select("a", "c", "e")))
                .containsExactlyInAnyOrder("[1,3,5]", "[7,9,11]");
        Assertions.assertThat(collectRowStrings(table.filter("a>1")))
                .containsExactlyInAnyOrder("[7,8,9,10,11,12]");

        // Stage 1: renames that reuse old column names, combined with type changes.
        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN a to aa");
        spark.sql("ALTER TABLE testSchemaEvolution ALTER COLUMN aa TYPE bigint");
        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN c to a");
        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN b to c");
        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN d to b");
        spark.sql("ALTER TABLE testSchemaEvolution ALTER COLUMN b TYPE bigint");
        spark.sql("ALTER TABLE testSchemaEvolution RENAME COLUMN f to ff");
        spark.sql("ALTER TABLE testSchemaEvolution ALTER COLUMN ff TYPE float");
        writeTable(
                "testSchemaEvolution",
                "(13L, 14L, '15', 16L, 17, 18.0F)",
                "(19L, 20L, '21', 22L, 23, 24.0F)");

        table = spark.table("testSchemaEvolution");
        Assertions.assertThat(collectRowStrings(table))
                .containsExactlyInAnyOrder(
                        "[1,2,3,4,5,6.0]",
                        "[7,8,9,10,11,12.0]",
                        "[13,14,15,16,17,18.0]",
                        "[19,20,21,22,23,24.0]");
        Assertions.assertThat(collectRowStrings(table.select("aa", "b", "ff")))
                .containsExactlyInAnyOrder(
                        "[1,4,6.0]", "[7,10,12.0]", "[13,16,18.0]", "[19,22,24.0]");
        // The filter column b is not part of the projection.
        Assertions.assertThat(collectRowStrings(table.select("aa", "a", "ff").filter("b>10L")))
                .containsExactlyInAnyOrder("[13,15,18.0]", "[19,21,24.0]");

        // Stage 2: drop three columns.
        spark.sql("ALTER TABLE testSchemaEvolution DROP COLUMNS aa, c, e");
        writeTable("testSchemaEvolution", "('25', 26L, 27.0F)", "('28', 29L, 30.0)");

        table = spark.table("testSchemaEvolution");
        Assertions.assertThat(collectRowStrings(table))
                .containsExactlyInAnyOrder(
                        "[3,4,6.0]",
                        "[9,10,12.0]",
                        "[15,16,18.0]",
                        "[21,22,24.0]",
                        "[25,26,27.0]",
                        "[28,29,30.0]");
        Assertions.assertThat(collectRowStrings(table.select("a", "ff").filter("b>10L")))
                .containsExactlyInAnyOrder(
                        "[15,18.0]", "[21,24.0]", "[25,27.0]", "[28,30.0]");

        // Stage 3: add columns whose names were used (and dropped or renamed) before; rows
        // written earlier are expected to read null for them.
        spark.sql("ALTER TABLE testSchemaEvolution ADD COLUMNS (d INT, c INT, e INT)");
        writeTable(
                "testSchemaEvolution",
                "('31', 32L, 33.0F, 34, 35, 36)",
                "('37', 38L, 39.0F, 40, 41, 42)");

        table = spark.table("testSchemaEvolution");
        Assertions.assertThat(collectRowStrings(table))
                .containsExactlyInAnyOrder(
                        "[3,4,6.0,null,null,null]",
                        "[9,10,12.0,null,null,null]",
                        "[15,16,18.0,null,null,null]",
                        "[21,22,24.0,null,null,null]",
                        "[25,26,27.0,null,null,null]",
                        "[28,29,30.0,null,null,null]",
                        "[31,32,33.0,34,35,36]",
                        "[37,38,39.0,40,41,42]");
        Assertions.assertThat(collectRowStrings(table.filter("b>10")))
                .containsExactlyInAnyOrder(
                        "[15,16,18.0,null,null,null]",
                        "[21,22,24.0,null,null,null]",
                        "[25,26,27.0,null,null,null]",
                        "[28,29,30.0,null,null,null]",
                        "[31,32,33.0,34,35,36]",
                        "[37,38,39.0,40,41,42]");
        Assertions.assertThat(
                        collectRowStrings(table.select("e", "a", "ff", "d", "b").filter("b>10L")))
                .containsExactlyInAnyOrder(
                        "[null,15,18.0,null,16]",
                        "[null,21,24.0,null,22]",
                        "[null,25,27.0,null,26]",
                        "[null,28,30.0,null,29]",
                        "[36,31,33.0,34,32]",
                        "[42,37,39.0,40,38]");
        Assertions.assertThat(
                        collectRowStrings(
                                table.select("e", "a", "ff", "d", "b")
                                        .filter("b>10 and e is not null")))
                .containsExactlyInAnyOrder("[36,31,33.0,34,32]", "[42,37,39.0,40,38]");
    }

    /**
     * Collects a dataset and renders every row with {@link Row#toString()}.
     *
     * @param dataset the dataset to collect
     * @return one string per collected row, in collection order
     */
    private static List<String> collectRowStrings(Dataset<Row> dataset) {
        return dataset.collectAsList().stream().map(Row::toString).collect(Collectors.toList());
    }

    /**
     * Writes data under three successive schemas (initial, after adding columns, after dropping
     * columns) and, after each write, compares the per-file statistics strings obtained from
     * {@code testFilesTable$files} via {@code getFieldStatsList} with the expected values.
     */
    @Test
    public void testFilesTable() {
        String filesQuery = "SELECT * FROM `testFilesTable$files`";

        createTable("testFilesTable");
        writeTable("testFilesTable", "(1, 2L, '3')", "(4, 5L, '6')");
        List<String> initialStats = getFieldStatsList(spark.sql(filesQuery).collectAsList());
        Assertions.assertThat(initialStats)
                .containsExactlyInAnyOrder("{a=0, b=0, c=0},{a=1, b=2, c=3},{a=4, b=5, c=6}");

        // After adding d, e and f, the stats of the earlier file are expected to list them too.
        spark.sql("ALTER TABLE testFilesTable ADD COLUMNS (d INT, e INT, f INT)");
        writeTable("testFilesTable", "(7, 8L, '9', 10, 11, 12)", "(13, 14L, '15', 16, 17, 18)");
        List<String> statsAfterAdd = getFieldStatsList(spark.sql(filesQuery).collectAsList());
        Assertions.assertThat(statsAfterAdd)
                .containsExactlyInAnyOrder(
                        "{a=0, b=0, c=0, d=2, e=2, f=2},{a=1, b=2, c=3, d=null, e=null, f=null},{a=4, b=5, c=6, d=null, e=null, f=null}",
                        "{a=0, b=0, c=0, d=0, e=0, f=0},{a=7, b=8, c=15, d=10, e=11, f=12},{a=13, b=14, c=9, d=16, e=17, f=18}");

        // After dropping c and e, no file's stats are expected to mention them.
        spark.sql("ALTER TABLE testFilesTable DROP COLUMNS c, e");
        writeTable("testFilesTable", "(19, 20L, 21, 22)", "(23, 24L, 25, 26)");
        List<String> statsAfterDrop = getFieldStatsList(spark.sql(filesQuery).collectAsList());
        Assertions.assertThat(statsAfterDrop)
                .containsExactlyInAnyOrder(
                        "{a=0, b=0, d=2, f=2},{a=1, b=2, d=null, f=null},{a=4, b=5, d=null, f=null}",
                        "{a=0, b=0, d=0, f=0},{a=7, b=8, d=10, f=12},{a=13, b=14, d=16, f=18}",
                        "{a=0, b=0, d=0, f=0},{a=19, b=20, d=21, f=22},{a=23, b=24, d=25, f=26}");
    }

    /**
     * Renders each row as the comma-joined string values of its columns at indices 10, 11 and 12.
     *
     * <p>NOTE(review): judging by the expected values in {@code testFilesTable}, these columns
     * presumably hold null counts, minimum values and maximum values — confirm against the
     * {@code $files} table schema.
     *
     * @param fieldStatsRows rows to render
     * @return one joined string per input row, in input order
     */
    private List<String> getFieldStatsList(List<Row> fieldStatsRows) {
        String separator = ",";
        return fieldStatsRows.stream()
                .map(
                        row -> {
                            Object[] statsColumns = {
                                row.getString(10), row.getString(11), row.getString(12)
                            };
                            return StringUtils.join(statsColumns, separator);
                        })
                .collect(Collectors.toList());
    }

    /**
     * Adds and drops fields inside a nested {@code STRUCT} column of a primary-key table and,
     * after each change, checks that rows written under earlier schemas read back with
     * {@code null} for fields they lack.
     *
     * @param formatType value used for the table's {@code file.format} property
     */
    @ParameterizedTest
    @ValueSource(strings={"orc", "avro", "parquet"})
    public void testAddAndDropNestedColumn(String formatType) {
        String tableName = "testAddNestedColumnTable";
        String qualifiedName = "paimon.default." + tableName;
        String selectAll = "SELECT * FROM " + qualifiedName;

        spark.sql(
                "CREATE TABLE "
                        + qualifiedName
                        + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
                        + formatType
                        + "')");
        spark.sql(
                "INSERT INTO "
                        + qualifiedName
                        + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))");
        Assertions.assertThat(spark.sql(selectAll).collectAsList().stream().map(Row::toString))
                .containsExactlyInAnyOrder("[1,[10,[apple,100]]]", "[2,[20,[banana,200]]]");
        Assertions.assertThat(
                        spark.sql("SELECT v.f2.f1, k FROM " + qualifiedName).collectAsList().stream()
                                .map(Row::toString))
                .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");

        // Add one field to the outer struct and one to the inner struct.
        spark.sql("ALTER TABLE " + qualifiedName + " ADD COLUMN v.f3 STRING");
        spark.sql("ALTER TABLE " + qualifiedName + " ADD COLUMN v.f2.f3 BIGINT");
        spark.sql(
                "INSERT INTO "
                        + qualifiedName
                        + " VALUES (1, STRUCT(11, STRUCT('APPLE', 101, 1001), 'one')), (3, STRUCT(31, STRUCT('CHERRY', 301, 3001), 'three'))");
        Assertions.assertThat(spark.sql(selectAll).collectAsList().stream().map(Row::toString))
                .containsExactlyInAnyOrder(
                        "[1,[11,[APPLE,101,1001],one]]",
                        "[2,[20,[banana,200,null],null]]",
                        "[3,[31,[CHERRY,301,3001],three]]");
        Assertions.assertThat(
                        spark.sql("SELECT v.f2.f2, v.f3, k FROM " + qualifiedName)
                                .collectAsList().stream()
                                .map(Row::toString))
                .containsExactlyInAnyOrder("[101,one,1]", "[200,null,2]", "[301,three,3]");

        // Drop the inner string field.
        spark.sql("ALTER TABLE " + qualifiedName + " DROP COLUMN v.f2.f1");
        spark.sql(
                "INSERT INTO "
                        + qualifiedName
                        + " VALUES (1, STRUCT(12, STRUCT(102, 1002), 'one')), (4, STRUCT(42, STRUCT(402, 4002), 'four'))");
        Assertions.assertThat(spark.sql(selectAll).collectAsList().stream().map(Row::toString))
                .containsExactlyInAnyOrder(
                        "[1,[12,[102,1002],one]]",
                        "[2,[20,[200,null],null]]",
                        "[3,[31,[301,3001],three]]",
                        "[4,[42,[402,4002],four]]");

        // Re-add a field under the dropped name with a different type; the expected rows below
        // show null for it in every row written before this point.
        spark.sql("ALTER TABLE " + qualifiedName + " ADD COLUMN v.f2.f1 DECIMAL(5, 2) AFTER f2");
        spark.sql(
                "INSERT INTO "
                        + qualifiedName
                        + " VALUES (1, STRUCT(13, STRUCT(103, 100.03, 1003), 'one')), (5, STRUCT(53, STRUCT(503, 500.03, 5003), 'five'))");
        Assertions.assertThat(spark.sql(selectAll).collectAsList().stream().map(Row::toString))
                .containsExactlyInAnyOrder(
                        "[1,[13,[103,100.03,1003],one]]",
                        "[2,[20,[200,null,null],null]]",
                        "[3,[31,[301,null,3001],three]]",
                        "[4,[42,[402,null,4002],four]]",
                        "[5,[53,[503,500.03,5003],five]]");
    }

    /**
     * Adds and drops fields of the {@code STRUCT} element type of an {@code ARRAY} column and
     * checks that rows written before the change read back with {@code null} for the added field.
     *
     * @param formatType value used for the table's {@code file.format} property
     */
    @ParameterizedTest
    @ValueSource(strings={"orc", "avro", "parquet"})
    public void testAddAndDropNestedColumnInArray(String formatType) {
        String tableName = "testAddNestedColumnTable";
        String qualifiedName = "paimon.default." + tableName;
        String selectAll = "SELECT * FROM " + qualifiedName;

        spark.sql(
                "CREATE TABLE "
                        + qualifiedName
                        + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>) TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
                        + formatType
                        + "')");
        spark.sql(
                "INSERT INTO "
                        + qualifiedName
                        + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), (2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
        Assertions.assertThat(spark.sql(selectAll).collectAsList().stream().map(Row::toString))
                .containsExactlyInAnyOrder(
                        "[1,WrappedArray([apple,100], [banana,101])]",
                        "[2,WrappedArray([cat,200], [dog,201])]");

        // Element struct fields are addressed through the "element" path segment.
        spark.sql("ALTER TABLE " + qualifiedName + " ADD COLUMN v.element.f3 STRING AFTER f2");
        spark.sql("ALTER TABLE " + qualifiedName + " DROP COLUMN v.element.f1");
        spark.sql(
                "INSERT INTO "
                        + qualifiedName
                        + " VALUES (1, ARRAY(STRUCT(110, 'APPLE'), STRUCT(111, 'BANANA'))), (3, ARRAY(STRUCT(310, 'FLOWER')))");
        Assertions.assertThat(spark.sql(selectAll).collectAsList().stream().map(Row::toString))
                .containsExactlyInAnyOrder(
                        "[1,WrappedArray([110,APPLE], [111,BANANA])]",
                        "[2,WrappedArray([200,null], [201,null])]",
                        "[3,WrappedArray([310,FLOWER])]");
    }

    /**
     * Adds one nested field to, and drops another from, the struct value type of a map column,
     * then checks the values stored under map key 10 before and after the change.
     *
     * @param formatType value placed in the table's {@code file.format} property
     */
    @ParameterizedTest
    @ValueSource(strings={"orc", "avro", "parquet"})
    public void testAddAndDropNestedColumnInMap(String formatType) {
        String table = "testAddNestedColumnTable";

        // Initial schema: v is MAP<INT, STRUCT<f1: STRING, f2: INT>>.
        String createSql = "CREATE TABLE paimon.default." + table + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2: INT>>) TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '" + formatType + "')";
        spark.sql(createSql);
        String firstInsert = "INSERT INTO paimon.default." + table + " VALUES (1, MAP(10, STRUCT('apple', 100), 20, STRUCT('banana', 101))), (2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog', 201)))";
        spark.sql(firstInsert);

        String selectOldFields = "SELECT k, v[10].f1, v[10].f2 FROM paimon.default." + table;
        List<String> rowsBefore = spark.sql(selectOldFields).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsBefore).containsExactlyInAnyOrder("[1,apple,100]", "[2,cat,200]");

        // Evolve the value struct: append f3 after f2, then remove f1.
        spark.sql("ALTER TABLE paimon.default." + table + " ADD COLUMN v.value.f3 STRING AFTER f2");
        spark.sql("ALTER TABLE paimon.default." + table + " DROP COLUMN v.value.f1");

        // Rows written with the new (f2, f3) layout; k = 1 already exists.
        String secondInsert = "INSERT INTO paimon.default." + table + " VALUES (1, MAP(10, STRUCT(110, 'APPLE'), 20, STRUCT(111, 'BANANA'))), (3, MAP(10, STRUCT(310, 'FLOWER')))";
        spark.sql(secondInsert);

        // The untouched old row (k = 2) is expected to show null for the added field.
        String selectNewFields = "SELECT k, v[10].f2, v[10].f3 FROM paimon.default." + table;
        List<String> rowsAfter = spark.sql(selectNewFields).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsAfter).containsExactlyInAnyOrder("[1,110,APPLE]", "[2,200,null]", "[3,310,FLOWER]");
    }

    /**
     * Renames a field two levels deep inside a struct column and checks that existing data is
     * readable through the new name.
     *
     * @param formatType value placed in the table's {@code file.format} property
     */
    @ParameterizedTest
    @ValueSource(strings={"orc", "avro", "parquet"})
    public void testRenameNestedColumn(String formatType) {
        String table = "testRenameNestedColumnTable";
        // The same rows are expected before and after the rename; only the field name differs.
        String[] expectedRows = {"[apple,1]", "[banana,2]"};

        String createSql = "CREATE TABLE paimon.default." + table + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) TBLPROPERTIES ('file.format' = '" + formatType + "')";
        spark.sql(createSql);
        String insertSql = "INSERT INTO paimon.default." + table + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))";
        spark.sql(insertSql);

        String selectOldName = "SELECT v.f2.f1, k FROM paimon.default." + table;
        List<String> rowsBefore = spark.sql(selectOldName).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsBefore).containsExactlyInAnyOrder(expectedRows);

        spark.sql("ALTER TABLE paimon.default." + table + " RENAME COLUMN v.f2.f1 to f100");

        String selectNewName = "SELECT v.f2.f100, k FROM paimon.default." + table;
        List<String> rowsAfter = spark.sql(selectNewName).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsAfter).containsExactlyInAnyOrder(expectedRows);
    }

    /**
     * Renames a field of the struct element type of an array column and checks that the first
     * element of each existing row is readable through the new name.
     *
     * @param formatType value placed in the table's {@code file.format} property
     */
    @ParameterizedTest
    @ValueSource(strings={"orc", "avro", "parquet"})
    public void testRenameNestedColumnInArray(String formatType) {
        String table = "testRenameNestedColumnTable";
        // The same rows are expected before and after the rename; only the field name differs.
        String[] expectedRows = {"[apple,1]", "[cat,2]"};

        String createSql = "CREATE TABLE paimon.default." + table + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>) TBLPROPERTIES ('file.format' = '" + formatType + "')";
        spark.sql(createSql);
        String insertSql = "INSERT INTO paimon.default." + table + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), (2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))";
        spark.sql(insertSql);

        String selectOldName = "SELECT v[0].f1, k FROM paimon.default." + table;
        List<String> rowsBefore = spark.sql(selectOldName).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsBefore).containsExactlyInAnyOrder(expectedRows);

        spark.sql("ALTER TABLE paimon.default." + table + " RENAME COLUMN v.element.f1 to f100");

        String selectNewName = "SELECT v[0].f100, k FROM paimon.default." + table;
        List<String> rowsAfter = spark.sql(selectNewName).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsAfter).containsExactlyInAnyOrder(expectedRows);
    }

    /**
     * Renames a field of the struct value type of a map column and checks that the value under
     * map key 10 of each existing row is readable through the new name.
     *
     * @param formatType value placed in the table's {@code file.format} property
     */
    @ParameterizedTest
    @ValueSource(strings={"orc", "avro", "parquet"})
    public void testRenameNestedColumnInMap(String formatType) {
        String table = "testRenameNestedColumnTable";
        // The same rows are expected before and after the rename; only the field name differs.
        String[] expectedRows = {"[apple,1]", "[cat,2]"};

        String createSql = "CREATE TABLE paimon.default." + table + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2: INT>>) TBLPROPERTIES ('file.format' = '" + formatType + "')";
        spark.sql(createSql);
        String insertSql = "INSERT INTO paimon.default." + table + " VALUES (1, MAP(10, STRUCT('apple', 100), 20, STRUCT('banana', 101))), (2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog', 201)))";
        spark.sql(insertSql);

        String selectOldName = "SELECT v[10].f1, k FROM paimon.default." + table;
        List<String> rowsBefore = spark.sql(selectOldName).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsBefore).containsExactlyInAnyOrder(expectedRows);

        spark.sql("ALTER TABLE paimon.default." + table + " RENAME COLUMN v.value.f1 to f100");

        String selectNewName = "SELECT v[10].f100, k FROM paimon.default." + table;
        List<String> rowsAfter = spark.sql(selectNewName).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsAfter).containsExactlyInAnyOrder(expectedRows);
    }

    /**
     * Changes a field two levels deep inside a struct column from INT to BIGINT, then checks
     * that old values are still readable and that a value beyond the INT range can be stored.
     *
     * @param formatType value placed in the table's {@code file.format} property
     */
    @ParameterizedTest
    @ValueSource(strings={"orc", "avro", "parquet"})
    public void testUpdateNestedColumnType(String formatType) {
        // NOTE(review): the name says "Rename" although this test changes a column type;
        // presumably copied from the rename tests. Kept unchanged on purpose.
        String table = "testRenameNestedColumnTable";
        String selectNested = "SELECT v.f2.f2, k FROM paimon.default." + table;

        String createSql = "CREATE TABLE paimon.default." + table + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '" + formatType + "')";
        spark.sql(createSql);
        String firstInsert = "INSERT INTO paimon.default." + table + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))";
        spark.sql(firstInsert);

        List<String> rowsBefore = spark.sql(selectNested).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsBefore).containsExactlyInAnyOrder("[100,1]", "[200,2]");

        // Widen v.f2.f2 from INT to BIGINT.
        spark.sql("ALTER TABLE paimon.default." + table + " CHANGE COLUMN v.f2.f2 f2 BIGINT");

        // 3000000000000 does not fit in an INT, so it exercises the widened type.
        String secondInsert = "INSERT INTO paimon.default." + table + " VALUES (1, STRUCT(11, STRUCT('APPLE', 101))), (3, STRUCT(31, STRUCT('CHERRY', 3000000000000)))";
        spark.sql(secondInsert);

        List<String> rowsAfter = spark.sql(selectNested).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsAfter).containsExactlyInAnyOrder("[101,1]", "[200,2]", "[3000000000000,3]");
    }

    /**
     * Changes a field of the struct element type of an array column from INT to BIGINT, then
     * checks that old rows are still readable and that values beyond the INT range can be
     * stored.
     *
     * @param formatType value placed in the table's {@code file.format} property
     */
    @ParameterizedTest
    @ValueSource(strings={"orc", "avro", "parquet"})
    public void testUpdateNestedColumnTypeInArray(String formatType) {
        // NOTE(review): the name says "Rename" although this test changes a column type;
        // presumably copied from the rename tests. Kept unchanged on purpose.
        String table = "testRenameNestedColumnTable";
        String selectAll = "SELECT * FROM paimon.default." + table;

        String createSql = "CREATE TABLE paimon.default." + table + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>) TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '" + formatType + "')";
        spark.sql(createSql);
        String firstInsert = "INSERT INTO paimon.default." + table + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), (2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))";
        spark.sql(firstInsert);

        List<String> rowsBefore = spark.sql(selectAll).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsBefore).containsExactlyInAnyOrder(
                "[1,WrappedArray([apple,100], [banana,101])]",
                "[2,WrappedArray([cat,200], [dog,201])]");

        // Widen v.element.f2 from INT to BIGINT.
        spark.sql("ALTER TABLE paimon.default." + table + " CHANGE COLUMN v.element.f2 f2 BIGINT");

        // The 13-digit literals do not fit in an INT, so they exercise the widened type.
        String secondInsert = "INSERT INTO paimon.default." + table + " VALUES (1, ARRAY(STRUCT('APPLE', 1000000000000), STRUCT('BANANA', 111))), (3, ARRAY(STRUCT('FLOWER', 3000000000000)))";
        spark.sql(secondInsert);

        List<String> rowsAfter = spark.sql(selectAll).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsAfter).containsExactlyInAnyOrder(
                "[1,WrappedArray([APPLE,1000000000000], [BANANA,111])]",
                "[2,WrappedArray([cat,200], [dog,201])]",
                "[3,WrappedArray([FLOWER,3000000000000])]");
    }

    /**
     * Changes a field of the struct value type of a map column from INT to BIGINT, then checks
     * the values under map key 10: old rows stay readable and values beyond the INT range can
     * be stored.
     *
     * @param formatType value placed in the table's {@code file.format} property
     */
    @ParameterizedTest
    @ValueSource(strings={"orc", "avro", "parquet"})
    public void testUpdateNestedColumnTypeInMap(String formatType) {
        // NOTE(review): the name says "Rename" although this test changes a column type;
        // presumably copied from the rename tests. Kept unchanged on purpose.
        String table = "testRenameNestedColumnTable";
        String selectEntry = "SELECT k, v[10].f1, v[10].f2 FROM paimon.default." + table;

        String createSql = "CREATE TABLE paimon.default." + table + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2: INT>>) TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '" + formatType + "')";
        spark.sql(createSql);
        String firstInsert = "INSERT INTO paimon.default." + table + " VALUES (1, MAP(10, STRUCT('apple', 100), 20, STRUCT('banana', 101))), (2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog', 201)))";
        spark.sql(firstInsert);

        List<String> rowsBefore = spark.sql(selectEntry).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsBefore).containsExactlyInAnyOrder("[1,apple,100]", "[2,cat,200]");

        // Widen v.value.f2 from INT to BIGINT.
        spark.sql("ALTER TABLE paimon.default." + table + " CHANGE COLUMN v.value.f2 f2 BIGINT");

        // The 13-digit literals do not fit in an INT, so they exercise the widened type.
        String secondInsert = "INSERT INTO paimon.default." + table + " VALUES (1, MAP(10, STRUCT('APPLE', 1000000000000), 20, STRUCT('BANANA', 111))), (3, MAP(10, STRUCT('FLOWER', 3000000000000)))";
        spark.sql(secondInsert);

        List<String> rowsAfter = spark.sql(selectEntry).collectAsList().stream()
                .map(Row::toString)
                .collect(Collectors.toList());
        Assertions.assertThat(rowsAfter).containsExactlyInAnyOrder("[1,APPLE,1000000000000]", "[2,cat,200]", "[3,FLOWER,3000000000000]");
    }
}

