/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

public abstract class AbstractMaterializedTableStatementITCase {
    private static final String FILE_CATALOG_STORE = "file_store";
    private static final String TEST_CATALOG_PREFIX = "test_catalog";
    protected static final String TEST_DEFAULT_DATABASE = "test_db";
    private static final AtomicLong COUNTER = new AtomicLong(0L);
    @RegisterExtension
    @Order(value=1)
    static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(2).build());
    @RegisterExtension
    @Order(value=2)
    protected static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = new SqlGatewayServiceExtension(() -> ((MiniClusterExtension)MINI_CLUSTER).getClientConfiguration());
    @RegisterExtension
    @Order(value=3)
    protected static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension(() -> Executors.newCachedThreadPool((ThreadFactory)new ExecutorThreadFactory("SqlGatewayService Test Pool", (Thread.UncaughtExceptionHandler)IgnoreExceptionHandler.INSTANCE)));
    @RegisterExtension
    @Order(value=4)
    protected static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION = new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
    protected static SqlGatewayServiceImpl service;
    private static SessionEnvironment defaultSessionEnvironment;
    private static Path baseCatalogPath;
    private String fileSystemCatalogPath;
    protected String fileSystemCatalogName;
    protected SessionHandle sessionHandle;
    protected RestClusterClient<?> restClusterClient;

    @BeforeAll
    static void setUp(@TempDir Path temporaryFolder) throws Exception {
        service = (SqlGatewayServiceImpl)SQL_GATEWAY_SERVICE_EXTENSION.getService();
        Path fileCatalogStore = temporaryFolder.resolve(FILE_CATALOG_STORE);
        Files.createDirectory(fileCatalogStore, new FileAttribute[0]);
        HashMap<String, String> catalogStoreOptions = new HashMap<String, String>();
        catalogStoreOptions.put(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND.key(), "file");
        catalogStoreOptions.put("table.catalog-store.file.path", fileCatalogStore.toString());
        baseCatalogPath = temporaryFolder.resolve(TEST_CATALOG_PREFIX);
        Files.createDirectory(baseCatalogPath, new FileAttribute[0]);
        HashMap<String, String> workflowSchedulerConfig = new HashMap<String, String>();
        workflowSchedulerConfig.put(FactoryUtil.WORKFLOW_SCHEDULER_TYPE.key(), "embedded");
        workflowSchedulerConfig.put("sql-gateway.endpoint.rest.address", SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress());
        workflowSchedulerConfig.put("sql-gateway.endpoint.rest.port", String.valueOf(SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort()));
        HashMap<String, String> testConf = new HashMap<String, String>();
        testConf.put("k1", "v1");
        testConf.put("k2", "v2");
        defaultSessionEnvironment = SessionEnvironment.newBuilder().addSessionConfig(catalogStoreOptions).addSessionConfig(workflowSchedulerConfig).addSessionConfig(testConf).setSessionEndpointVersion((EndpointVersion)MockedEndpointVersion.V1).build();
    }

    @BeforeEach
    void before(@InjectClusterClient RestClusterClient<?> injectClusterClient) throws Exception {
        String randomStr = String.valueOf(COUNTER.incrementAndGet());
        Path fileCatalogPath = baseCatalogPath.resolve(randomStr);
        Files.createDirectory(fileCatalogPath, new FileAttribute[0]);
        Path dbPath = fileCatalogPath.resolve(TEST_DEFAULT_DATABASE);
        Files.createDirectory(dbPath, new FileAttribute[0]);
        this.fileSystemCatalogPath = fileCatalogPath.toString();
        this.fileSystemCatalogName = TEST_CATALOG_PREFIX + randomStr;
        this.sessionHandle = this.initializeSession();
        this.restClusterClient = injectClusterClient;
    }

    @AfterEach
    void after() throws Exception {
        Set tableInfos = service.listTables(this.sessionHandle, this.fileSystemCatalogName, TEST_DEFAULT_DATABASE, Collections.singleton(CatalogBaseTable.TableKind.TABLE));
        for (TableInfo tableInfo : tableInfos) {
            ResolvedCatalogBaseTable resolvedTable = service.getTable(this.sessionHandle, tableInfo.getIdentifier());
            if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE != resolvedTable.getTableKind()) continue;
            String dropTableDDL = String.format("DROP MATERIALIZED TABLE %s", tableInfo.getIdentifier().asSerializableString());
            OperationHandle dropTableHandle = service.executeStatement(this.sessionHandle, dropTableDDL, -1L, new Configuration());
            SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, dropTableHandle);
        }
    }

    private SessionHandle initializeSession() {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        String catalogDDL = String.format("CREATE CATALOG %s\nWITH (\n  'type' = 'test-filesystem',\n  'path' = '%s',\n  'default-database' = '%s'\n  )", this.fileSystemCatalogName, this.fileSystemCatalogPath, TEST_DEFAULT_DATABASE);
        service.configureSession(sessionHandle, catalogDDL, -1L);
        service.configureSession(sessionHandle, String.format("USE CATALOG %s", this.fileSystemCatalogName), -1L);
        String dataGenSource = "CREATE TABLE datagenSource (\n  order_id BIGINT,\n  order_number VARCHAR(20),\n  user_id BIGINT,\n  shop_id BIGINT,\n  product_id BIGINT,\n  status BIGINT,\n  order_type BIGINT,\n  order_created_at TIMESTAMP,\n  payment_amount_cents BIGINT\n)\nWITH (\n  'connector' = 'datagen',\n  'rows-per-second' = '10'\n)";
        service.configureSession(sessionHandle, dataGenSource, -1L);
        return sessionHandle;
    }

    public void createAndVerifyCreateMaterializedTableWithData(String materializedTableName, List<Row> data, Map<String, String> partitionFormatter, CatalogMaterializedTable.RefreshMode refreshMode) throws Exception {
        long timeout = Duration.ofSeconds(20L).toMillis();
        long pause = Duration.ofSeconds(2L).toMillis();
        String dataId = TestValuesTableFactory.registerData(data);
        String sourceDdl = String.format("CREATE TABLE IF NOT EXISTS my_source (\n  order_id BIGINT,\n  user_id BIGINT,\n  shop_id BIGINT,\n  order_created_at STRING\n)\nWITH (\n  'connector' = 'values',\n  'bounded' = 'true',\n  'data-id' = '%s'\n)", dataId);
        OperationHandle sourceHandle = service.executeStatement(this.sessionHandle, sourceDdl, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, sourceHandle);
        String partitionFields = partitionFormatter != null && !partitionFormatter.isEmpty() ? partitionFormatter.entrySet().stream().map(e -> String.format("'partition.fields.%s.date-formatter' = '%s'", e.getKey(), e.getValue())).collect(Collectors.joining(",\n", "", ",\n")) : "\n";
        String materializedTableDDL = String.format("CREATE MATERIALIZED TABLE %s PARTITIONED BY (ds)\n WITH(\n    %s   'format' = 'debezium-json'\n )\n FRESHNESS = INTERVAL '30' SECOND\n REFRESH_MODE = %s\n AS SELECT \n  user_id,\n  shop_id,\n  ds,\n  COUNT(order_id) AS order_cnt\n FROM (\n    SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source ) AS tmp\n GROUP BY (user_id, shop_id, ds)", materializedTableName, partitionFields, refreshMode.toString());
        OperationHandle materializedTableHandle = service.executeStatement(this.sessionHandle, materializedTableDDL, -1L, new Configuration());
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, this.sessionHandle, materializedTableHandle);
        CommonTestUtils.waitUtil(() -> this.fetchTableData(this.sessionHandle, String.format("SELECT * FROM %s", materializedTableName)).size() == data.size(), (Duration)Duration.ofMillis(timeout), (Duration)Duration.ofMillis(pause), (String)"Failed to verify the data in materialized table.");
    }

    public List<RowData> fetchTableData(SessionHandle sessionHandle, String query) {
        OperationHandle queryHandle = service.executeStatement(sessionHandle, query, -1L, new Configuration());
        return SqlGatewayServiceTestUtil.fetchAllResults((SqlGatewayService)service, sessionHandle, queryHandle);
    }

    public void verifyRefreshJobCreated(RestClusterClient<?> restClusterClient, String jobId, long startTime) throws Exception {
        long timeout = Duration.ofSeconds(20L).toMillis();
        long pause = Duration.ofSeconds(2L).toMillis();
        Optional<JobStatusMessage> job = ((Collection)restClusterClient.listJobs().get(timeout, TimeUnit.MILLISECONDS)).stream().filter(j -> j.getJobId().toString().equals(jobId)).findFirst();
        Assertions.assertThat(job).isPresent();
        Assertions.assertThat((long)job.get().getStartTime()).isGreaterThan(startTime);
        JobDetailsInfo jobDetailsInfo = (JobDetailsInfo)restClusterClient.getJobDetails(JobID.fromHexString((String)jobId)).get(timeout, TimeUnit.MILLISECONDS);
        Assertions.assertThat((Comparable)jobDetailsInfo.getJobType()).isEqualTo((Object)JobType.BATCH);
        CommonTestUtils.waitUtil(() -> {
            try {
                return JobStatus.FINISHED.equals(restClusterClient.getJobStatus(JobID.fromHexString((String)jobId)).get(5L, TimeUnit.SECONDS));
            }
            catch (Exception exception) {
                return false;
            }
        }, (Duration)Duration.ofMillis(timeout), (Duration)Duration.ofMillis(pause), (String)"Failed to verify whether the job is finished.");
    }
}

