/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.service;

import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.gateway.AbstractMaterializedTableStatementITCase;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil;
import org.apache.flink.table.gateway.workflow.EmbeddedRefreshHandler;
import org.apache.flink.table.gateway.workflow.EmbeddedRefreshHandlerSerializer;
import org.apache.flink.table.gateway.workflow.WorkflowInfo;
import org.apache.flink.table.gateway.workflow.scheduler.EmbeddedQuartzScheduler;
import org.apache.flink.table.gateway.workflow.scheduler.QuartzSchedulerUtils;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.refresh.ContinuousRefreshHandler;
import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.types.Row;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Trigger;
import org.quartz.TriggerKey;

public class MaterializedTableStatementITCase
extends AbstractMaterializedTableStatementITCase {
    @Test
    void testCreateMaterializedTableInContinuousMode() throws Exception {
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n WITH(\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        ResolvedCatalogMaterializedTable actualMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        ResolvedSchema expectedSchema = ResolvedSchema.of(Arrays.asList(Column.physical((String)"user_id", (DataType)DataTypes.BIGINT()), Column.physical((String)"shop_id", (DataType)DataTypes.BIGINT()), Column.physical((String)"ds", (DataType)DataTypes.STRING()), Column.physical((String)"payed_buy_fee_sum", (DataType)DataTypes.BIGINT()), Column.physical((String)"pv", (DataType)((DataType)DataTypes.INT().notNull()))));
        Assertions.assertThat((Object)actualMaterializedTable.getResolvedSchema()).isEqualTo((Object)expectedSchema);
        Assertions.assertThat((Duration)actualMaterializedTable.getFreshness()).isEqualTo((Object)Duration.ofSeconds(30L));
        Assertions.assertThat((Comparable)actualMaterializedTable.getLogicalRefreshMode()).isEqualTo((Object)CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
        Assertions.assertThat((Comparable)actualMaterializedTable.getRefreshMode()).isEqualTo((Object)CatalogMaterializedTable.RefreshMode.CONTINUOUS);
        Assertions.assertThat((Comparable)actualMaterializedTable.getRefreshStatus()).isEqualTo((Object)CatalogMaterializedTable.RefreshStatus.ACTIVATED);
        Assertions.assertThat((Optional)actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
        Assertions.assertThat((byte[])actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
        ContinuousRefreshHandler activeRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(actualMaterializedTable.getSerializedRefreshHandler(), this.getClass().getClassLoader());
        TestUtils.waitUntilAllTasksAreRunning((RestClusterClient)this.restClusterClient, (JobID)JobID.fromHexString((String)activeRefreshHandler.getJobId()));
        String describeJobDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId());
        OperationHandle describeJobHandle = service.executeStatement(this.sessionHandle, describeJobDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, describeJobHandle);
        List<RowData> jobResults = SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, this.sessionHandle, describeJobHandle);
        Assertions.assertThat((String)jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
        long checkpointInterval = this.getCheckpointIntervalConfig(this.restClusterClient, activeRefreshHandler.getJobId());
        Assertions.assertThat((long)checkpointInterval).isEqualTo(30000L);
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testCreateMaterializedTableInContinuousModeWithCustomCheckpointInterval() throws Exception {
        long checkpointInterval = 60000L;
        OperationHandle checkpointSetHandle = service.executeStatement(this.sessionHandle, String.format("SET '%s' = '%d'", CheckpointingOptions.CHECKPOINTING_INTERVAL.key(), checkpointInterval), -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, checkpointSetHandle);
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n WITH(\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        ResolvedCatalogMaterializedTable actualMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        ContinuousRefreshHandler activeRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(actualMaterializedTable.getSerializedRefreshHandler(), this.getClass().getClassLoader());
        TestUtils.waitUntilAllTasksAreRunning((RestClusterClient)this.restClusterClient, (JobID)JobID.fromHexString((String)activeRefreshHandler.getJobId()));
        long actualCheckpointInterval = this.getCheckpointIntervalConfig(this.restClusterClient, activeRefreshHandler.getJobId());
        Assertions.assertThat((long)actualCheckpointInterval).isEqualTo(checkpointInterval);
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testCreateMaterializedTableInFullMode() throws Exception {
        String dataId = TestValuesTableFactory.registerData(Collections.emptyList());
        String sourceDdl = String.format("CREATE TABLE IF NOT EXISTS my_source (\n  order_id BIGINT,\n  user_id BIGINT,\n  shop_id BIGINT,\n  order_created_at STRING\n)\nWITH (\n  'connector' = 'values',\n  'bounded' = 'true',\n  'data-id' = '%s'\n)", dataId);
        OperationHandle sourceHandle = service.executeStatement(this.sessionHandle, sourceDdl, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, sourceHandle);
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n WITH(\n    'partition.fields.ds.date-formatter' = 'yyyy-MM-dd',\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '1' MINUTE\n REFRESH_MODE = FULL\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  COUNT(order_id) AS order_cnt\n FROM (\n    SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        ResolvedCatalogMaterializedTable actualMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat((Comparable)actualMaterializedTable.getRefreshMode()).isEqualTo((Object)CatalogMaterializedTable.RefreshMode.FULL);
        byte[] serializedHandler = actualMaterializedTable.getSerializedRefreshHandler();
        EmbeddedRefreshHandler embeddedRefreshHandler = EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(serializedHandler, this.getClass().getClassLoader());
        Assertions.assertThat((String)embeddedRefreshHandler.getWorkflowName()).isEqualTo("quartz_job_" + ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops").asSerializableString());
        EmbeddedQuartzScheduler embeddedWorkflowScheduler = SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getSqlGatewayRestEndpoint().getQuartzScheduler();
        JobKey jobKey = new JobKey(embeddedRefreshHandler.getWorkflowName(), embeddedRefreshHandler.getWorkflowGroup());
        Assertions.assertThat((boolean)embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue();
        JobDetail jobDetail = embeddedWorkflowScheduler.getQuartzScheduler().getJobDetail(jobKey);
        String workflowJsonStr = jobDetail.getJobDataMap().getString("workflowInfo");
        WorkflowInfo workflowInfo = (WorkflowInfo)QuartzSchedulerUtils.fromJson((String)workflowJsonStr, WorkflowInfo.class);
        ((MapAssert)((MapAssert)((MapAssert)((MapAssert)((MapAssert)((MapAssert)((MapAssert)Assertions.assertThat((Map)workflowInfo.getInitConfig()).containsEntry((Object)"k1", (Object)"v1")).containsEntry((Object)"k2", (Object)"v2")).containsKey((Object)"sql-gateway.endpoint.rest.address")).containsKey((Object)"sql-gateway.endpoint.rest.port")).containsKey((Object)"table.catalog-store.kind")).containsKey((Object)"table.catalog-store.file.path")).doesNotContainKey((Object)FactoryUtil.WORKFLOW_SCHEDULER_TYPE.key())).doesNotContainKey((Object)TableConfigOptions.RESOURCES_DOWNLOAD_DIR.key());
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testCreateMaterializedTableFailedInInContinuousMode() throws Exception {
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n WITH(\n   'format' = 'json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle)).cause().hasMessageContaining(String.format("Submit continuous refresh job for materialized table %s occur exception.", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops").asSerializableString()));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"))).isInstanceOf(SqlGatewayException.class)).hasMessageContaining("Failed to getTable.");
    }

    @Test
    void testAlterMaterializedTableRefresh() throws Exception {
        long timeout = Duration.ofSeconds(20L).toMillis();
        long pause = Duration.ofSeconds(2L).toMillis();
        ArrayList<Row> data = new ArrayList<Row>();
        data.add(Row.of((Object[])new Object[]{1L, 1L, 1L, "2024-01-01"}));
        data.add(Row.of((Object[])new Object[]{2L, 2L, 2L, "2024-01-02"}));
        data.add(Row.of((Object[])new Object[]{3L, 3L, 3L, "2024-01-02"}));
        this.createAndVerifyCreateMaterializedTableWithData("my_materialized_table", data, Collections.singletonMap("ds", "yyyy-MM-dd"), CatalogMaterializedTable.RefreshMode.CONTINUOUS);
        data.remove(2);
        long currentTime = System.currentTimeMillis();
        String alterStatement = "ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')";
        OperationHandle alterHandle = service.executeStatement(this.sessionHandle, alterStatement, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterHandle);
        List<RowData> result = SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, this.sessionHandle, alterHandle);
        Assertions.assertThat((int)result.size()).isEqualTo(1);
        String jobId = result.get(0).getString(0).toString();
        this.verifyRefreshJobCreated(this.restClusterClient, jobId, currentTime);
        CommonTestUtils.waitUtil(() -> this.fetchTableData(this.sessionHandle, "SELECT * FROM my_materialized_table").size() == data.size(), (Duration)Duration.ofMillis(timeout), (Duration)Duration.ofMillis(pause), (String)"Failed to verify the data in materialized table.");
        Assertions.assertThat((int)this.fetchTableData(this.sessionHandle, "SELECT * FROM my_materialized_table where ds = '2024-01-02'").size()).isEqualTo(1);
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table"));
    }

    @Test
    void testAlterMaterializedTableRefreshWithInvalidPartitionSpec() throws Exception {
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds1, ds2)\n WITH(\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  user_id,\n  shop_id,\n  ds1,\n  ds2,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds1, user_id % 10 as ds2, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds1, ds2)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        String alterStatementWithUnknownPartitionKey = "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds3 = '2024-01-01')";
        OperationHandle alterStatementWithUnknownPartitionKeyHandle = service.executeStatement(this.sessionHandle, alterStatementWithUnknownPartitionKey, -1L, new Configuration());
        ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterStatementWithUnknownPartitionKeyHandle)).isInstanceOf(SqlExecutionException.class)).rootCause().isInstanceOf(ValidationException.class)).hasMessage("The partition spec contains unknown partition keys:\n\nds3\n\nAll known partition keys are:\n\nds2\nds1");
        String alterStatementWithNonStringPartitionKey = "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = 5)";
        OperationHandle alterStatementWithNonStringPartitionKeyHandle = service.executeStatement(this.sessionHandle, alterStatementWithNonStringPartitionKey, -1L, new Configuration());
        ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterStatementWithNonStringPartitionKeyHandle)).isInstanceOf(SqlExecutionException.class)).rootCause().isInstanceOf(ValidationException.class)).hasMessage("Currently, refreshing materialized table only supports referring to char, varchar and string type partition keys. All specified partition keys in partition specs with unsupported types are:\n\nds2");
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testAlterMaterializedTableSuspendAndResumeInContinuousMode(@TempDir Path temporaryPath) throws Exception {
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n WITH(\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        ResolvedCatalogMaterializedTable activeMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat((Comparable)activeMaterializedTable.getRefreshStatus()).isEqualTo((Object)CatalogMaterializedTable.RefreshStatus.ACTIVATED);
        ContinuousRefreshHandler activeRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(activeMaterializedTable.getSerializedRefreshHandler(), this.getClass().getClassLoader());
        TestUtils.waitUntilAllTasksAreRunning((RestClusterClient)this.restClusterClient, (JobID)JobID.fromHexString((String)activeRefreshHandler.getJobId()));
        String savepointDir = temporaryPath.toString();
        String alterJobSavepointDDL = String.format("SET 'execution.checkpointing.savepoint-dir' = 'file://%s'", savepointDir);
        OperationHandle alterMaterializedTableSavepointHandle = service.executeStatement(this.sessionHandle, alterJobSavepointDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableSavepointHandle);
        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
        OperationHandle alterMaterializedTableSuspendHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableSuspendDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableSuspendHandle);
        ResolvedCatalogMaterializedTable suspendMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat((Comparable)suspendMaterializedTable.getRefreshStatus()).isEqualTo((Object)CatalogMaterializedTable.RefreshStatus.SUSPENDED);
        byte[] refreshHandler = suspendMaterializedTable.getSerializedRefreshHandler();
        ContinuousRefreshHandler suspendRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(refreshHandler, this.getClass().getClassLoader());
        String suspendJobId = suspendRefreshHandler.getJobId();
        String describeJobDDL = String.format("DESCRIBE JOB '%s'", suspendJobId);
        OperationHandle describeJobHandle = service.executeStatement(this.sessionHandle, describeJobDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableSuspendHandle);
        List<RowData> jobResults = SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, this.sessionHandle, describeJobHandle);
        Assertions.assertThat((String)jobResults.get(0).getString(2).toString()).isEqualTo("FINISHED");
        Assertions.assertThat((Optional)suspendRefreshHandler.getRestorePath()).isNotEmpty();
        String actualSavepointPath = (String)suspendRefreshHandler.getRestorePath().get();
        String alterMaterializedTableResumeDDL = "ALTER MATERIALIZED TABLE users_shops RESUME WITH ('debezium-json.ignore-parse-errors' = 'true')";
        OperationHandle alterMaterializedTableResumeHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableResumeDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableResumeHandle);
        ResolvedCatalogMaterializedTable resumedCatalogMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat((Map)resumedCatalogMaterializedTable.getOptions()).doesNotContainKey((Object)"debezium-json.ignore-parse-errors");
        Assertions.assertThat((Comparable)resumedCatalogMaterializedTable.getRefreshStatus()).isEqualTo((Object)CatalogMaterializedTable.RefreshStatus.ACTIVATED);
        TestUtils.waitUntilAllTasksAreRunning((RestClusterClient)this.restClusterClient, (JobID)JobID.fromHexString((String)ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(resumedCatalogMaterializedTable.getSerializedRefreshHandler(), this.getClass().getClassLoader()).getJobId()));
        refreshHandler = resumedCatalogMaterializedTable.getSerializedRefreshHandler();
        ContinuousRefreshHandler resumeRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(refreshHandler, this.getClass().getClassLoader());
        String resumeJobId = resumeRefreshHandler.getJobId();
        String describeResumeJobDDL = String.format("DESCRIBE JOB '%s'", resumeJobId);
        OperationHandle describeResumeJobHandle = service.executeStatement(this.sessionHandle, describeResumeJobDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, describeResumeJobHandle);
        jobResults = SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, this.sessionHandle, describeResumeJobHandle);
        Assertions.assertThat((String)jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
        Optional<String> actualRestorePath = this.getJobRestoreSavepointPath(this.restClusterClient, resumeJobId);
        Assertions.assertThat(actualRestorePath).isNotEmpty();
        Assertions.assertThat((String)actualRestorePath.get()).isEqualTo(actualSavepointPath);
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testAlterMaterializedTableWithoutSavepointDirConfiguredInContinuousMode() throws Exception {
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n WITH(\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        ResolvedCatalogMaterializedTable activeMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        TestUtils.waitUntilAllTasksAreRunning((RestClusterClient)this.restClusterClient, (JobID)JobID.fromHexString((String)ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(activeMaterializedTable.getSerializedRefreshHandler(), this.getClass().getClassLoader()).getJobId()));
        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
        OperationHandle alterMaterializedTableSuspendHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableSuspendDDL, -1L, new Configuration());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableSuspendHandle)).rootCause().isInstanceOf(ValidationException.class)).hasMessageContaining("Savepoint directory is not configured, can't stop job with savepoint.");
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testAlterMaterializedTableWithRepeatedSuspendAndResumeInContinuousMode(@TempDir Path temporaryPath) throws Exception {
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n WITH(\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        ResolvedCatalogMaterializedTable activeMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        TestUtils.waitUntilAllTasksAreRunning((RestClusterClient)this.restClusterClient, (JobID)JobID.fromHexString((String)ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(activeMaterializedTable.getSerializedRefreshHandler(), this.getClass().getClassLoader()).getJobId()));
        String savepointDir = temporaryPath.toString();
        String alterJobSavepointDDL = String.format("SET 'execution.checkpointing.savepoint-dir' = 'file://%s'", savepointDir);
        OperationHandle alterMaterializedTableSavepointHandle = service.executeStatement(this.sessionHandle, alterJobSavepointDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableSavepointHandle);
        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
        OperationHandle alterMaterializedTableSuspendHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableSuspendDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableSuspendHandle);
        OperationHandle repeatedAlterMaterializedTableSuspendHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableSuspendDDL, -1L, new Configuration());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, repeatedAlterMaterializedTableSuspendHandle)).rootCause().isInstanceOf(SqlExecutionException.class)).hasMessageContaining(String.format("Materialized table %s continuous refresh job has been suspended", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops")));
        String alterMaterializedTableResumeDDL = "ALTER MATERIALIZED TABLE users_shops RESUME";
        OperationHandle alterMaterializedTableResumeHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableResumeDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableResumeHandle);
        OperationHandle repeatedAlterMaterializedTableResumeHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableResumeDDL, -1L, new Configuration());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, repeatedAlterMaterializedTableResumeHandle)).rootCause().isInstanceOf(SqlExecutionException.class)).hasMessageContaining(String.format("Materialized table %s continuous refresh job has been resumed", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops")));
    }

    @Test
    void testAlterMaterializedTableSuspendAndResumeInFullMode() throws Exception {
        this.createAndVerifyCreateMaterializedTableWithData("users_shops", Collections.emptyList(), Collections.emptyMap(), CatalogMaterializedTable.RefreshMode.FULL);
        ResolvedCatalogMaterializedTable activeMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat((Comparable)activeMaterializedTable.getRefreshStatus()).isEqualTo((Object)CatalogMaterializedTable.RefreshStatus.ACTIVATED);
        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
        OperationHandle alterMaterializedTableSuspendHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableSuspendDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableSuspendHandle);
        ResolvedCatalogMaterializedTable suspendMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat((Comparable)suspendMaterializedTable.getRefreshStatus()).isEqualTo((Object)CatalogMaterializedTable.RefreshStatus.SUSPENDED);
        byte[] refreshHandler = suspendMaterializedTable.getSerializedRefreshHandler();
        EmbeddedRefreshHandler suspendRefreshHandler = EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(refreshHandler, this.getClass().getClassLoader());
        String workflowName = suspendRefreshHandler.getWorkflowName();
        String workflowGroup = suspendRefreshHandler.getWorkflowGroup();
        EmbeddedQuartzScheduler embeddedWorkflowScheduler = SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getSqlGatewayRestEndpoint().getQuartzScheduler();
        JobKey jobKey = JobKey.jobKey((String)workflowName, (String)workflowGroup);
        Trigger.TriggerState suspendTriggerState = embeddedWorkflowScheduler.getQuartzScheduler().getTriggerState(TriggerKey.triggerKey((String)workflowName, (String)workflowGroup));
        Assertions.assertThat((Comparable)suspendTriggerState).isEqualTo((Object)Trigger.TriggerState.PAUSED);
        String alterMaterializedTableResumeDDL = "ALTER MATERIALIZED TABLE users_shops RESUME WITH ('debezium-json.ignore-parse-errors' = 'true')";
        OperationHandle alterMaterializedTableResumeHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableResumeDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableResumeHandle);
        ResolvedCatalogMaterializedTable resumedCatalogMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat((Comparable)resumedCatalogMaterializedTable.getRefreshStatus()).isEqualTo((Object)CatalogMaterializedTable.RefreshStatus.ACTIVATED);
        refreshHandler = resumedCatalogMaterializedTable.getSerializedRefreshHandler();
        EmbeddedRefreshHandler resumeRefreshHandler = EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(refreshHandler, this.getClass().getClassLoader());
        Assertions.assertThat((String)resumeRefreshHandler.getWorkflowName()).isEqualTo(workflowName);
        Assertions.assertThat((String)resumeRefreshHandler.getWorkflowGroup()).isEqualTo(workflowGroup);
        JobDetail jobDetail = embeddedWorkflowScheduler.getQuartzScheduler().getJobDetail(jobKey);
        Trigger.TriggerState resumedTriggerState = embeddedWorkflowScheduler.getQuartzScheduler().getTriggerState(TriggerKey.triggerKey((String)workflowName, (String)workflowGroup));
        Assertions.assertThat((Comparable)resumedTriggerState).isEqualTo((Object)Trigger.TriggerState.NORMAL);
        WorkflowInfo workflowInfo = (WorkflowInfo)QuartzSchedulerUtils.fromJson((String)((String)jobDetail.getJobDataMap().get((Object)"workflowInfo")), WorkflowInfo.class);
        Assertions.assertThat((Map)workflowInfo.getDynamicOptions()).containsEntry((Object)"debezium-json.ignore-parse-errors", (Object)"true");
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws Exception {
        this.createAndVerifyCreateMaterializedTableWithData("users_shops", Collections.emptyList(), Collections.emptyMap(), CatalogMaterializedTable.RefreshMode.FULL);
        ResolvedCatalogMaterializedTable activeMaterializedTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat((Comparable)activeMaterializedTable.getRefreshStatus()).isEqualTo((Object)CatalogMaterializedTable.RefreshStatus.ACTIVATED);
        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
        OperationHandle alterMaterializedTableSuspendHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableSuspendDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableSuspendHandle);
        OperationHandle repeatedAlterMaterializedTableSuspendHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableSuspendDDL, -1L, new Configuration());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, repeatedAlterMaterializedTableSuspendHandle)).rootCause().isInstanceOf(SqlExecutionException.class)).hasMessageContaining(String.format("Materialized table %s refresh workflow has been suspended.", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops")));
        String alterMaterializedTableResumeDDL = "ALTER MATERIALIZED TABLE users_shops RESUME WITH ('debezium-json.ignore-parse-errors' = 'true')";
        OperationHandle alterMaterializedTableResumeHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableResumeDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableResumeHandle);
        OperationHandle repeatedAlterMaterializedTableResumeHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableResumeDDL, -1L, new Configuration());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, repeatedAlterMaterializedTableResumeHandle)).rootCause().isInstanceOf(SqlExecutionException.class)).hasMessageContaining(String.format("Materialized table %s refresh workflow has been resumed.", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops")));
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
        this.createAndVerifyCreateMaterializedTableWithData("users_shops", Collections.emptyList(), Collections.emptyMap(), CatalogMaterializedTable.RefreshMode.FULL);
        ResolvedCatalogMaterializedTable oldTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        String alterMaterializedTableAsQueryDDL = "ALTER MATERIALIZED TABLE users_shops AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  COUNT(order_id) AS order_cnt,\n  SUM(order_amount) AS order_amount_sum\n FROM (\n    SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle alterMaterializedTableAsQueryHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableAsQueryDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableAsQueryHandle);
        ResolvedCatalogMaterializedTable newTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat(this.getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())).isEqualTo(Collections.singletonList(Column.physical((String)"order_amount_sum", (DataType)DataTypes.INT())));
        Assertions.assertThat((String)newTable.getDefinitionQuery()).isEqualTo(String.format("SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\nFROM (SELECT `my_source`.`user_id`, `my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, `my_source`.`order_id`, 1 AS `order_amount`\nFROM `%s`.`test_db`.`my_source`) AS `tmp`\nGROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)", this.fileSystemCatalogName));
        Assertions.assertThat((byte[])oldTable.getSerializedRefreshHandler()).isEqualTo((Object)newTable.getSerializedRefreshHandler());
        Assertions.assertThat((Object)oldTable.getDefinitionFreshness()).isEqualTo((Object)newTable.getDefinitionFreshness());
    }

    @Test
    void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Exception {
        this.createAndVerifyCreateMaterializedTableWithData("users_shops", Collections.emptyList(), Collections.emptyMap(), CatalogMaterializedTable.RefreshMode.FULL);
        ResolvedCatalogMaterializedTable oldTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
        OperationHandle alterMaterializedTableSuspendHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableSuspendDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableSuspendHandle);
        String alterMaterializedTableAsQueryDDL = "ALTER MATERIALIZED TABLE users_shops AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  COUNT(order_id) AS order_cnt,\n  SUM(order_amount) AS order_amount_sum\n FROM (\n    SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle alterMaterializedTableAsQueryHandle = service.executeStatement(this.sessionHandle, alterMaterializedTableAsQueryDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterMaterializedTableAsQueryHandle);
        ResolvedCatalogMaterializedTable newTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat(this.getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())).isEqualTo(Collections.singletonList(Column.physical((String)"order_amount_sum", (DataType)DataTypes.INT())));
        Assertions.assertThat((String)newTable.getDefinitionQuery()).isEqualTo(String.format("SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\nFROM (SELECT `my_source`.`user_id`, `my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, `my_source`.`order_id`, 1 AS `order_amount`\nFROM `%s`.`test_db`.`my_source`) AS `tmp`\nGROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)", this.fileSystemCatalogName));
        Assertions.assertThat((byte[])oldTable.getSerializedRefreshHandler()).isEqualTo((Object)newTable.getSerializedRefreshHandler());
        Assertions.assertThat((Object)oldTable.getDefinitionFreshness()).isEqualTo((Object)newTable.getDefinitionFreshness());
    }

    @Test
    void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path temporaryPath) throws Exception {
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops ( PRIMARY KEY (ds, user_id) not enforced) PARTITIONED BY (ds)\n WITH(\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  coalesce(user_id, 0) as user_id,\n  shop_id,\n  coalesce(ds, '') as ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        ResolvedCatalogMaterializedTable oldTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        ContinuousRefreshHandler activeRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(oldTable.getSerializedRefreshHandler(), this.getClass().getClassLoader());
        TestUtils.waitUntilAllTasksAreRunning((RestClusterClient)this.restClusterClient, (JobID)JobID.fromHexString((String)activeRefreshHandler.getJobId()));
        String savepointDir = "file://" + String.valueOf(temporaryPath.toAbsolutePath());
        String setupSavepointDDL = "SET 'execution.checkpointing.savepoint-dir' = '" + savepointDir + "'";
        OperationHandle setupSavepointHandle = service.executeStatement(this.sessionHandle, setupSavepointDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, setupSavepointHandle);
        String alterTableDDL = "ALTER MATERIALIZED TABLE users_shops AS SELECT \n  coalesce(user_id, 0) as user_id,\n  shop_id,\n  coalesce(ds, '') as ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle alterTableHandle = service.executeStatement(this.sessionHandle, alterTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterTableHandle);
        ResolvedCatalogMaterializedTable newTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat(this.getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())).isEqualTo(Collections.singletonList(Column.physical((String)"pv", (DataType)DataTypes.INT())));
        Assertions.assertThat((Optional)newTable.getResolvedSchema().getPrimaryKey()).isEqualTo((Object)oldTable.getResolvedSchema().getPrimaryKey());
        Assertions.assertThat((List)newTable.getResolvedSchema().getWatermarkSpecs()).isEqualTo((Object)oldTable.getResolvedSchema().getWatermarkSpecs());
        Assertions.assertThat((String)newTable.getDefinitionQuery()).isEqualTo(String.format("SELECT COALESCE(`tmp`.`user_id`, 0) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\nFROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, `DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\nFROM `%s`.`test_db`.`datagenSource`) AS `tmp`\nGROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)", this.fileSystemCatalogName));
        Assertions.assertThat((byte[])oldTable.getSerializedRefreshHandler()).isNotEqualTo((Object)newTable.getSerializedRefreshHandler());
        ContinuousRefreshHandler newContinuousRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(newTable.getSerializedRefreshHandler(), Thread.currentThread().getContextClassLoader());
        Optional<String> restorePath = this.getJobRestoreSavepointPath(this.restClusterClient, newContinuousRefreshHandler.getJobId());
        Assertions.assertThat(restorePath).isEmpty();
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testAlterMaterializedTableAsQueryInContinuousModeWithSuspendStatus(@TempDir Path temporaryPath) throws Exception {
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n WITH(\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        ResolvedCatalogMaterializedTable oldTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        ContinuousRefreshHandler activeRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(oldTable.getSerializedRefreshHandler(), this.getClass().getClassLoader());
        TestUtils.waitUntilAllTasksAreRunning((RestClusterClient)this.restClusterClient, (JobID)JobID.fromHexString((String)activeRefreshHandler.getJobId()));
        String savepointDir = "file://" + String.valueOf(temporaryPath.toAbsolutePath());
        String setupSavepointDDL = "SET 'execution.checkpointing.savepoint-dir' = '" + savepointDir + "'";
        OperationHandle setupSavepointHandle = service.executeStatement(this.sessionHandle, setupSavepointDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, setupSavepointHandle);
        String suspendTableDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
        OperationHandle suspendTableHandle = service.executeStatement(this.sessionHandle, suspendTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, suspendTableHandle);
        String alterTableDDL = "ALTER MATERIALIZED TABLE users_shops AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle alterTableHandle = service.executeStatement(this.sessionHandle, alterTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, alterTableHandle);
        ResolvedCatalogMaterializedTable newTable = (ResolvedCatalogMaterializedTable)service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat(this.getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())).isEqualTo(Collections.singletonList(Column.physical((String)"pv", (DataType)DataTypes.INT())));
        Assertions.assertThat((String)newTable.getDefinitionQuery()).isEqualTo(String.format("SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\nFROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, `DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\nFROM `%s`.`test_db`.`datagenSource`) AS `tmp`\nGROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)", this.fileSystemCatalogName));
        Assertions.assertThat((byte[])oldTable.getSerializedRefreshHandler()).isEqualTo((Object)newTable.getSerializedRefreshHandler());
        Assertions.assertThat((Object)oldTable.getDefinitionFreshness()).isEqualTo((Object)newTable.getDefinitionFreshness());
        ContinuousRefreshHandler newContinuousRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(newTable.getSerializedRefreshHandler(), Thread.currentThread().getContextClassLoader());
        Assertions.assertThat((Optional)newContinuousRefreshHandler.getRestorePath()).isEmpty();
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
    }

    @Test
    void testDropMaterializedTableInContinuousMode() throws Exception {
        String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops PARTITIONED BY (ds)\n WITH(\n   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n  SUM (1) AS pv\n FROM (\n    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource ) AS tmp\n GROUP BY (user_id, shop_id, ds)";
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        ResolvedCatalogBaseTable activeMaterializedTable = service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"));
        Assertions.assertThat((Object)activeMaterializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
        TestUtils.waitUntilAllTasksAreRunning((RestClusterClient)this.restClusterClient, (JobID)JobID.fromHexString((String)ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(((ResolvedCatalogMaterializedTable)activeMaterializedTable).getSerializedRefreshHandler(), this.getClass().getClassLoader()).getJobId()));
        ContinuousRefreshHandler activeRefreshHandler = ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(((ResolvedCatalogMaterializedTable)activeMaterializedTable).getSerializedRefreshHandler(), this.getClass().getClassLoader());
        String describeJobDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId());
        OperationHandle describeJobHandle = service.executeStatement(this.sessionHandle, describeJobDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, describeJobHandle);
        List<RowData> jobResults = SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, this.sessionHandle, describeJobHandle);
        Assertions.assertThat((String)jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
        String dropTableUsingMaterializedTableDDL = "DROP TABLE users_shops";
        OperationHandle dropTableUsingMaterializedTableHandle = service.executeStatement(this.sessionHandle, dropTableUsingMaterializedTableDDL, -1L, new Configuration());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, dropTableUsingMaterializedTableHandle)).rootCause().isInstanceOf(ValidationException.class)).hasMessage(String.format("Table with identifier '%s' does not exist.", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops").asSummaryString()));
        String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS users_shops";
        OperationHandle dropMaterializedTableHandle = service.executeStatement(this.sessionHandle, dropMaterializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, dropMaterializedTableHandle);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"))).isInstanceOf(SqlGatewayException.class)).hasMessageContaining("Failed to getTable.");
        TestUtils.waitUntilJobCanceled((JobID)JobID.fromHexString((String)activeRefreshHandler.getJobId()), (ClusterClient)this.restClusterClient);
        String describeJobAfterDropDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId());
        OperationHandle describeJobAfterDropHandle = service.executeStatement(this.sessionHandle, describeJobAfterDropDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, describeJobAfterDropHandle);
        List<RowData> jobResultsAfterDrop = SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, this.sessionHandle, describeJobAfterDropHandle);
        Assertions.assertThat((String)jobResultsAfterDrop.get(0).getString(2).toString()).isEqualTo("CANCELED");
        String dropNonExistMaterializedTableDDL = "DROP MATERIALIZED TABLE users_shops";
        OperationHandle dropNonExistTableHandle = service.executeStatement(this.sessionHandle, dropNonExistMaterializedTableDDL, -1L, new Configuration());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, dropNonExistTableHandle)).rootCause().isInstanceOf(ValidationException.class)).hasMessage(String.format("Materialized table with identifier %s does not exist.", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops").asSerializableString()));
        String dropNonExistMaterializedTableDDL2 = "DROP MATERIALIZED TABLE IF EXISTS users_shops";
        OperationHandle dropNonExistMaterializedTableHandle2 = service.executeStatement(this.sessionHandle, dropNonExistMaterializedTableDDL2, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, dropNonExistMaterializedTableHandle2);
        dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS datagenSource";
        OperationHandle dropTableHandle = service.executeStatement(this.sessionHandle, dropMaterializedTableDDL, -1L, new Configuration());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, dropTableHandle)).rootCause().isInstanceOf(ValidationException.class)).hasMessage(String.format("Table %s is not a materialized table, does not support materialized table related operation.", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"datagenSource").asSerializableString()));
    }

    @Test
    void testDropMaterializedTableInFullMode() throws Exception {
        this.createAndVerifyCreateMaterializedTableWithData("users_shops", Collections.emptyList(), Collections.emptyMap(), CatalogMaterializedTable.RefreshMode.FULL);
        JobKey jobKey = JobKey.jobKey((String)("quartz_job_" + ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops").asSerializableString()), (String)"default_group");
        EmbeddedQuartzScheduler embeddedWorkflowScheduler = SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getSqlGatewayRestEndpoint().getQuartzScheduler();
        Assertions.assertThat((boolean)embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue();
        String dropTableUsingMaterializedTableDDL = "DROP TABLE users_shops";
        OperationHandle dropTableUsingMaterializedTableHandle = service.executeStatement(this.sessionHandle, dropTableUsingMaterializedTableDDL, -1L, new Configuration());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, dropTableUsingMaterializedTableHandle)).rootCause().isInstanceOf(ValidationException.class)).hasMessage(String.format("Table with identifier '%s' does not exist.", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops").asSummaryString()));
        String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS users_shops";
        OperationHandle dropMaterializedTableHandle = service.executeStatement(this.sessionHandle, dropMaterializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, dropMaterializedTableHandle);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"))).isInstanceOf(SqlGatewayException.class)).hasMessageContaining("Failed to getTable.");
        Assertions.assertThat((boolean)embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isFalse();
    }

    @Test
    void testDropMaterializedTableWithDeletedRefreshWorkflowInFullMode() throws Exception {
        this.createAndVerifyCreateMaterializedTableWithData("users_shops", Collections.emptyList(), Collections.emptyMap(), CatalogMaterializedTable.RefreshMode.FULL);
        JobKey jobKey = JobKey.jobKey((String)("quartz_job_" + ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops").asSerializableString()), (String)"default_group");
        EmbeddedQuartzScheduler embeddedWorkflowScheduler = SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getSqlGatewayRestEndpoint().getQuartzScheduler();
        Assertions.assertThat((boolean)embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue();
        embeddedWorkflowScheduler.deleteScheduleWorkflow(jobKey.getName(), jobKey.getGroup());
        String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS users_shops";
        OperationHandle dropMaterializedTableHandle = service.executeStatement(this.sessionHandle, dropMaterializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, dropMaterializedTableHandle);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> service.getTable(this.sessionHandle, ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"users_shops"))).isInstanceOf(SqlGatewayException.class)).hasMessageContaining("Failed to getTable.");
    }

    @Test
    void testRefreshMaterializedTableWithStaticPartitionInContinuousMode() throws Exception {
        ArrayList<Row> data = new ArrayList<Row>();
        data.add(Row.of((Object[])new Object[]{1L, 1L, 1L, "2024-01-01"}));
        data.add(Row.of((Object[])new Object[]{2L, 2L, 2L, "2024-01-02"}));
        this.createAndVerifyCreateMaterializedTableWithData("my_materialized_table", data, Collections.singletonMap("ds", "yyyy-MM-dd"), CatalogMaterializedTable.RefreshMode.CONTINUOUS);
        ObjectIdentifier objectIdentifier = ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table");
        data.add(Row.of((Object[])new Object[]{3L, 3L, 3L, "2024-01-01"}));
        data.add(Row.of((Object[])new Object[]{4L, 4L, 4L, "2024-01-02"}));
        long startTime = System.currentTimeMillis();
        HashMap<String, String> staticPartitions = new HashMap<String, String>();
        staticPartitions.put("ds", "2024-01-02");
        OperationHandle refreshTableHandle = service.refreshMaterializedTable(this.sessionHandle, objectIdentifier.asSerializableString(), false, null, Collections.emptyMap(), staticPartitions, Collections.emptyMap());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, refreshTableHandle);
        List<RowData> result = SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, this.sessionHandle, refreshTableHandle);
        Assertions.assertThat((int)result.size()).isEqualTo(1);
        String jobId = result.get(0).getString(0).toString();
        this.verifyRefreshJobCreated(this.restClusterClient, jobId, startTime);
        Assertions.assertThat((int)this.fetchTableData(this.sessionHandle, "SELECT * FROM my_materialized_table where ds = '2024-01-02'").size()).isEqualTo(this.getPartitionSize(data, "2024-01-02"));
        Assertions.assertThat((int)this.fetchTableData(this.sessionHandle, "SELECT * FROM my_materialized_table where ds = '2024-01-01'").size()).isNotEqualTo(this.getPartitionSize(data, "2024-01-01"));
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table"));
    }

    @Test
    void testPeriodicRefreshMaterializedTableWithoutPartitionOptions() throws Exception {
        ArrayList<Row> data = new ArrayList<Row>();
        data.add(Row.of((Object[])new Object[]{1L, 1L, 1L, "2024-01-01"}));
        data.add(Row.of((Object[])new Object[]{2L, 2L, 2L, "2024-01-02"}));
        this.createAndVerifyCreateMaterializedTableWithData("my_materialized_table_without_partition_options", data, Collections.emptyMap(), CatalogMaterializedTable.RefreshMode.CONTINUOUS);
        ObjectIdentifier materializedTableWithoutFormatterIdentifier = ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table_without_partition_options");
        data.add(Row.of((Object[])new Object[]{3L, 3L, 3L, "2024-01-01"}));
        data.add(Row.of((Object[])new Object[]{4L, 4L, 4L, "2024-01-02"}));
        long startTime = System.currentTimeMillis();
        OperationHandle periodRefreshTableWithoutFormatterHandle = service.refreshMaterializedTable(this.sessionHandle, materializedTableWithoutFormatterIdentifier.asSerializableString(), true, "2024-01-02 00:00:00", Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, periodRefreshTableWithoutFormatterHandle);
        List<RowData> periodRefreshWithoutFormatterResult = SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, this.sessionHandle, periodRefreshTableWithoutFormatterHandle);
        String periodWithoutFormatterJobId = periodRefreshWithoutFormatterResult.get(0).getString(0).toString();
        this.verifyRefreshJobCreated(this.restClusterClient, periodWithoutFormatterJobId, startTime);
        Assertions.assertThat((int)this.fetchTableData(this.sessionHandle, "SELECT * FROM my_materialized_table_without_partition_options where ds = '2024-01-01'").size()).isEqualTo(this.getPartitionSize(data, "2024-01-01"));
        Assertions.assertThat((int)this.fetchTableData(this.sessionHandle, "SELECT * FROM my_materialized_table_without_partition_options where ds = '2024-01-02'").size()).isEqualTo(this.getPartitionSize(data, "2024-01-02"));
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table_without_partition_options"));
    }

    @Test
    void testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception {
        ArrayList<Row> data = new ArrayList<Row>();
        this.createAndVerifyCreateMaterializedTableWithData("my_materialized_table", data, Collections.singletonMap("ds", "yyyy-MM-dd"), CatalogMaterializedTable.RefreshMode.FULL);
        ObjectIdentifier materializedTableIdentifier = ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table");
        data.add(Row.of((Object[])new Object[]{1L, 1L, 1L, "2024-01-01"}));
        data.add(Row.of((Object[])new Object[]{2L, 2L, 2L, "2024-01-02"}));
        data.add(Row.of((Object[])new Object[]{4L, 4L, 4L, "2024-01-02"}));
        long startTime = System.currentTimeMillis();
        OperationHandle periodRefreshTableHandle = service.refreshMaterializedTable(this.sessionHandle, materializedTableIdentifier.asSerializableString(), true, "2024-01-03 00:00:00", Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, periodRefreshTableHandle);
        List<RowData> periodRefreshResult = SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, this.sessionHandle, periodRefreshTableHandle);
        Assertions.assertThat((int)periodRefreshResult.size()).isEqualTo(1);
        String periodJobId = periodRefreshResult.get(0).getString(0).toString();
        this.verifyRefreshJobCreated(this.restClusterClient, periodJobId, startTime);
        Assertions.assertThat(this.fetchTableData(this.sessionHandle, "SELECT * FROM my_materialized_table where ds = '2024-01-02'")).size().isEqualTo(this.getPartitionSize(data, "2024-01-02"));
        Assertions.assertThat((int)this.fetchTableData(this.sessionHandle, "SELECT * FROM my_materialized_table where ds = '2024-01-01'").size()).isNotEqualTo(this.getPartitionSize(data, "2024-01-01"));
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table"));
    }

    @Test
    void testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws Exception {
        ArrayList<Row> data = new ArrayList<Row>();
        data.add(Row.of((Object[])new Object[]{1L, 1L, 1L, "2024-01-01"}));
        data.add(Row.of((Object[])new Object[]{2L, 2L, 2L, "2024-01-02"}));
        this.createAndVerifyCreateMaterializedTableWithData("my_materialized_table", data, Collections.singletonMap("ds", "yyyy-MM-dd"), CatalogMaterializedTable.RefreshMode.CONTINUOUS);
        ObjectIdentifier objectIdentifier = ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table");
        OperationHandle invalidRefreshTableHandle1 = service.refreshMaterializedTable(this.sessionHandle, objectIdentifier.asSerializableString(), true, null, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, invalidRefreshTableHandle1)).rootCause().isInstanceOf(ValidationException.class)).hasMessage(String.format("The scheduler time must not be null during the periodic refresh of the materialized table %s.", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table").asSerializableString()));
        String invalidTime = "20240103 00:00:00.000";
        OperationHandle invalidRefreshTableHandle2 = service.refreshMaterializedTable(this.sessionHandle, objectIdentifier.asSerializableString(), true, invalidTime, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, invalidRefreshTableHandle2)).rootCause().isInstanceOf(SqlExecutionException.class)).hasMessage(String.format("Failed to parse a valid partition value for the field 'ds' in materialized table %s using the scheduler time '20240103 00:00:00.000' based on the date format 'yyyy-MM-dd HH:mm:ss'.", ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table").asSerializableString()));
        this.dropMaterializedTable(ObjectIdentifier.of((String)this.fileSystemCatalogName, (String)"test_db", (String)"my_materialized_table"));
    }

    private List<Column> getAddedColumns(ResolvedSchema newSchema, ResolvedSchema oldSchema) {
        return newSchema.getColumns().stream().filter(column -> !oldSchema.getColumns().contains(column)).collect(Collectors.toList());
    }

    private int getPartitionSize(List<Row> data, String partition) {
        return (int)data.stream().filter(row -> row.getField(3).toString().equals(partition)).count();
    }

    private long getCheckpointIntervalConfig(RestClusterClient<?> restClusterClient, String jobId) throws Exception {
        CheckpointConfigInfo checkpointConfigInfo = (CheckpointConfigInfo)MaterializedTableStatementITCase.sendJobRequest(restClusterClient, CheckpointConfigHeaders.getInstance(), EmptyRequestBody.getInstance(), jobId);
        return RestMapperUtils.getStrictObjectMapper().readTree(RestMapperUtils.getStrictObjectMapper().writeValueAsString((Object)checkpointConfigInfo)).get("interval").asLong();
    }

    private Optional<String> getJobRestoreSavepointPath(RestClusterClient<?> restClusterClient, String jobId) throws Exception {
        CheckpointingStatistics checkpointingStatistics = (CheckpointingStatistics)MaterializedTableStatementITCase.sendJobRequest(restClusterClient, CheckpointingStatisticsHeaders.getInstance(), EmptyRequestBody.getInstance(), jobId);
        CheckpointingStatistics.RestoredCheckpointStatistics restoredCheckpointStatistics = checkpointingStatistics.getLatestCheckpoints().getRestoredCheckpointStatistics();
        return restoredCheckpointStatistics != null ? Optional.ofNullable(restoredCheckpointStatistics.getExternalPath()) : Optional.empty();
    }

    private static <M extends JobMessageParameters, R extends RequestBody, P extends ResponseBody> P sendJobRequest(RestClusterClient<?> restClusterClient, MessageHeaders<R, P, M> headers, R requestBody, String jobId) throws Exception {
        JobMessageParameters jobMessageParameters = (JobMessageParameters)headers.getUnresolvedMessageParameters();
        jobMessageParameters.jobPathParameter.resolve((Object)JobID.fromHexString((String)jobId));
        return (P)((ResponseBody)restClusterClient.sendRequest(headers, (MessageParameters)jobMessageParameters, requestBody).get(5L, TimeUnit.SECONDS));
    }
}

