/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.postgres;

import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.core.execution.JobClient;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.postgres.PostgresActionITCaseBase;
import org.apache.paimon.flink.action.cdc.postgres.PostgresSyncTableAction;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class PostgresSyncTableActionITCase
extends PostgresActionITCaseBase {
    private static final String DATABASE_NAME = "paimon_sync_table";
    private static final String SCHEMA_NAME = "public";

    @BeforeAll
    public static void startContainers() {
        POSTGRES_CONTAINER.withSetupSQL("postgres/sync_table_setup.sql");
        PostgresSyncTableActionITCase.start();
    }

    @Test
    @Timeout(value=60L)
    public void testSchemaEvolution() throws Exception {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "schema_evolution_\\d+");
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).withCatalogConfig(Collections.singletonMap(CatalogOptions.METASTORE.key(), "test-alter-table")).withTableConfig(this.getBasicTableConfig()).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.checkTableSchema("[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"}]");
        try (Statement statement = this.getStatement(DATABASE_NAME);){
            this.testSchemaEvolutionImpl(statement);
        }
    }

    private void checkTableSchema(String excepted) throws Exception {
        FileStoreTable table = this.getFileStoreTable();
        Assertions.assertThat((String)JsonSerdeUtil.toFlatJson((Object)table.schema().fields())).isEqualTo(excepted);
    }

    private void testSchemaEvolutionImpl(Statement statement) throws Exception {
        FileStoreTable table = this.getFileStoreTable();
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1, 'one')");
        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 2, 'two'), (2, 4, 'four')");
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.STRING()}, (String[])new String[]{"pt", "_id", "v1"});
        List<String> primaryKeys = Arrays.asList("pt", "_id");
        List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2 INT");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (2, 3, 'three', 30), (1, 5, 'five', 50)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v2 INT");
        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 6, 'six', 60)");
        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'second' WHERE _id = 2");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.INT()}, (String[])new String[]{"pt", "_id", "v1", "v2"});
        expected = Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30]", "+I[2, 4, four, NULL]", "+I[1, 5, five, 50]", "+I[1, 6, six, 60]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ALTER COLUMN v2 TYPE BIGINT");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven', 70000000000)");
        statement.executeUpdate("DELETE FROM schema_evolution_1 WHERE _id = 5");
        statement.executeUpdate("UPDATE schema_evolution_1 SET v2 = 30000000000 WHERE _id = 3");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ALTER COLUMN v2 TYPE BIGINT");
        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (2, 8, 'eight', 80000000000)");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.BIGINT()}, (String[])new String[]{"pt", "_id", "v1", "v2"});
        expected = Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30000000000]", "+I[2, 4, four, NULL]", "+I[1, 6, six, 60]", "+I[2, 7, seven, 70000000000]", "+I[2, 8, eight, 80000000000]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v3 NUMERIC(8, 3)");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v4 BYTEA");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v5 FLOAT");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ALTER COLUMN v1 TYPE VARCHAR(20)");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 9, 'nine', 90000000000, 99999.999, 'nine.bin', 9.9)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v3 NUMERIC(8, 3)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4 BYTEA");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5 FLOAT");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ALTER COLUMN v1 TYPE VARCHAR(20)");
        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE _id = 8");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.BYTES(), DataTypes.DOUBLE()}, (String[])new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        expected = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, NULL, NULL]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ALTER COLUMN v5 TYPE DOUBLE PRECISION");
        statement.executeUpdate("UPDATE schema_evolution_1 SET v4 = 'nine.bin.long', v5 = 9.00000000009 WHERE _id = 9");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ALTER COLUMN v5 TYPE DOUBLE PRECISION");
        statement.executeUpdate("UPDATE schema_evolution_2 SET v4 = 'four.bin.long', v5 = 4.00000000004 WHERE _id = 4");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.BYTES(), DataTypes.DOUBLE()}, (String[])new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"});
        expected = Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        Assertions.assertThat((Map)this.getFileStoreTable().options()).containsEntry((Object)"alter-table-test", (Object)"true");
    }

    @Test
    @Timeout(value=60L)
    public void testMultipleSchemaEvolutions() throws Exception {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "schema_evolution_multiple");
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        this.checkTableSchema("[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\"},{\"id\":2,\"name\":\"v2\",\"type\":\"INT\"},{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\"}]");
        try (Statement statement = this.getStatement(DATABASE_NAME);){
            this.testSchemaEvolutionMultipleImpl(statement);
        }
    }

    private void testSchemaEvolutionMultipleImpl(Statement statement) throws Exception {
        FileStoreTable table = this.getFileStoreTable();
        statement.executeUpdate("INSERT INTO schema_evolution_multiple VALUES (1, 'one', 10, 'string_1')");
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING()}, (String[])new String[]{"_id", "v1", "v2", "v3"});
        List<String> primaryKeys = Collections.singletonList("_id");
        List<String> expected = Collections.singletonList("+I[1, one, 10, string_1]");
        this.waitForResult(expected, table, rowType, primaryKeys);
        statement.executeUpdate("ALTER TABLE schema_evolution_multiple ADD COLUMN v4 INTEGER, ALTER COLUMN v1 TYPE VARCHAR(20),ADD COLUMN v5 DOUBLE PRECISION,ADD COLUMN v6 DECIMAL(5, 3),ADD COLUMN \"$% ^,& *(\" VARCHAR(10),ALTER COLUMN v2 TYPE BIGINT");
        statement.executeUpdate("INSERT INTO schema_evolution_multiple VALUES (2, 'long_string_two', 2000000000000, 'string_2', 20, 20.5, 20.002, 'test_2')");
        rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.DECIMAL((int)5, (int)3), DataTypes.STRING()}, (String[])new String[]{"_id", "v1", "v2", "v3", "v4", "v5", "v6", "$% ^,& *("});
        expected = Arrays.asList("+I[1, one, 10, string_1, NULL, NULL, NULL, NULL]", "+I[2, long_string_two, 2000000000000, string_2, 20, 20.5, 20.002, test_2]");
        this.waitForResult(expected, table, rowType, primaryKeys);
    }

    @Test
    @Timeout(value=90L)
    public void testAllTypes() throws Exception {
        for (int i = 0; i < 2; ++i) {
            this.testAllTypesOnce();
        }
    }

    private void testAllTypesOnce() throws Exception {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "all_types_table");
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").build();
        JobClient client = this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement(DATABASE_NAME);){
            this.testAllTypesImpl(statement);
        }
        client.cancel().get();
    }

    private void testAllTypesImpl(Statement statement) throws Exception {
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DECIMAL((int)2, (int)1).notNull(), DataTypes.BOOLEAN(), DataTypes.BINARY((int)2), DataTypes.BINARY((int)0xFFFFFFF), DataTypes.BINARY((int)8), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.SMALLINT().notNull(), DataTypes.INT().notNull(), DataTypes.BIGINT().notNull(), DataTypes.DOUBLE(), DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.DECIMAL((int)8, (int)3), DataTypes.DECIMAL((int)8, (int)0), DataTypes.DECIMAL((int)38, (int)10), DataTypes.DATE(), DataTypes.TIMESTAMP((int)6), DataTypes.TIMESTAMP((int)6), DataTypes.TIME((int)6), DataTypes.TIME((int)6), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.BYTES(), DataTypes.STRING(), DataTypes.ARRAY((DataType)DataTypes.STRING())}, (String[])new String[]{"_id", "pt", "_bit1", "_bit", "_bit_varying", "_bit_varying1", "_boolean", "_bool", "_smallint", "_int", "_bigint", "_small_serial", "_serial", "_big_serial", "_float", "_real", "_double_precision", "_numeric", "_decimal", "_big_decimal", "_date", "_timestamp", "_timestamp0", "_time", "_time0", "_char", "_varchar", "_text", "_bin", "_json", "_array"});
        FileStoreTable table = this.getFileStoreTable();
        String bits = Arrays.toString(new byte[]{7, -57});
        List<String> expected = Arrays.asList("+I[1, 1.1, " + String.format("true, %s, [5], [2], ", bits) + "true, true, 1000, 1000000, 10000000000, 1, 2, 3, 1.5, 1.000001, 1.000111, 12345.110, 11111, 2222222222222222300000001111.1234567890, 19439, 2023-03-23T14:30:05, 2023-03-23T00:00, 36803000, 36803000, Paimon, Apache Paimon, Apache Paimon PostgreSQL Test Data, [98, 121, 116, 101, 115], {\"a\": \"b\"}, [\"item1\", \"item2\"]]", "+I[2, 2.2, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 4, 5, 6, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL]");
        this.waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
    }

    @Test
    public void testIncompatibleTable() {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "incompatible_field_\\d+");
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).build();
        Assertions.assertThatThrownBy(() -> ((PostgresSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, (String)"Column v1 have different types when merging schemas.\nCurrent table '{paimon_sync_table.incompatible_field_1}' field: `v1` TIMESTAMP(6)\nTo be merged table 'paimon_sync_table.incompatible_field_2' field: `v1` INT")});
    }

    @Test
    public void testIncompatiblePaimonTable() throws Exception {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "incompatible_pk_\\d+");
        this.createFileStoreTable(RowType.of((DataType[])new DataType[]{DataTypes.INT(), DataTypes.BIGINT(), DataTypes.DOUBLE()}, (String[])new String[]{"a", "b", "c"}), Collections.emptyList(), Collections.singletonList("a"), new HashMap());
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).withPrimaryKeys("a").build();
        Assertions.assertThatThrownBy(() -> ((PostgresSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, (String)"Paimon schema and source table schema are not compatible.")});
    }

    @Test
    public void testInvalidPrimaryKey() {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "schema_evolution_\\d+");
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).withPrimaryKeys("pk").build();
        Assertions.assertThatThrownBy(() -> ((PostgresSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, (String)"Specified primary key 'pk' does not exist in source tables or computed columns [pt, _id, v1].")});
    }

    @Test
    public void testNoPrimaryKey() {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "incompatible_pk_\\d+");
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).build();
        Assertions.assertThatThrownBy(() -> ((PostgresSyncTableAction)action).run()).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, (String)"Primary keys are not specified. Also, can't infer primary keys from source table schemas because source tables have no primary keys or have different primary keys.")});
    }

    @Test
    @Timeout(value=60L)
    public void testComputedColumn() throws Exception {
        for (int i = 0; i < 2; ++i) {
            this.innerTestComputedColumn(i == 0);
        }
    }

    private void innerTestComputedColumn(boolean execute) throws Exception {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "test_computed_column");
        List<String> computedColumnDefs = Arrays.asList("_year_date=year(_date)", "_year_timestamp=year(_timestamp)", "_month_date=month(_date)", "_month_timestamp=month(_timestamp)", "_day_date=day(_date)", "_day_timestamp=day(_timestamp)", "_hour_date=hour(_date)", "_hour_timestamp=hour(_timestamp)", "_date_format_date=date_format(_date,yyyy)", "_date_format_timestamp=date_format(_timestamp,yyyyMMdd)", "_substring_date1=substring(_date,2)", "_substring_date2=substring(_timestamp,5,10)", "_truncate_date=trUNcate(pk,2)");
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).withPartitionKeys("_year_date").withPrimaryKeys("pk", "_year_date").withComputedColumnArgs(computedColumnDefs).build();
        this.runActionWithDefaultEnv((ActionBase)action);
        if (execute) {
            try (Statement statement = this.getStatement(DATABASE_NAME);){
                statement.executeUpdate("INSERT INTO test_computed_column VALUES (1, '2023-03-23', '2021-09-15 15:00:10')");
                statement.executeUpdate("INSERT INTO test_computed_column VALUES (2, '2023-03-23', null)");
            }
        }
        FileStoreTable table = this.getFileStoreTable();
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.TIMESTAMP((int)6), DataTypes.INT().notNull(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT().notNull()}, (String[])new String[]{"pk", "_date", "_timestamp", "_year_date", "_year_timestamp", "_month_date", "_month_timestamp", "_day_date", "_day_timestamp", "_hour_date", "_hour_timestamp", "_date_format_date", "_date_format_timestamp", "_substring_date1", "_substring_date2", "_truncate_date"});
        List<String> expected = Arrays.asList("+I[1, 19439, 2021-09-15T15:00:10, 2023, 2021, 3, 9, 23, 15, 0, 15, 2023, 20210915, 23-03-23, 09-15, 0]", "+I[2, 19439, NULL, 2023, NULL, 3, NULL, 23, NULL, 0, NULL, 2023, NULL, 23-03-23, NULL, 2]");
        this.waitForResult(expected, table, rowType, Arrays.asList("pk", "_year_date"));
    }

    @Test
    @Timeout(value=60L)
    public void testSyncShards() throws Exception {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        ThreadLocalRandom random = ThreadLocalRandom.current();
        String schemaPattern = random.nextBoolean() ? "shard_.+" : "shard_1|shard_2";
        String tblPattern = random.nextBoolean() ? "t.+" : "t1|t2";
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), schemaPattern);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), tblPattern);
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).withPartitionKeys("pt").withPrimaryKeys("pk", "pt").withComputedColumnArgs("pt=substring(_date,5)").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        try (Statement statement = this.getStatement(DATABASE_NAME);){
            statement.execute("SET search_path TO shard_1");
            statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30')");
            statement.executeUpdate("INSERT INTO t2 VALUES (2, '2023-07-30')");
            statement.execute("SET search_path TO shard_2");
            statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31')");
            statement.executeUpdate("INSERT INTO t1 VALUES (4, '2023-07-31')");
        }
        FileStoreTable table = this.getFileStoreTable();
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.STRING().notNull()}, (String[])new String[]{"pk", "_date", "pt"});
        this.waitForResult(Arrays.asList("+I[1, 2023-07-30, 07-30]", "+I[2, 2023-07-30, 07-30]", "+I[3, 2023-07-31, 07-31]", "+I[4, 2023-07-31, 07-31]"), table, rowType, Arrays.asList("pk", "pt"));
    }

    @Test
    @Timeout(value=60L)
    public void testOptionsChange() throws Exception {
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), "test_options_change");
        HashMap<String, String> tableConfig = new HashMap<String, String>();
        tableConfig.put("bucket", "1");
        tableConfig.put("sink.parallelism", "1");
        PostgresSyncTableAction action1 = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).withPartitionKeys("pt").withPrimaryKeys("pk", "pt").withComputedColumnArgs("pt=substring(_date,5)").withTableConfig(tableConfig).build();
        JobClient jobClient = this.runActionWithDefaultEnv((ActionBase)action1);
        try (Statement statement = this.getStatement(DATABASE_NAME);){
            statement.executeUpdate("INSERT INTO test_options_change VALUES (1, '2023-03-23', '2021-09-15 15:00:10')");
            statement.executeUpdate("INSERT INTO test_options_change VALUES (2, '2023-03-23', null)");
        }
        this.waitingTables(this.tableName);
        jobClient.cancel();
        tableConfig.put("sink.savepoint.auto-tag", "true");
        tableConfig.put("tag.num-retained-max", "5");
        tableConfig.put("tag.automatic-creation", "process-time");
        tableConfig.put("tag.creation-period", "hourly");
        tableConfig.put("tag.creation-delay", "600000");
        tableConfig.put("snapshot.time-retained", "1h");
        tableConfig.put("snapshot.num-retained.min", "5");
        tableConfig.put("snapshot.num-retained.max", "10");
        tableConfig.put("changelog-producer", "input");
        PostgresSyncTableAction action2 = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).withPartitionKeys("pt").withPrimaryKeys("pk", "pt").withComputedColumnArgs("pt=substring(_date,5)").withTableConfig(tableConfig).build();
        this.runActionWithDefaultEnv((ActionBase)action2);
        FileStoreTable table = this.getFileStoreTable();
        Assertions.assertThat((Map)table.options()).containsAllEntriesOf(tableConfig);
    }

    @Test
    @Timeout(value=60L)
    public void testMetadataColumns() throws Exception {
        String tableName = "test_metadata_columns";
        try (Statement statement = this.getStatement(DATABASE_NAME);){
            statement.executeUpdate("INSERT INTO test_metadata_columns VALUES (1, '2023-07-30')");
            statement.executeUpdate("INSERT INTO test_metadata_columns VALUES (2, '2023-07-30')");
        }
        Map<String, String> postgresConfig = this.getBasicPostgresConfig();
        postgresConfig.put(PostgresSourceOptions.DATABASE_NAME.key(), DATABASE_NAME);
        postgresConfig.put(PostgresSourceOptions.SCHEMA_NAME.key(), SCHEMA_NAME);
        postgresConfig.put(PostgresSourceOptions.TABLE_NAME.key(), tableName);
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(postgresConfig).withPrimaryKeys("pk").withMetadataColumns("table_name", "database_name", "schema_name").build();
        this.runActionWithDefaultEnv((ActionBase)action);
        FileStoreTable table = this.getFileStoreTable();
        RowType rowType = RowType.of((DataType[])new DataType[]{DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.STRING().notNull(), DataTypes.STRING().notNull(), DataTypes.STRING().notNull()}, (String[])new String[]{"pk", "_date", "table_name", "database_name", "schema_name"});
        this.waitForResult(Arrays.asList(String.format("+I[1, 2023-07-30, %s, %s, %s]", tableName, DATABASE_NAME, SCHEMA_NAME), String.format("+I[2, 2023-07-30, %s, %s, %s]", tableName, DATABASE_NAME, SCHEMA_NAME)), table, rowType, Collections.singletonList("pk"));
    }

    @Test
    public void testCatalogAndTableConfig() {
        PostgresSyncTableAction action = (PostgresSyncTableAction)this.syncTableActionBuilder(this.getBasicPostgresConfig()).withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value")).withTableConfig(Collections.singletonMap("table-key", "table-value")).build();
        Assertions.assertThat((Map)action.catalogConfig()).containsEntry((Object)"catalog-key", (Object)"catalog-value");
        Assertions.assertThat((Map)action.tableConfig()).containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
    }

    private FileStoreTable getFileStoreTable() throws Exception {
        return this.getFileStoreTable(this.tableName);
    }
}

