/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.rest;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.table.api.internal.StaticResultProvider;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointUtils;
import org.apache.flink.table.gateway.rest.util.TestingRestClient;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the SQL gateway statement test scripts against the REST endpoint, once for every
 * {@link RowFormat} listed in {@link #parameters()}.
 *
 * <p>Each statement is submitted with an execute-statement request. The test then waits until the
 * operation reaches a terminal status and pages through the results with fetch-results requests.
 */
public class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementITCase {
    private static final Logger LOG = LoggerFactory.getLogger(SqlGatewayRestEndpointStatementITCase.class);

    @RegisterExtension
    @Order(3)
    private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
            new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);

    private static final ExecuteStatementHeaders executeStatementHeaders = ExecuteStatementHeaders.getInstance();
    private static final FetchResultsHeaders fetchResultsHeaders = FetchResultsHeaders.getDefaultInstance();

    /** Upper bound, in seconds, on how long a submitted operation may take to reach a terminal status. */
    private static final int OPERATION_WAIT_SECONDS = 100;

    /** Separator (used as a regex by {@link String#split(String)}) that introduces a nested cause. */
    private static final String PATTERN1 = "Caused by: ";

    /** Separator (used as a regex by {@link String#split(String)}) that introduces a stack frame. */
    private static final String PATTERN2 = "\tat ";

    /** Client shared by all tests of this class; created in {@link #setup()}, closed in {@link #cleanUp()}. */
    private static TestingRestClient restClient;

    private final SessionEnvironment defaultSessionEnvironment =
            SessionEnvironment.newBuilder()
                    .setSessionEndpointVersion((EndpointVersion) MockedEndpointVersion.V1)
                    .build();

    /** Session opened for the current test in {@link #before(Path)}. */
    private SessionHandle sessionHandle;

    /**
     * Request parameters addressing {@link #sessionHandle}. Kept per test instance because it is
     * rebuilt in {@link #before(Path)} together with the session it refers to.
     */
    private SessionMessageParameters sessionMessageParameters;

    @BeforeAll
    static void setup() throws Exception {
        restClient = TestingRestClient.getTestingRestClient();
    }

    @AfterAll
    static void cleanUp() throws Exception {
        restClient.shutdown();
    }

    /**
     * Builds the test matrix: every SQL script returned by {@code listFlinkSqlTests()} paired with
     * {@link RowFormat#JSON} and with {@link RowFormat#PLAIN_TEXT}.
     */
    @Parameters(name = "parameters={0}")
    public static List<AbstractSqlGatewayStatementITCase.TestParameters> parameters() throws Exception {
        return listFlinkSqlTests().stream()
                .flatMap(
                        path ->
                                Stream.of(
                                        new RestTestParameters(path, RowFormat.JSON),
                                        new RestTestParameters(path, RowFormat.PLAIN_TEXT)))
                .collect(Collectors.toList());
    }

    /** Runs the parent setup, then opens a fresh session and the request parameters that address it. */
    @Override
    @BeforeEach
    public void before(@TempDir Path temporaryFolder) throws Exception {
        super.before(temporaryFolder);
        sessionHandle = service.openSession(defaultSessionEnvironment);
        sessionMessageParameters = new SessionMessageParameters(sessionHandle);
    }

    /**
     * Executes one statement through the REST endpoint and renders its complete result.
     *
     * @param statement the SQL statement to execute
     * @return the result rendered by the parent class' {@code toString} helper
     * @throws Exception if a request fails or the operation does not reach a terminal status within
     *     {@link #OPERATION_WAIT_SECONDS} seconds
     */
    @Override
    protected String runSingleStatement(String statement) throws Exception {
        OperationHandle operationHandle = submitStatement(statement);

        // The returned handle must refer to an operation that is registered in this session.
        org.junit.jupiter.api.Assertions.assertDoesNotThrow(
                () ->
                        SQL_GATEWAY_SERVICE_EXTENSION
                                .getSessionManager()
                                .getSession(this.sessionHandle)
                                .getOperationManager()
                                .getOperation(operationHandle));
        CommonTestUtils.waitUtil(
                () ->
                        SQL_GATEWAY_SERVICE_EXTENSION
                                .getService()
                                .getOperationInfo(this.sessionHandle, operationHandle)
                                .getStatus()
                                .isTerminalStatus(),
                Duration.ofSeconds(OPERATION_WAIT_SECONDS),
                "Failed to wait operation finish.");

        // The first page is fetched here only to validate the response and obtain the schema;
        // the rows themselves are re-read from token 0 by the RowDataIterator below.
        FetchResultsResponseBody firstPage = fetchResults(sessionHandle, operationHandle, 0L);
        ResultInfo resultInfo = firstPage.getResults();
        Assertions.assertThat((Object) resultInfo).isNotNull();
        ResultSet.ResultType resultType = firstPage.getResultType();
        Assertions.assertThat(Arrays.asList(ResultSet.ResultType.PAYLOAD, ResultSet.ResultType.EOS))
                .contains(resultType);

        ResolvedSchema resultSchema = resultInfo.getResultSchema();
        return toString(
                AbstractSqlGatewayStatementITCase.StatementType.match(statement),
                resultSchema,
                createRowConverter(resultSchema),
                new RowDataIterator(sessionHandle, operationHandle));
    }

    /** Sends the execute-statement request and returns the handle of the operation it created. */
    private OperationHandle submitStatement(String statement) throws Exception {
        ExecuteStatementRequestBody requestBody =
                new ExecuteStatementRequestBody(statement, Long.valueOf(0L), new HashMap<>());
        CompletableFuture response =
                restClient.sendRequest(
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
                        (MessageHeaders) executeStatementHeaders,
                        (MessageParameters) sessionMessageParameters,
                        (RequestBody) requestBody);
        ExecuteStatementResponseBody responseBody = (ExecuteStatementResponseBody) response.get();
        String operationHandleString = responseBody.getOperationHandle();
        org.junit.jupiter.api.Assertions.assertNotNull(operationHandleString);
        return new OperationHandle(UUID.fromString(operationHandleString));
    }

    /**
     * Picks the row-to-string converter for the row format under test: a
     * {@link RowDataToStringConverterImpl} built from the result schema for {@link RowFormat#JSON},
     * otherwise {@link StaticResultProvider#SIMPLE_ROW_DATA_TO_STRING_CONVERTER}.
     */
    private RowDataToStringConverter createRowConverter(ResolvedSchema resultSchema) {
        if (rowFormat() != RowFormat.JSON) {
            return StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
        }
        ClassLoader classLoader = SqlGatewayRestEndpointStatementITCase.class.getClassLoader();
        ReadableConfig codegenConfig = new Configuration();
        return new RowDataToStringConverterImpl(
                resultSchema.toPhysicalRowDataType(),
                DateTimeUtils.UTC_ZONE.toZoneId(),
                classLoader,
                false,
                new CodeGeneratorContext(codegenConfig, classLoader));
    }

    /** Returns the row format of the currently running parameterized test. */
    private RowFormat rowFormat() {
        return ((RestTestParameters) this.parameters).getRowFormat();
    }

    /**
     * Fetches one page of results for the given operation, in the row format under test.
     *
     * @param sessionHandle session that owns the operation
     * @param operationHandle operation whose results are requested
     * @param token token identifying the page to fetch
     * @return the response body of the fetch-results request
     * @throws Exception if the request fails
     */
    FetchResultsResponseBody fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, Long token)
            throws Exception {
        FetchResultsMessageParameters fetchResultsMessageParameters =
                new FetchResultsMessageParameters(sessionHandle, operationHandle, token, rowFormat());
        CompletableFuture response =
                restClient.sendRequest(
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
                        (MessageHeaders) fetchResultsHeaders,
                        (MessageParameters) fetchResultsMessageParameters,
                        (RequestBody) EmptyRequestBody.getInstance());
        return (FetchResultsResponseBody) response.get();
    }

    /**
     * Reduces an exception to a short text: the part of its message that follows the last
     * {@code "Caused by: "} and precedes the first stack frame ({@code "\tat "}).
     *
     * @return the canonical class name when the message is null or blank; otherwise the extracted
     *     text, which is empty when nothing is left around the separators
     */
    @Override
    protected String stringifyException(Throwable t) {
        String message = t.getMessage();
        if (StringUtils.isNullOrWhitespaceOnly(message)) {
            return t.getClass().getCanonicalName();
        }
        // String.split drops trailing empty strings, so a message consisting only of the
        // separator yields an empty array; guard the index instead of failing on it.
        String[] causes = message.split(PATTERN1);
        String innermostCause = causes.length == 0 ? "" : causes[causes.length - 1];
        String[] beforeStackTrace = innermostCause.split(PATTERN2);
        return beforeStackTrace.length == 0 ? "" : beforeStackTrace[0];
    }

    /** Reports whether the current session's configured runtime mode is {@code STREAMING}. */
    @Override
    protected boolean isStreaming() {
        Map<String, String> sessionConfig = service.getSessionConfig(sessionHandle);
        RuntimeExecutionMode runtimeMode = Configuration.fromMap(sessionConfig).get(ExecutionOptions.RUNTIME_MODE);
        return runtimeMode == RuntimeExecutionMode.STREAMING;
    }

    /**
     * Iterates over all rows of an operation, fetching further pages through
     * {@link #fetchResults(SessionHandle, OperationHandle, Long)} as each page is consumed. A
     * {@code null} token parsed from the response's next-result URI ends the paging.
     */
    private class RowDataIterator implements Iterator<RowData> {
        private final SessionHandle sessionHandle;
        private final OperationHandle operationHandle;
        private Long token = 0L;
        private Iterator<RowData> fetchedRows = Collections.emptyIterator();

        /** Creates the iterator and eagerly fetches the first page. */
        public RowDataIterator(SessionHandle sessionHandle, OperationHandle operationHandle) throws Exception {
            this.sessionHandle = sessionHandle;
            this.operationHandle = operationHandle;
            fetch();
        }

        /**
         * Fetches pages until one contains a row or no further token is available.
         *
         * @throws IllegalStateException if a page cannot be fetched
         */
        @Override
        public boolean hasNext() {
            while (token != null && !fetchedRows.hasNext()) {
                try {
                    fetch();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    LOG.error("Failed to fetch results.", e);
                    // A failed fetch leaves token and fetchedRows untouched, so retrying here
                    // could loop forever; surface the failure to the caller instead.
                    throw new IllegalStateException("Failed to fetch results.", e);
                }
            }
            return fetchedRows.hasNext();
        }

        /**
         * Returns the next row, fetching the next page first when the current one is used up.
         * When no row is left, the underlying page iterator's {@code next()} reports exhaustion.
         */
        @Override
        public RowData next() {
            // Refill first so next() also works without a preceding hasNext() call.
            hasNext();
            return fetchedRows.next();
        }

        /** Fetches the page for the current token and advances the token to the following page. */
        private void fetch() throws Exception {
            FetchResultsResponseBody page =
                    SqlGatewayRestEndpointStatementITCase.this.fetchResults(sessionHandle, operationHandle, token);
            token = SqlGatewayRestEndpointUtils.parseToken(page.getNextResultUri());
            fetchedRows = page.getResults().getData().iterator();
        }
    }

    /** Test parameters that add the REST row format to the SQL script path. */
    private static class RestTestParameters extends AbstractSqlGatewayStatementITCase.TestParameters {
        private final RowFormat rowFormat;

        public RestTestParameters(String sqlPath, RowFormat rowFormat) {
            super(sqlPath);
            this.rowFormat = rowFormat;
        }

        public RowFormat getRowFormat() {
            return rowFormat;
        }

        @Override
        public String toString() {
            return "RestTestParameters{sqlPath='" + sqlPath + '\'' + ", rowFormat=" + rowFormat + '}';
        }
    }
}

