/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.service;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.StaticResultProvider;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.ResultSetImpl;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.result.NotReadyResult;
import org.apache.flink.table.gateway.service.session.SessionManagerImpl;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.TableFunc0;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.Condition;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

public class SqlGatewayServiceITCase {
    /**
     * Flink mini cluster extension built with two task managers. Its client configuration is
     * handed to the gateway extension below and copied by the job-submitting tests.
     * Declared with {@code @Order(1)}, the smallest order value among the three extensions.
     */
    @RegisterExtension
    @Order(value=1)
    static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(2).build());
    /**
     * SQL gateway service extension, constructed with a supplier that returns the client
     * configuration of {@link #MINI_CLUSTER}. Declared with {@code @Order(2)}.
     */
    @RegisterExtension
    @Order(value=2)
    static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = new SqlGatewayServiceExtension(() -> ((MiniClusterExtension)MINI_CLUSTER).getClientConfiguration());
    /**
     * Executor extension backed by a cached thread pool. The pool's thread factory is created
     * with the name "SqlGatewayService Test Pool" and {@link IgnoreExceptionHandler#INSTANCE}
     * as the uncaught-exception handler. Declared with {@code @Order(3)}.
     */
    @RegisterExtension
    @Order(value=3)
    static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension(() -> Executors.newCachedThreadPool((ThreadFactory)new ExecutorThreadFactory("SqlGatewayService Test Pool", (Thread.UncaughtExceptionHandler)IgnoreExceptionHandler.INSTANCE)));
    /** Session manager taken from {@link #SQL_GATEWAY_SERVICE_EXTENSION}; assigned in {@code setUp()}. */
    private static SessionManagerImpl sessionManager;
    /** Gateway service under test, taken from {@link #SQL_GATEWAY_SERVICE_EXTENSION}; assigned in {@code setUp()}. */
    private static SqlGatewayServiceImpl service;
    /** Session environment that only sets the endpoint version ({@code MockedEndpointVersion.V1}). */
    private final SessionEnvironment defaultSessionEnvironment = SessionEnvironment.newBuilder().setSessionEndpointVersion((EndpointVersion)MockedEndpointVersion.V1).build();

    /**
     * Caches the session manager and the gateway service exposed by the gateway extension in
     * the static fields used by every test.
     */
    @BeforeAll
    static void setUp() {
        final SqlGatewayServiceExtension gatewayExtension = SQL_GATEWAY_SERVICE_EXTENSION;
        sessionManager = (SessionManagerImpl) gatewayExtension.getSessionManager();
        service = (SqlGatewayServiceImpl) gatewayExtension.getService();
    }

    /**
     * Opens a session with two custom config entries and checks that the session config
     * reported by the service contains both of them.
     */
    @Test
    void testOpenSessionWithConfig() {
        HashMap<String, String> sessionOptions = new HashMap<>();
        sessionOptions.put("key1", "val1");
        sessionOptions.put("key2", "val2");

        SessionEnvironment sessionEnvironment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .addSessionConfig(sessionOptions)
                        .build();
        SessionHandle handle = service.openSession(sessionEnvironment);

        Assertions.assertThat((Map) service.getSessionConfig(handle))
                .containsAllEntriesOf(sessionOptions);
    }

    /**
     * Opens a session whose environment registers an in-memory catalog as the default catalog
     * and a test module at the head of the module list, then checks that a table environment
     * created from that session reports the catalog, its default database and the module.
     */
    @Test
    void testOpenSessionWithEnvironment() {
        String catalog = "default";
        String database = "testDb";
        String module = "testModule";
        GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog(catalog, database);

        SessionEnvironment sessionEnvironment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog(catalog, inMemoryCatalog)
                        .registerModuleAtHead(module, new TestModule())
                        .setDefaultCatalog(catalog)
                        .build();
        SessionHandle handle = service.openSession(sessionEnvironment);

        TableEnvironmentInternal tableEnvironment =
                service.getSession(handle)
                        .createExecutor(new Configuration())
                        .getTableEnvironment();

        Assertions.assertThat(tableEnvironment.getCurrentCatalog()).isEqualTo(catalog);
        Assertions.assertThat(tableEnvironment.getCurrentDatabase()).isEqualTo(database);
        Assertions.assertThat((Object[]) tableEnvironment.listModules()).contains(module);
    }

    /**
     * Runs a sequence of statements through {@code configureSession} (SET/RESET, catalog and
     * table DDL, module load/unload, ADD JAR) and verifies the visible effect of each step.
     *
     * @param tmpDir temporary directory in which the UDF jar for the ADD JAR step is created
     */
    @Test
    void testConfigureSessionWithLegalStatement(@TempDir Path tmpDir) throws Exception {
        SessionHandle handle = service.openSession(defaultSessionEnvironment);

        // SET followed by RESET of the same key.
        service.configureSession(handle, "SET 'key1' = 'value1';", 0L);
        Assertions.assertThat((Map) service.getSessionConfig(handle))
                .containsAllEntriesOf(Collections.singletonMap("key1", "value1"));
        service.configureSession(handle, "RESET 'key1';", 0L);
        Assertions.assertThat((Map) service.getSessionConfig(handle))
                .doesNotContainEntry("key1", "value1");

        // Catalog creation and switch.
        service.configureSession(
                handle,
                "CREATE CATALOG mycat with ('type' = 'generic_in_memory', 'default-database' = 'db');",
                0L);
        service.configureSession(handle, "USE CATALOG mycat;", 0L);
        Assertions.assertThat(service.getCurrentCatalog(handle)).isEqualTo("mycat");

        // Table creation and rename.
        TableInfo originalTable =
                new TableInfo(
                        ObjectIdentifier.of("mycat", "db", "tbl"),
                        CatalogBaseTable.TableKind.TABLE);
        TableInfo renamedTable =
                new TableInfo(
                        ObjectIdentifier.of("mycat", "db", "tbl1"),
                        CatalogBaseTable.TableKind.TABLE);
        HashSet<CatalogBaseTable.TableKind> onlyTables = new HashSet<>();
        onlyTables.add(CatalogBaseTable.TableKind.TABLE);

        service.configureSession(
                handle, "CREATE TABLE db.tbl (score INT) WITH ('connector' = 'datagen');", 0L);
        Assertions.assertThat((Collection) service.listTables(handle, "mycat", "db", onlyTables))
                .contains(originalTable);

        service.configureSession(handle, "ALTER TABLE db.tbl RENAME TO tbl1;", 0L);
        Collection<TableInfo> tablesAfterRename =
                service.listTables(handle, "mycat", "db", onlyTables);
        Assertions.assertThat(tablesAfterRename)
                .doesNotContain(originalTable)
                .contains(renamedTable);

        // Switch back and drop the catalog.
        service.configureSession(handle, "USE CATALOG default_catalog;", 0L);
        service.configureSession(handle, "DROP CATALOG mycat;", 0L);
        Assertions.assertThat((Collection) service.listCatalogs(handle)).doesNotContain("mycat");

        // Module unload / load round trip.
        validateStatementResult(
                handle,
                "SHOW FULL MODULES",
                Collections.singletonList(GenericRowData.of(StringData.fromString("core"), true)));
        service.configureSession(handle, "UNLOAD MODULE core;", 0L);
        validateStatementResult(handle, "SHOW FULL MODULES", Collections.emptyList());
        service.configureSession(handle, "LOAD MODULE core;", 0L);
        validateStatementResult(
                handle,
                "SHOW FULL MODULES",
                Collections.singletonList(GenericRowData.of(StringData.fromString("core"), true)));

        // ADD JAR with a freshly generated UDF class; the class name gets a random suffix.
        String udfClassName = "LowerUDF" + new Random().nextInt(50);
        String udfSource =
                String.format(
                        "public class %s extends org.apache.flink.table.functions.ScalarFunction {\n"
                                + "  public String eval(String str) {\n"
                                + "    return str.toLowerCase();\n"
                                + "  }\n"
                                + "}\n",
                        udfClassName);
        File jarDirectory = new File(tmpDir.toUri());
        String jarPath =
                UserClassLoaderJarTestUtils.createJarFile(
                                jarDirectory, "test-add-jar.jar", udfClassName, udfSource)
                        .toURI()
                        .getPath();
        service.configureSession(handle, String.format("ADD JAR '%s';", jarPath), 0L);
        validateStatementResult(
                handle,
                "SHOW JARS",
                Collections.singletonList(GenericRowData.of(StringData.fromString(jarPath))));
    }

    /**
     * Verifies that fetching results from an operation that is still running yields
     * {@link NotReadyResult#INSTANCE}.
     *
     * <p>The submitted operation signals that it has started and then blocks on a latch. The
     * latch is released in a {@code finally} block so the worker thread is never left blocked
     * when the assertion (or the wait for the start signal) fails.
     */
    @Test
    void testFetchResultsInRunning() throws Exception {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        CountDownLatch startRunningLatch = new CountDownLatch(1);
        CountDownLatch endRunningLatch = new CountDownLatch(1);
        OperationHandle operationHandle =
                submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunningLatch.countDown();
                            endRunningLatch.await();
                        });
        try {
            startRunningLatch.await();
            Assertions.assertThat(
                            (Object)
                                    SqlGatewayServiceTestUtil.fetchResults(
                                            service, sessionHandle, operationHandle))
                    .isEqualTo(NotReadyResult.INSTANCE);
        } finally {
            // Always unblock the operation, even on failure.
            endRunningLatch.countDown();
        }
    }

    /**
     * Checks the life cycle of a successful operation: it reports {@code RUNNING} while blocked,
     * returns the default result set's data once it terminates, and is no longer counted by the
     * session manager after being closed.
     *
     * <p>The latch that blocks the operation is released in a {@code finally} block so the worker
     * thread is not left blocked when the {@code RUNNING} assertion fails.
     */
    @Test
    void testGetOperationFinishedAndFetchResults() throws Exception {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        CountDownLatch startRunningLatch = new CountDownLatch(1);
        CountDownLatch endRunningLatch = new CountDownLatch(1);
        OperationHandle operationHandle =
                submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunningLatch.countDown();
                            endRunningLatch.await();
                        });
        try {
            startRunningLatch.await();
            Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle))
                    .isEqualTo(new OperationInfo(OperationStatus.RUNNING));
        } finally {
            // Always let the operation finish, even on failure.
            endRunningLatch.countDown();
        }

        SqlGatewayServiceTestUtil.awaitOperationTermination(
                service, sessionHandle, operationHandle);

        List<?> expectedData = getDefaultResultSet().getData();
        List<RowData> actualData =
                SqlGatewayServiceTestUtil.fetchAllResults(service, sessionHandle, operationHandle);
        Assertions.assertThat(actualData).isEqualTo(expectedData);

        service.closeOperation(sessionHandle, operationHandle);
        Assertions.assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
    }

    /**
     * Cancels a running operation and checks that it reports {@code CANCELED}, and that closing
     * it afterwards leaves no operation counted for the session.
     *
     * <p>The operation blocks on a latch. That latch is released in a {@code finally} block so
     * the worker thread cannot stay blocked if the test fails before the operation is canceled.
     */
    @Test
    void testCancelOperation() throws Exception {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        CountDownLatch startRunningLatch = new CountDownLatch(1);
        CountDownLatch endRunningLatch = new CountDownLatch(1);
        OperationHandle operationHandle =
                submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunningLatch.countDown();
                            endRunningLatch.await();
                        });
        try {
            startRunningLatch.await();
            Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle))
                    .isEqualTo(new OperationInfo(OperationStatus.RUNNING));

            service.cancelOperation(sessionHandle, operationHandle);
            Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle))
                    .isEqualTo(new OperationInfo(OperationStatus.CANCELED));

            service.closeOperation(sessionHandle, operationHandle);
            Assertions.assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
        } finally {
            // Safety net: unblock the worker if the test failed before the cancel took effect.
            endRunningLatch.countDown();
        }
    }

    /**
     * Submits an operation that throws a {@link SqlExecutionException}, waits (up to ten
     * seconds) for the operation to report {@code ERROR}, and checks that fetching its results
     * fails with that exception somewhere in the cause chain. Closing the operation must leave
     * no operation counted for the session.
     */
    @Test
    void testOperationGetErrorAndFetchError() throws Exception {
        SessionHandle handle = service.openSession(defaultSessionEnvironment);
        CountDownLatch started = new CountDownLatch(1);
        String errorMessage = "Artificial Exception.";

        OperationHandle failingOperation =
                submitDefaultOperation(
                        handle,
                        () -> {
                            started.countDown();
                            throw new SqlExecutionException(errorMessage);
                        });
        started.await();

        CommonTestUtils.waitUtil(
                () ->
                        service.getOperationInfo(handle, failingOperation)
                                .getStatus()
                                .equals(OperationStatus.ERROR),
                Duration.ofSeconds(10L),
                "Failed to get expected operation status.");

        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                SqlGatewayServiceTestUtil.fetchResults(
                                        service, handle, failingOperation))
                .satisfies(FlinkAssertions.anyCauseMatches(SqlExecutionException.class, errorMessage));

        service.closeOperation(handle, failingOperation);
        Assertions.assertThat(sessionManager.getOperationCount(handle)).isEqualTo(0);
    }

    /**
     * Executes {@code SET} with a per-statement configuration holding a single entry and checks
     * that the returned settings contain that key/value pair.
     */
    @Test
    void testExecuteSqlWithConfig() {
        SessionHandle handle = service.openSession(defaultSessionEnvironment);
        String optionKey = "username";
        String optionValue = "Flink";
        Configuration statementConfig =
                Configuration.fromMap(Collections.singletonMap(optionKey, optionValue));

        OperationHandle setOperation = service.executeStatement(handle, "SET", -1L, statementConfig);
        List<RowData> settings =
                SqlGatewayServiceTestUtil.fetchAllResults(service, handle, setOperation);

        Assertions.assertThat(settings)
                .contains(
                        GenericRowData.of(
                                StringData.fromString(optionKey),
                                StringData.fromString(optionValue)));
    }

    /**
     * Submits a datagen-to-blackhole insert job, waits until all of its tasks are running, and
     * stops it with a {@code STOP JOB} statement built from {@code option}.
     *
     * @param option clause appended after {@code STOP JOB '<jobId>'}; empty for a plain stop
     * @param hasSavepoint whether the stop result is expected to be a savepoint path (file name
     *     starting with {@code savepoint-}) rather than {@code OK}
     * @param restClusterClient injected cluster client used to wait for the job's tasks
     * @param tmpDir temporary directory; its {@code savepoints} child is configured as the
     *     savepoint directory
     */
    @ParameterizedTest
    @CsvSource(value={"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true", "'',false"})
    void testStopJobStatementWithSavepoint(String option, boolean hasSavepoint, @InjectClusterClient RestClusterClient<?> restClusterClient, @TempDir File tmpDir) throws Exception {
        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
        // Values are passed with their real types: Configuration#set infers its type parameter
        // from the option, which a value cast to Object cannot satisfy.
        configuration.set(TableConfigOptions.TABLE_DML_SYNC, false);
        File savepointDir = new File(tmpDir, "savepoints");
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());

        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        service.executeStatement(
                sessionHandle,
                "CREATE TABLE source (a STRING) WITH ('connector'='datagen');",
                -1L,
                configuration);
        service.executeStatement(
                sessionHandle,
                "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');",
                -1L,
                configuration);

        OperationHandle insertOperationHandle =
                service.executeStatement(
                        sessionHandle, "INSERT INTO sink SELECT * FROM source;", -1L, configuration);
        List<RowData> results =
                SqlGatewayServiceTestUtil.fetchAllResults(
                        service, sessionHandle, insertOperationHandle);
        Assertions.assertThat(results).hasSize(1);
        // The single result row of the insert carries the job id in its first column.
        String jobId = results.get(0).getString(0).toString();
        TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));

        String stopSql = String.format("STOP JOB '%s' %s;", jobId, option);
        OperationHandle stopOperationHandle =
                service.executeStatement(sessionHandle, stopSql, -1L, configuration);
        List<RowData> stopResults =
                SqlGatewayServiceTestUtil.fetchAllResults(
                        service, sessionHandle, stopOperationHandle);
        Assertions.assertThat(stopResults).hasSize(1);

        String stopResult = stopResults.get(0).getString(0).toString();
        if (hasSavepoint) {
            Path savepointPath = Paths.get(stopResult);
            Assertions.assertThat(savepointPath.getFileName().toString()).startsWith("savepoint-");
        } else {
            Assertions.assertThat(stopResult).isEqualTo("OK");
        }
    }

    /**
     * Runs the shared "get operation schema" scenario with the default result set and checks
     * that the schema task completes with the default result set's schema.
     *
     * <p>The validator is declared with its full generic type so the lambda parameter is a
     * {@code FutureTask<ResolvedSchema>}; going through the raw interface type would erase it to
     * {@code Object}.
     */
    @Test
    void testGetOperationSchemaUntilOperationIsReady() throws Exception {
        org.apache.flink.util.function.ThrowingConsumer<FutureTask<ResolvedSchema>, Exception> schemaValidator =
                task ->
                        Assertions.assertThat(task.get())
                                .isEqualTo(getDefaultResultSet().getResultSchema());
        runGetOperationSchemaUntilOperationIsReadyOrError(this::getDefaultResultSet, schemaValidator);
    }

    /**
     * Submits a named insert job and verifies that {@code SHOW JOBS} lists it with the configured
     * pipeline name, status {@code RUNNING}, and a timestamp in the fourth column that lies
     * between the submission and the moment all tasks were observed running.
     *
     * @param restClusterClient injected cluster client used to wait for the job's tasks
     */
    @Test
    void testShowJobsOperation(@InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
        String pipelineName = "test-job";
        // Pass the String directly: Configuration#set infers its type parameter from the option,
        // which a value cast to Object cannot satisfy.
        configuration.set(PipelineOptions.NAME, pipelineName);

        service.executeStatement(
                sessionHandle,
                "CREATE TABLE source (a STRING) WITH ('connector'='datagen');",
                -1L,
                configuration);
        service.executeStatement(
                sessionHandle,
                "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');",
                -1L,
                configuration);

        long timeOpStart = System.currentTimeMillis();
        OperationHandle insertsOperationHandle =
                service.executeStatement(
                        sessionHandle, "INSERT INTO sink SELECT * FROM source;", -1L, configuration);
        String jobId =
                SqlGatewayServiceTestUtil.fetchAllResults(
                                service, sessionHandle, insertsOperationHandle)
                        .get(0)
                        .getString(0)
                        .toString();
        TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
        long timeOpSucceed = System.currentTimeMillis();

        OperationHandle showJobsOperationHandle =
                service.executeStatement(sessionHandle, "SHOW JOBS", -1L, configuration);
        List<RowData> result =
                SqlGatewayServiceTestUtil.fetchAllResults(
                        service, sessionHandle, showJobsOperationHandle);
        // Other jobs may be listed as well; pick the row of the job submitted here.
        RowData jobRow =
                result.stream()
                        .filter(row -> jobId.equals(row.getString(0).toString()))
                        .findFirst()
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Test job " + jobId + " not found."));

        Assertions.assertThat(jobRow.getString(1)).hasToString(pipelineName);
        Assertions.assertThat(jobRow.getString(2)).hasToString("RUNNING");
        Assertions.assertThat(jobRow.getTimestamp(3, 3).getMillisecond())
                .isBetween(timeOpStart, timeOpSucceed);
    }

    /**
     * Submits a named insert job and verifies that {@code DESCRIBE JOB '<jobId>'} returns a row
     * for it with the configured pipeline name, status {@code RUNNING}, and a timestamp in the
     * fourth column that lies between the submission and the moment all tasks were observed
     * running.
     *
     * @param restClusterClient injected cluster client used to wait for the job's tasks
     */
    @Test
    void testDescribeJobOperation(@InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration());
        String pipelineName = "test-describe-job";
        // Pass the String directly: Configuration#set infers its type parameter from the option,
        // which a value cast to Object cannot satisfy.
        configuration.set(PipelineOptions.NAME, pipelineName);

        service.executeStatement(
                sessionHandle,
                "CREATE TABLE source (a STRING) WITH ('connector'='datagen');",
                -1L,
                configuration);
        service.executeStatement(
                sessionHandle,
                "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');",
                -1L,
                configuration);

        long timeOpStart = System.currentTimeMillis();
        OperationHandle insertsOperationHandle =
                service.executeStatement(
                        sessionHandle, "INSERT INTO sink SELECT * FROM source;", -1L, configuration);
        String jobId =
                SqlGatewayServiceTestUtil.fetchAllResults(
                                service, sessionHandle, insertsOperationHandle)
                        .get(0)
                        .getString(0)
                        .toString();
        TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId));
        long timeOpSucceed = System.currentTimeMillis();

        OperationHandle describeJobOperationHandle =
                service.executeStatement(
                        sessionHandle,
                        String.format("DESCRIBE JOB '%s'", jobId),
                        -1L,
                        configuration);
        List<RowData> result =
                SqlGatewayServiceTestUtil.fetchAllResults(
                        service, sessionHandle, describeJobOperationHandle);
        RowData jobRow =
                result.stream()
                        .filter(row -> jobId.equals(row.getString(0).toString()))
                        .findFirst()
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Test job " + jobId + " not found."));

        Assertions.assertThat(jobRow.getString(1)).hasToString(pipelineName);
        Assertions.assertThat(jobRow.getString(2)).hasToString("RUNNING");
        Assertions.assertThat(jobRow.getTimestamp(3, 3).getMillisecond())
                .isBetween(timeOpStart, timeOpSucceed);
    }

    /**
     * Registers two in-memory catalogs, marks {@code cat2} as the default, and checks that the
     * service reports {@code cat2} as the current catalog of the new session.
     */
    @Test
    void testGetCurrentCatalog() {
        SessionEnvironment twoCatalogs =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .setDefaultCatalog("cat2")
                        .build();
        SessionHandle handle = service.openSession(twoCatalogs);

        Assertions.assertThat(service.getCurrentCatalog(handle)).isEqualTo("cat2");
    }

    /**
     * Registers two in-memory catalogs and checks that both names appear in the catalog list of
     * the new session.
     */
    @Test
    void testListCatalogs() {
        SessionEnvironment twoCatalogs =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle handle = service.openSession(twoCatalogs);

        Assertions.assertThat((Collection) service.listCatalogs(handle)).contains("cat1", "cat2");
    }

    /**
     * Creates two databases in the session's default catalog through SQL, waits for the second
     * CREATE DATABASE operation to terminate, and checks that both databases are listed.
     */
    @Test
    void testListDatabases() throws Exception {
        SessionEnvironment singleCatalog =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat", new GenericInMemoryCatalog("cat"))
                        .setDefaultCatalog("cat")
                        .build();
        SessionHandle handle = service.openSession(singleCatalog);
        Configuration sessionConfig = Configuration.fromMap(service.getSessionConfig(handle));

        service.executeStatement(handle, "CREATE DATABASE db1", -1L, sessionConfig);
        OperationHandle createSecondDb =
                service.executeStatement(handle, "CREATE DATABASE db2", -1L, sessionConfig);
        SqlGatewayServiceTestUtil.awaitOperationTermination(service, handle, createSecondDb);

        Assertions.assertThat((Collection) service.listDatabases(handle, "cat"))
                .contains("db1", "db2");
    }

    /**
     * Lists tables of an initialized session for three catalog/database pairs and checks the
     * returned entries: tables and views of {@code cat1.db1}, tables only of {@code cat1.db2},
     * and views only of {@code cat2.db0} (expected to be empty).
     */
    @Test
    void testListTables() {
        SessionHandle handle = SqlGatewayServiceTestUtil.createInitializedSession(service);
        CatalogBaseTable.TableKind table = CatalogBaseTable.TableKind.TABLE;
        CatalogBaseTable.TableKind view = CatalogBaseTable.TableKind.VIEW;

        Assertions.assertThat(
                        (Collection)
                                service.listTables(
                                        handle,
                                        "cat1",
                                        "db1",
                                        new HashSet<CatalogBaseTable.TableKind>(
                                                Arrays.asList(table, view))))
                .containsExactlyInAnyOrder(
                        new TableInfo(ObjectIdentifier.of("cat1", "db1", "tbl1"), table),
                        new TableInfo(ObjectIdentifier.of("cat1", "db1", "tbl2"), table),
                        new TableInfo(ObjectIdentifier.of("cat1", "db1", "tbl3"), view),
                        new TableInfo(ObjectIdentifier.of("cat1", "db1", "tbl4"), view));

        Assertions.assertThat(
                        (Collection)
                                service.listTables(
                                        handle, "cat1", "db2", Collections.singleton(table)))
                .containsExactly(new TableInfo(ObjectIdentifier.of("cat1", "db2", "tbl1"), table));

        Assertions.assertThat(
                        (Collection)
                                service.listTables(
                                        handle, "cat2", "db0", Collections.singleton(view)))
                .isEmpty();
    }

    /**
     * Checks that the system function list of a new session contains {@code sin} (scalar),
     * {@code sum} (aggregate) and {@code as} (other).
     */
    @Test
    void testListSystemFunctions() {
        SessionEnvironment twoCatalogs =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle handle = service.openSession(twoCatalogs);

        Assertions.assertThat((Collection) service.listSystemFunctions(handle))
                .contains(
                        new FunctionInfo(FunctionIdentifier.of("sin"), FunctionKind.SCALAR),
                        new FunctionInfo(FunctionIdentifier.of("sum"), FunctionKind.AGGREGATE),
                        new FunctionInfo(FunctionIdentifier.of("as"), FunctionKind.OTHER));
    }

    /**
     * Registers a temporary system function, a catalog function, a temporary function and a
     * function under {@code cat1.default}, then checks that listing user-defined functions of
     * {@code default_catalog.default_database} contains the first three.
     */
    @Test
    void testListUserDefinedFunctions() {
        SessionEnvironment twoCatalogs =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle handle = service.openSession(twoCatalogs);

        TableEnvironmentInternal tableEnvironment =
                service.getSession(handle).createExecutor().getTableEnvironment();
        tableEnvironment.createTemporarySystemFunction(
                "count_distinct", JavaUserDefinedAggFunctions.CountDistinct.class);
        tableEnvironment.createFunction("java1", JavaUserDefinedScalarFunctions.JavaFunc1.class);
        tableEnvironment.createTemporaryFunction("table_func0", TableFunc0.class);
        tableEnvironment.createFunction(
                "cat1.default.filter_out_function", JavaUserDefinedScalarFunctions.JavaFunc1.class);

        FunctionInfo systemFunction = new FunctionInfo(FunctionIdentifier.of("count_distinct"));
        FunctionInfo catalogFunction =
                new FunctionInfo(
                        FunctionIdentifier.of(
                                ObjectIdentifier.of(
                                        "default_catalog", "default_database", "java1")));
        FunctionInfo temporaryFunction =
                new FunctionInfo(
                        FunctionIdentifier.of(
                                ObjectIdentifier.of(
                                        "default_catalog", "default_database", "table_func0")));

        Assertions.assertThat(
                        (Collection)
                                service.listUserDefinedFunctions(
                                        handle, "default_catalog", "default_database"))
                .contains(systemFunction, catalogFunction, temporaryFunction);
    }

    /**
     * Creates two tables through the session's table environment and validates the completion
     * hints returned for a partial table name, a partial keyword and a partial column name.
     */
    @Test
    void testCompleteStatement() {
        SessionHandle handle = service.openSession(defaultSessionEnvironment);

        String firstTableDdl =
                "CREATE TABLE Table1 (\n"
                        + "  IntegerField1 INT,\n"
                        + "  StringField1 STRING,\n"
                        + "  TimestampField1 TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'datagen'\n"
                        + ")\n";
        String secondTableDdl =
                "CREATE TABLE Table2 (\n"
                        + "  BooleanField BOOLEAN,\n"
                        + "  StringField2 STRING,\n"
                        + "  TimestampField2 TIMESTAMP\n"
                        + ") WITH (\n"
                        + "  'connector' = 'blackhole'\n"
                        + ")\n";
        // A separate executor is created for each DDL statement, as before.
        service.getSession(handle).createExecutor().getTableEnvironment().executeSql(firstTableDdl);
        service.getSession(handle).createExecutor().getTableEnvironment().executeSql(secondTableDdl);

        validateCompletionHints(
                handle,
                "SELECT * FROM Ta",
                Arrays.asList(
                        "default_catalog.default_database.Table1",
                        "default_catalog.default_database.Table2"));
        validateCompletionHints(
                handle, "SELECT * FROM Table1 WH", Collections.singletonList("WHERE"));
        validateCompletionHints(
                handle,
                "SELECT * FROM Table1 WHERE Inte",
                Collections.singletonList("IntegerField1"));
    }

    /**
     * Fetches a table and a view from an initialized session and checks the table's (empty)
     * resolved schema and options as well as the view's original query.
     */
    @Test
    void testGetTable() {
        SessionHandle handle = SqlGatewayServiceTestUtil.createInitializedSession(service);

        ObjectIdentifier tableId = ObjectIdentifier.of("cat1", "db1", "tbl1");
        ResolvedCatalogTable table = (ResolvedCatalogTable) service.getTable(handle, tableId);
        Assertions.assertThat((Object) table.getResolvedSchema()).isEqualTo(ResolvedSchema.of());
        Assertions.assertThat((Map) table.getOptions())
                .isEqualTo(Collections.singletonMap("connector", "values"));

        ObjectIdentifier viewId = ObjectIdentifier.of("cat1", "db1", "tbl3");
        ResolvedCatalogView view = (ResolvedCatalogView) service.getTable(handle, viewId);
        Assertions.assertThat(view.getOriginalQuery()).isEqualTo("SELECT 1");
    }

    /**
     * Cancels a blocked operation while results are being fetched and expects the fetch error to
     * mention that results cannot be fetched from an operation in {@code CANCELED} status.
     *
     * <p>The condition is created as {@code Condition<String>} so the predicate's parameter is a
     * {@code String}; with the raw type it would be erased to {@code Object}. The latch blocking
     * the operation is released in a {@code finally} block so the worker thread is not left
     * blocked when the scenario fails.
     */
    @Test
    void testCancelOperationAndFetchResultInParallel() {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        CountDownLatch latch = new CountDownLatch(1);
        OperationHandle operationHandle = submitDefaultOperation(sessionHandle, latch::await);

        String canceledMessage =
                String.format(
                        "Can not fetch results from the %s in %s status.",
                        operationHandle, OperationStatus.CANCELED);
        Condition<String> expectedError =
                new Condition<String>(
                        msg -> msg.contains(canceledMessage),
                        "Fetch results with expected error message.");
        try {
            runCancelOrCloseOperationWhenFetchResults(
                    sessionHandle,
                    operationHandle,
                    () -> service.cancelOperation(sessionHandle, operationHandle),
                    expectedError);
        } finally {
            // Always unblock the operation, even on failure.
            latch.countDown();
        }
    }

    /**
     * Closes a short-lived operation concurrently with a result-fetch loop (see
     * {@code runCancelOrCloseOperationWhenFetchResults}).
     *
     * <p>Two error messages are accepted: the operation can no longer be found in the
     * {@code OperationManager}, or it is reported as being in {@code CLOSED} status.
     */
    @Test
    void testCloseOperationAndFetchResultInParallel() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        OperationHandle operationHandle = this.submitDefaultOperation(sessionHandle, () -> Thread.sleep(1L));

        RunnableWithException close = () -> service.closeOperation(sessionHandle, operationHandle);
        Condition<String> closedOrMissing = new Condition<String>(msg -> {
            if (msg.contains(String.format("Can not find the submitted operation in the OperationManager with the %s.", operationHandle))) {
                return true;
            }
            return msg.contains(String.format("Can not fetch results from the %s in %s status.", operationHandle, OperationStatus.CLOSED));
        }, "Fetch results with expected error message.");

        this.runCancelOrCloseOperationWhenFetchResults(sessionHandle, operationHandle, close, closedOrMissing);
    }

    /**
     * Submits 200 operations and, for each one, schedules a cancel and a close on the shared test
     * executor.
     *
     * <p>Every operation sleeps 100 ms; those with an even index then throw an artificial
     * {@code SqlGatewayException}. The test waits up to 10 seconds for the session's operation
     * count to reach zero and then checks that every tracked operation reports {@code CLOSED}.
     */
    @Test
    void testCancelAndCloseOperationInParallel() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        int operationNum = 200;
        List<OperationManager.Operation> tracked = new ArrayList<>(operationNum);

        for (int index = 0; index != operationNum; index++) {
            boolean failsAfterSleep = index % 2 == 0;
            RunnableWithException body = () -> {
                Thread.sleep(100L);
                if (!failsAfterSleep) {
                    return;
                }
                throw new SqlGatewayException("Artificial Exception.");
            };
            OperationHandle operationHandle = this.submitDefaultOperation(sessionHandle, body);

            // Grab the Operation object before cancel/close are scheduled, while it is still registered.
            OperationManager operationManager = service.getSession(sessionHandle).getOperationManager();
            tracked.add(operationManager.getOperation(operationHandle));

            ExecutorService executor = EXECUTOR_EXTENSION.getExecutor();
            executor.submit(() -> service.cancelOperation(sessionHandle, operationHandle));
            executor.submit(() -> service.closeOperation(sessionHandle, operationHandle));
        }

        CommonTestUtils.waitUtil(
                () -> service.getSession(sessionHandle).getOperationManager().getOperationCount() == 0,
                Duration.ofSeconds(10L),
                "All operations should be closed.");

        for (int k = 0; k < tracked.size(); k++) {
            OperationManager.Operation op = tracked.get(k);
            Assertions.assertThat((Comparable)op.getOperationInfo().getStatus()).isEqualTo((Object)OperationStatus.CLOSED);
        }
    }

    /**
     * Schedules 100 no-op submissions on the shared test executor and closes the session's
     * {@code OperationManager} right away.
     *
     * <p>Each task counts the latch down in a {@code finally} block, so the test proceeds whether
     * a submission succeeded or threw. Once all tasks have finished, the manager must hold no
     * operations.
     */
    @Test
    void testSubmitOperationAndCloseOperationManagerInParallel1() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        OperationManager manager = service.getSession(sessionHandle).getOperationManager();
        int submitThreadsNum = 100;
        CountDownLatch submissionsFinished = new CountDownLatch(submitThreadsNum);

        Runnable submitAndSignal = () -> {
            try {
                this.submitDefaultOperation(sessionHandle, () -> {});
            } finally {
                submissionsFinished.countDown();
            }
        };
        for (int remaining = submitThreadsNum; remaining > 0; --remaining) {
            EXECUTOR_EXTENSION.getExecutor().submit(submitAndSignal);
        }

        manager.close();
        submissionsFinished.await();
        Assertions.assertThat((int)manager.getOperationCount()).isEqualTo(0);
    }

    /**
     * Closes the session's {@code OperationManager} while submitted operations are still running.
     *
     * <p>Three submissions are scheduled on the shared test executor. Each operation signals
     * {@code startRunning} and then blocks on {@code terminateRunning}. As soon as at least one
     * operation has started, the manager is closed. The test has no explicit assertion; it passes
     * when {@code close()} returns without throwing.
     */
    @Test
    void testSubmitOperationAndCloseOperationManagerInParallel2() throws Exception {
        int count = 3;
        CountDownLatch startRunning = new CountDownLatch(1);
        CountDownLatch terminateRunning = new CountDownLatch(1);
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        for (int i = 0; i < count; ++i) {
            EXECUTOR_EXTENSION.getExecutor().submit(() -> service.submitOperation(sessionHandle, () -> {
                startRunning.countDown();
                terminateRunning.await();
                return this.getDefaultResultSet();
            }));
        }
        startRunning.await();
        try {
            service.getSession(sessionHandle).getOperationManager().close();
        } finally {
            // Always release the blocked operation bodies, even if close() throws, so they are
            // not left waiting on the latch after this test has ended.
            terminateRunning.countDown();
        }
    }

    /**
     * Submits 100 operations to one session; each increments a shared counter with a separate
     * read and write.
     *
     * <p>The increment is deliberately NOT atomic: the final assertion that the counter equals
     * the number of submissions can only hold if no two increments interleaved. Presumably this
     * relies on the service running the operations of a single session one after another;
     * confirm against the service implementation.
     */
    @Test
    void testExecuteOperationInSequence() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        AtomicReference<Integer> counter = new AtomicReference<>(0);
        int threadNum = 100;
        List<OperationHandle> submitted = new ArrayList<>();

        for (int n = 0; n < threadNum; n++) {
            OperationHandle handle = service.submitOperation(sessionHandle, () -> {
                // Keep this as an unsynchronized get-then-set; it is what the test exercises.
                int before = counter.get();
                counter.set(before + 1);
                return this.getDefaultResultSet();
            });
            submitted.add(handle);
        }

        for (int k = 0; k < submitted.size(); k++) {
            SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, sessionHandle, submitted.get(k));
        }
        Assertions.assertThat((Integer)counter.get()).isEqualTo(threadNum);
    }

    /**
     * Verifies that a session can submit successfully again after one of its submissions was
     * rejected.
     *
     * <p>Steps:
     * <ol>
     *   <li>Open 500 sessions and submit one operation per session that blocks on {@code latch}
     *       (500 presumably matches the service's worker-thread limit; confirm against the test
     *       configuration).</li>
     *   <li>Open one more session and expect its submission to fail with a
     *       {@code RejectedExecutionException} somewhere in the cause chain.</li>
     *   <li>Release the latch, wait for the first blocked operation to terminate, then submit
     *       again on the previously rejected session and wait up to 10 seconds for it to run.</li>
     * </ol>
     */
    @Test
    void testReleaseLockWhenFailedToSubmitOperation() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        int maximumThreads = 500;
        List<SessionHandle> sessions = new ArrayList<>();
        List<OperationHandle> operations = new ArrayList<>();
        final SessionHandle sessionHandle;
        try {
            for (int i = 0; i < maximumThreads; ++i) {
                SessionHandle blockedSession = service.openSession(this.defaultSessionEnvironment);
                sessions.add(blockedSession);
                operations.add(service.submitOperation(blockedSession, () -> {
                    latch.await();
                    return this.getDefaultResultSet();
                }));
            }
            sessionHandle = service.openSession(this.defaultSessionEnvironment);
            AssertionsForClassTypes.assertThatThrownBy(() -> service.submitOperation(sessionHandle, () -> {
                latch.await();
                return this.getDefaultResultSet();
            })).satisfies(FlinkAssertions.anyCauseMatches(RejectedExecutionException.class));
        } finally {
            // Release the blocked operations even when setup or the rejection assertion fails;
            // otherwise they would stay parked on the latch after this test has ended.
            latch.countDown();
        }
        SqlGatewayServiceTestUtil.awaitOperationTermination((SqlGatewayService)service, sessions.get(0), operations.get(0));
        CountDownLatch success = new CountDownLatch(1);
        service.submitOperation(sessionHandle, () -> {
            success.countDown();
            return this.getDefaultResultSet();
        });
        CommonTestUtils.waitUtil(() -> success.getCount() == 0L, Duration.ofSeconds(10L), "Should come to end.");
    }

    /**
     * Expects {@code configureSession} to reject a {@code SELECT} statement with an
     * {@code UnsupportedOperationException} (anywhere in the cause chain) whose message lists the
     * statement types the API supports.
     */
    @Test
    void testConfigureSessionWithIllegalStatement() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        String expectedMessage = "Unsupported statement for configuring session:SELECT 1;\nThe configureSession API only supports to execute statement of type CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE, USE MODULE, ADD JAR.";

        AssertionsForClassTypes.assertThatThrownBy(() -> service.configureSession(sessionHandle, "SELECT 1;", 0L))
                .satisfies(FlinkAssertions.anyCauseMatches(UnsupportedOperationException.class, expectedMessage));
    }

    /**
     * Cancels an operation that is blocked on a latch and then expects fetching its results to
     * fail with the "Can not fetch results ... in CANCELED status" message.
     */
    @Test
    void testFetchResultsFromCanceledOperation() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch blocker = new CountDownLatch(1);
        OperationHandle operationHandle = this.submitDefaultOperation(sessionHandle, blocker::await);
        service.cancelOperation(sessionHandle, operationHandle);

        String expectedMessage = String.format("Can not fetch results from the %s in %s status.", operationHandle, OperationStatus.CANCELED);
        AssertionsForClassTypes.assertThatThrownBy(() -> SqlGatewayServiceTestUtil.fetchResults((SqlGatewayService)service, sessionHandle, operationHandle))
                .satisfies(FlinkAssertions.anyCauseMatches(expectedMessage));

        // Let the operation body return.
        blocker.countDown();
    }

    /**
     * Uses a freshly created {@code OperationHandle} that was never submitted and expects cancel,
     * get-info and fetch-results to each fail with the "Can not find the submitted operation"
     * message.
     */
    @Test
    void testRequestNonExistOperation() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        OperationHandle operationHandle = OperationHandle.create();

        RunnableWithException cancel = () -> service.cancelOperation(sessionHandle, operationHandle);
        RunnableWithException getInfo = () -> service.getOperationInfo(sessionHandle, operationHandle);
        RunnableWithException fetch = () -> SqlGatewayServiceTestUtil.fetchResults((SqlGatewayService)service, sessionHandle, operationHandle);

        String expectedMessage = String.format("Can not find the submitted operation in the OperationManager with the %s.", operationHandle);
        for (RunnableWithException request : Arrays.asList(cancel, getInfo, fetch)) {
            AssertionsForClassTypes.assertThatThrownBy(request::run)
                    .satisfies(FlinkAssertions.anyCauseMatches(expectedMessage));
        }
    }

    /**
     * Runs an operation that throws an artificial {@code SqlGatewayException} and expects the
     * concurrent schema fetch to fail with that exception type and message in its cause chain.
     */
    @Test
    void testGetOperationSchemaWhenOperationGetError() throws Exception {
        String msg = "Artificial Exception.";

        Callable<ResultSet> failingOperation = () -> {
            throw new SqlGatewayException(msg);
        };
        org.apache.flink.util.function.ThrowingConsumer<FutureTask<ResolvedSchema>, Exception> expectFailure =
                task -> AssertionsForClassTypes.assertThatThrownBy(task::get)
                        .satisfies(FlinkAssertions.anyCauseMatches(SqlGatewayException.class, msg));

        this.runGetOperationSchemaUntilOperationIsReadyOrError(failingOperation, expectFailure);
    }

    /**
     * Submits an operation that first runs {@code executor} and then returns the default result
     * set.
     *
     * @param sessionHandle session to submit the operation to
     * @param executor action to run inside the operation before the result is produced
     * @return the handle returned by {@code service.submitOperation}
     */
    private OperationHandle submitDefaultOperation(SessionHandle sessionHandle, RunnableWithException executor) {
        Callable<ResultSet> runThenReturnDefault = () -> {
            executor.run();
            return this.getDefaultResultSet();
        };
        return service.submitOperation(sessionHandle, runThenReturnDefault);
    }

    /**
     * Builds the fixed {@code PAYLOAD} result set used by the tests in this class.
     *
     * <p>The schema is {@code (id BIGINT, name STRING, age INT)} and the data consists of four
     * rows: two inserts, one delete and one update-after. Note that the delete and update-after
     * rows carry their first value as an {@code Integer} literal, not a {@code Long}.
     *
     * @return a new {@code ResultSetImpl} on every call
     */
    private ResultSet getDefaultResultSet() {
        GenericRowData firstInsert = GenericRowData.ofKind(RowKind.INSERT, 1L, StringData.fromString("Flink CDC"), 3);
        GenericRowData secondInsert = GenericRowData.ofKind(RowKind.INSERT, 2L, StringData.fromString("MySql"), null);
        GenericRowData delete = GenericRowData.ofKind(RowKind.DELETE, 1, null, null);
        GenericRowData updateAfter = GenericRowData.ofKind(RowKind.UPDATE_AFTER, 2, null, 101);
        List<RowData> rows = Arrays.asList(firstInsert, secondInsert, delete, updateAfter);

        ResolvedSchema schema = ResolvedSchema.of(
                Column.physical("id", DataTypes.BIGINT()),
                Column.physical("name", DataTypes.STRING()),
                Column.physical("age", DataTypes.INT()));

        return new ResultSetImpl(
                ResultSet.ResultType.PAYLOAD,
                null,
                schema,
                rows,
                StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
                false,
                null,
                ResultKind.SUCCESS_WITH_CONTENT);
    }

    /**
     * Starts a schema fetch for an operation before the operation body is allowed to run, then
     * hands the pending fetch to {@code validator}.
     *
     * <p>The submitted operation waits on a latch before delegating to {@code executor}. The
     * latch is released only after the schema-fetch task has signalled that it has started, so
     * the fetch is already under way when the operation body begins.
     *
     * @param executor body of the operation; its result or exception is the operation's outcome
     * @param validator receives the {@code FutureTask} wrapping the schema fetch
     * @throws Exception if waiting on a latch is interrupted or {@code validator} throws
     */
    private void runGetOperationSchemaUntilOperationIsReadyOrError(Callable<ResultSet> executor, org.apache.flink.util.function.ThrowingConsumer<FutureTask<ResolvedSchema>, Exception> validator) throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch releaseOperation = new CountDownLatch(1);
        CountDownLatch fetcherStarted = new CountDownLatch(1);

        Callable<ResultSet> gatedOperation = () -> {
            releaseOperation.await();
            return executor.call();
        };
        OperationHandle operationHandle = service.submitOperation(sessionHandle, gatedOperation);

        Callable<ResolvedSchema> fetchSchema = () -> {
            fetcherStarted.countDown();
            return service.getOperationResultSchema(sessionHandle, operationHandle);
        };
        FutureTask<ResolvedSchema> task = new FutureTask<>(fetchSchema);
        EXECUTOR_EXTENSION.getExecutor().submit(task);

        // Order matters: wait for the fetcher to start, then let the operation proceed.
        fetcherStarted.await();
        releaseOperation.countDown();
        validator.accept(task);
    }

    /**
     * Runs {@code cancelOrClose} on the shared test executor while the calling thread keeps
     * fetching results, and checks how the fetch loop fails.
     *
     * <p>The fetch loop never ends on its own: it repeats until {@code fetchResults} throws.
     * The thrown exception must have at least one throwable in its cause chain whose message is
     * non-null and matches {@code condition}. Any rows collected before the failure must be a
     * subset of the default result set.
     *
     * @param sessionHandle session that owns the operation
     * @param operationHandle operation whose results are fetched
     * @param cancelOrClose action that cancels or closes the operation; its exceptions are ignored
     * @param condition predicate the error message of some cause must satisfy
     */
    private void runCancelOrCloseOperationWhenFetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, RunnableWithException cancelOrClose, Condition<String> condition) {
        List<RowData> actual = new ArrayList<>();
        EXECUTOR_EXTENSION.getExecutor().submit(() -> {
            try {
                cancelOrClose.run();
            } catch (Exception ignored) {
                // Best effort: only the outcome observed by the fetch loop below is asserted.
            }
        });
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            long token = 0L;
            // Keep fetching from the operation until fetchResults throws.
            while (true) {
                ResultSet resultSet = service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
                if (resultSet.getNextToken() != null) {
                    token = resultSet.getNextToken();
                }
                if (resultSet.getResultType() == ResultSet.ResultType.PAYLOAD) {
                    actual.addAll(resultSet.getData());
                }
            }
        }).satisfies(t ->
                // anySatisfy accepts an element when the consumer does not throw, so the condition
                // must be checked via an assertion; a bare condition.matches(...) call would be
                // discarded and the check would always pass.
                FlinkAssertions.assertThatChainOfCauses(t)
                        .anySatisfy(cause -> Assertions.assertThat(cause.getMessage()).isNotNull().is(condition)));
        Assertions.assertThat(this.getDefaultResultSet().getData()).containsAll(actual);
    }

    /**
     * Executes {@code statement} on a table environment created from the session's executor and
     * asserts that the collected rows equal {@code expected}.
     *
     * @param sessionHandle session whose executor provides the table environment
     * @param statement SQL statement to execute
     * @param expected rows the statement is expected to produce, in order
     */
    private void validateStatementResult(SessionHandle sessionHandle, String statement, List<RowData> expected) {
        TableEnvironmentInternal tableEnv = service.getSession(sessionHandle).createExecutor().getTableEnvironment();
        TableResultInternal result = (TableResultInternal)tableEnv.executeSql(statement);
        List<RowData> collected = CollectionUtil.iteratorToList(result.collectInternal());
        Assertions.assertThat(collected).isEqualTo(expected);
    }

    /**
     * Requests completion hints with the cursor at the end of {@code incompleteSql} and asserts
     * that they equal {@code expectedCompletionHints}.
     *
     * @param sessionHandle session in which the statement is completed
     * @param incompleteSql partial SQL text; its length is used as the cursor position
     * @param expectedCompletionHints hints expected from {@code service.completeStatement}
     */
    private void validateCompletionHints(SessionHandle sessionHandle, String incompleteSql, List<String> expectedCompletionHints) {
        int cursorPosition = incompleteSql.length();
        List<String> hints = service.completeStatement(sessionHandle, incompleteSql, cursorPosition);
        Assertions.assertThat(hints).isEqualTo(expectedCompletionHints);
    }
}

