package org.apache.druid.testing.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.sql.SqlTaskStatus;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.clients.SqlResourceTestClient;
import org.apache.druid.testing.utils.QueryResultVerifier;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;

/* loaded from: input_file:org/apache/druid/testing/utils/MsqTestQueryHelper.class */
public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResults> {
    private final ObjectMapper jsonMapper;
    private final IntegrationTestingConfig config;
    private final OverlordResourceTestClient overlordClient;
    private final SqlResourceTestClient msqClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/testing/utils/MsqTestQueryHelper$TaskStillRunningException.class */
    public static class TaskStillRunningException extends Exception {
        private TaskStillRunningException() {
        }
    }

    @Inject
    MsqTestQueryHelper(ObjectMapper objectMapper, SqlResourceTestClient sqlResourceTestClient, IntegrationTestingConfig integrationTestingConfig, OverlordResourceTestClient overlordResourceTestClient, SqlResourceTestClient sqlResourceTestClient2) {
        super(objectMapper, sqlResourceTestClient, integrationTestingConfig);
        this.jsonMapper = objectMapper;
        this.config = integrationTestingConfig;
        this.overlordClient = overlordResourceTestClient;
        this.msqClient = sqlResourceTestClient2;
        this.jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules());
    }

    @Override // org.apache.druid.testing.utils.AbstractTestQueryHelper
    public String getQueryURL(String str) {
        return StringUtils.format("%s/druid/v2/sql/task", new Object[]{str});
    }

    public SqlTaskStatus submitMsqTaskSuccesfully(String str) throws ExecutionException, InterruptedException {
        return submitMsqTaskSuccesfully(str, ImmutableMap.of());
    }

    public SqlTaskStatus submitMsqTaskSuccesfully(String str, Map<String, Object> map) throws ExecutionException, InterruptedException {
        return submitMsqTaskSuccesfully(new SqlQuery(str, (ResultFormat) null, false, false, false, map, (List) null), null, null);
    }

    public SqlTaskStatus submitMsqTaskSuccesfully(SqlQuery sqlQuery) throws ExecutionException, InterruptedException {
        return submitMsqTaskSuccesfully(sqlQuery, null, null);
    }

    public SqlTaskStatus submitMsqTaskSuccesfully(SqlQuery sqlQuery, String str, String str2) throws ExecutionException, InterruptedException {
        StatusResponseHolder submitMsqTask = submitMsqTask(sqlQuery, str, str2);
        HttpResponseStatus status = submitMsqTask.getStatus();
        if (!status.equals(HttpResponseStatus.ACCEPTED)) {
            throw new ISE(StringUtils.format("Unable to submit the task successfully. Received response status code [%d], and response content:\n[%s]", new Object[]{Integer.valueOf(status.getCode()), submitMsqTask.getContent()}), new Object[0]);
        }
        try {
            return (SqlTaskStatus) this.jsonMapper.readValue(submitMsqTask.getContent(), SqlTaskStatus.class);
        } catch (JsonProcessingException e) {
            throw new ISE("Unable to parse the response", new Object[0]);
        }
    }

    public StatusResponseHolder submitMsqTask(SqlQuery sqlQuery, String str, String str2) throws ExecutionException, InterruptedException {
        try {
            return this.msqClient.queryAsync(getQueryURL(this.config.getBrokerUrl()), sqlQuery, str, str2).get(5L, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            throw new ISE(e, "Unable to fetch the task id for the submitted task in time.", new Object[0]);
        }
    }

    public TaskState pollTaskIdForCompletion(String str) throws Exception {
        return (TaskState) RetryUtils.retry(() -> {
            TaskStatusPlus taskStatus = this.overlordClient.getTaskStatus(str);
            TaskState statusCode = taskStatus.getStatusCode();
            if (statusCode == null || !statusCode.isComplete()) {
                throw new TaskStillRunningException();
            }
            return taskStatus.getStatusCode();
        }, th -> {
            return th instanceof TaskStillRunningException;
        }, 99, 100);
    }

    public void pollTaskIdForSuccess(String str) throws Exception {
        Assert.assertEquals(pollTaskIdForCompletion(str), TaskState.SUCCESS);
    }

    public TaskReport.ReportMap fetchStatusReports(String str) {
        return this.overlordClient.getTaskReport(str);
    }

    private void compareResults(String str, MsqQueryWithResults msqQueryWithResults) {
        MSQTaskReport mSQTaskReport = (MSQTaskReport) fetchStatusReports(str).get("multiStageQuery");
        if (mSQTaskReport == null) {
            throw new ISE("Unable to fetch the status report for the task [%]", new Object[]{str});
        }
        MSQResultsReport mSQResultsReport = (MSQResultsReport) Preconditions.checkNotNull(((MSQTaskReportPayload) Preconditions.checkNotNull(mSQTaskReport.getPayload(), "payload")).getResults(), "Results report for the task id is empty");
        ArrayList arrayList = new ArrayList();
        List signature = mSQResultsReport.getSignature();
        for (Yielder resultYielder = mSQResultsReport.getResultYielder(); !resultYielder.isDone(); resultYielder = resultYielder.next((Object) null)) {
            Object[] objArr = (Object[]) resultYielder.get();
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            for (int i = 0; i < objArr.length; i++) {
                linkedHashMap.put(((MSQResultsReport.ColumnAndType) signature.get(i)).getName(), objArr[i]);
            }
            arrayList.add(linkedHashMap);
        }
        QueryResultVerifier.ResultVerificationObject compareResults = QueryResultVerifier.compareResults(arrayList, msqQueryWithResults.getExpectedResults(), Collections.emptyList());
        if (!compareResults.isSuccess()) {
            throw new IAE("Expected query result is different from the actual result.\nQuery: %s\nActual Result: %s\nExpected Result: %s\nMismatch Error: %s\n", new Object[]{msqQueryWithResults.getQuery(), arrayList, msqQueryWithResults.getExpectedResults(), compareResults.getErrorMessage()});
        }
    }

    @Override // org.apache.druid.testing.utils.AbstractTestQueryHelper
    public void testQueriesFromFile(String str, String str2) throws Exception {
        LOG.info("Starting query tests for [%s]", new Object[]{str});
        for (MsqQueryWithResults msqQueryWithResults : (List) this.jsonMapper.readValue(TestQueryHelper.class.getResourceAsStream(str), new TypeReference<List<MsqQueryWithResults>>() { // from class: org.apache.druid.testing.utils.MsqTestQueryHelper.1
        })) {
            SqlTaskStatus submitMsqTaskSuccesfully = submitMsqTaskSuccesfully(StringUtils.replace(msqQueryWithResults.getQuery(), "%%DATASOURCE%%", str2));
            if (submitMsqTaskSuccesfully.getState().isFailure()) {
                throw new ISE("Unable to start the task successfully.\nPossible exception: %s", new Object[]{submitMsqTaskSuccesfully.getError()});
            }
            String taskId = submitMsqTaskSuccesfully.getTaskId();
            pollTaskIdForSuccess(taskId);
            compareResults(taskId, msqQueryWithResults);
        }
    }

    public void submitMsqTaskAndWaitForCompletion(String str, Map<String, Object> map) throws Exception {
        SqlTaskStatus submitMsqTaskSuccesfully = submitMsqTaskSuccesfully(str, map);
        LOG.info("Sql Task submitted with task Id - %s", new Object[]{submitMsqTaskSuccesfully.getTaskId()});
        if (submitMsqTaskSuccesfully.getState().isFailure()) {
            Assert.fail(StringUtils.format("Unable to start the task successfully.\nPossible exception: %s", new Object[]{submitMsqTaskSuccesfully.getError()}));
        }
        pollTaskIdForCompletion(submitMsqTaskSuccesfully.getTaskId());
    }
}
