package io.cdap.plugin.gcp.bigquery.sqlengine;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.SQLEngineContext;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineException;
import io.cdap.cdap.etl.api.engine.sql.capability.DefaultPullCapability;
import io.cdap.cdap.etl.api.engine.sql.capability.PullCapability;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDatasetProducer;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLPullDataset;
import io.cdap.cdap.etl.api.engine.sql.dataset.SQLPushDataset;
import io.cdap.cdap.etl.api.engine.sql.request.SQLJoinDefinition;
import io.cdap.cdap.etl.api.engine.sql.request.SQLJoinRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLPullRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLPushRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLReadRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLReadResult;
import io.cdap.cdap.etl.api.engine.sql.request.SQLRelationDefinition;
import io.cdap.cdap.etl.api.engine.sql.request.SQLTransformDefinition;
import io.cdap.cdap.etl.api.engine.sql.request.SQLTransformRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteResult;
import io.cdap.cdap.etl.api.join.JoinCondition;
import io.cdap.cdap.etl.api.join.JoinDefinition;
import io.cdap.cdap.etl.api.join.JoinStage;
import io.cdap.cdap.etl.api.relational.Capability;
import io.cdap.cdap.etl.api.relational.Engine;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
import io.cdap.plugin.gcp.bigquery.relational.BigQueryRelation;
import io.cdap.plugin.gcp.bigquery.relational.SQLExpressionFactory;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceUtils;
import io.cdap.plugin.gcp.bigquery.sqlengine.builder.BigQueryJoinSQLBuilder;
import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * SQL engine plugin that pushes supported pipeline steps (joins, relational transforms, reads and
 * writes) down into BigQuery.
 *
 * <p>Every dataset handled by this engine is tracked in {@link #datasets}, keyed by dataset/stage
 * name, and is backed by a table in the configured BigQuery dataset. A GCS bucket is used as
 * staging area for push/pull operations. All per-run state is initialized in
 * {@link #prepareRun(SQLEngineContext)}.
 */
@Name(BigQuerySQLEngine.NAME)
@Description("BigQuery SQLEngine implementation, used to push down certain pipeline steps into BigQuery. A GCS bucket is used as staging for the read/write operations performed by this engine. BigQuery is Google's serverless, highly scalable, enterprise data warehouse.")
@Metadata(properties = {@MetadataProperty(key = "connector", value = BigQueryConnector.NAME)})
@Plugin(type = "sqlengine")
public class BigQuerySQLEngine extends BatchSQLEngine<LongWritable, GenericData.Record, StructuredRecord, NullWritable> implements Engine {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySQLEngine.class);
    public static final String NAME = "BigQueryPushdownEngine";
    private static final String CLEANUP_ERROR_FORMAT = "Exception when executing cleanup for stage '%s'";

    private final BigQuerySQLEngineConfig sqlEngineConfig;
    private SQLEngineContext ctx;
    private BigQuery bigQuery;
    private Storage storage;
    private Configuration configuration;
    // Project used to run BigQuery jobs and to create the BigQuery/GCS clients.
    private String project;
    // Project that owns the BigQuery dataset where pushdown tables are stored.
    private String datasetProject;
    private String datasetName;
    private String bucket;
    private String runId;
    // Datasets currently stored in BigQuery, keyed by dataset/stage name.
    private Map<String, BigQuerySQLDataset> datasets;
    private Metrics metrics;

    public BigQuerySQLEngine(BigQuerySQLEngineConfig bigQuerySQLEngineConfig) {
        this.sqlEngineConfig = bigQuerySQLEngineConfig;
    }

    /**
     * Validates the engine configuration at pipeline configuration time, reporting problems to the
     * stage failure collector.
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        this.sqlEngineConfig.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
    }

    /**
     * Initializes per-run state: validates the config, builds BigQuery/GCS clients, resolves the
     * staging bucket and optional CMEK key, and creates the dataset/bucket resources if needed.
     *
     * @param sQLEngineContext context for this run
     * @throws Exception if validation, credential loading or resource creation fails
     */
    public void prepareRun(SQLEngineContext sQLEngineContext) throws Exception {
        super.prepareRun(sQLEngineContext);
        this.ctx = sQLEngineContext;
        this.sqlEngineConfig.validate();
        this.runId = BigQuerySQLEngineUtils.newIdentifier();
        this.datasets = new HashMap<>();

        String serviceAccount = this.sqlEngineConfig.getServiceAccount();
        GoogleCredentials credentials = serviceAccount == null
            ? null
            : GCPUtils.loadServiceAccountCredentials(serviceAccount, this.sqlEngineConfig.isServiceAccountFilePath().booleanValue());
        this.project = this.sqlEngineConfig.getProject();
        this.datasetProject = this.sqlEngineConfig.getDatasetProject();
        this.datasetName = this.sqlEngineConfig.getDataset();
        this.bigQuery = GCPUtils.getBigQuery(this.project, credentials);
        this.storage = GCPUtils.getStorage(this.project, credentials);

        DatasetId datasetId = getDatasetId();
        this.bucket = BigQueryUtil.getStagingBucketName(sQLEngineContext.getRuntimeArguments(), this.sqlEngineConfig.getLocation(),
                                                        this.bigQuery.getDataset(datasetId), this.sqlEngineConfig.getBucket());

        // The CMEK key from the plugin config takes precedence over the runtime argument.
        String cmekKey = !Strings.isNullOrEmpty(this.sqlEngineConfig.cmekKey)
            ? this.sqlEngineConfig.cmekKey
            : (String) this.ctx.getRuntimeArguments().get(CmekUtils.CMEK_KEY);
        CryptoKeyName cmekKeyName = Strings.isNullOrEmpty(cmekKey) ? null : CryptoKeyName.parse(cmekKey);

        this.configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, this.project, cmekKeyName, this.sqlEngineConfig.getServiceAccountType());
        this.bucket = BigQuerySinkUtils.configureBucket(this.configuration, this.bucket, "bqpushdown-" + this.runId);
        BigQuerySinkUtils.createResources(this.bigQuery, this.storage, datasetId, this.bucket, this.sqlEngineConfig.getLocation(), cmekKeyName);
        BigQuerySourceUtils.configureServiceAccount(this.configuration, this.sqlEngineConfig.connection);
        this.metrics = this.ctx.getMetrics();
    }

    /**
     * Deletes the GCS staging location used by this run. When no bucket was configured, the whole
     * staging bucket path is deleted; otherwise only the run-specific folder inside it.
     * Failures are logged and not propagated.
     */
    public void onRunFinish(boolean succeeded, SQLEngineContext sQLEngineContext) {
        super.onRunFinish(succeeded, sQLEngineContext);
        // prepareRun may have failed before the staging location was set up; nothing to delete then.
        if (this.configuration == null || this.bucket == null) {
            return;
        }
        String stagingPath = this.sqlEngineConfig.getBucket() == null
            ? String.format(BigQuerySourceUtils.GCS_BUCKET_FORMAT, this.bucket)
            : String.format("gs://%s/%s", this.bucket, this.runId);
        try {
            BigQueryUtil.deleteTemporaryDirectory(this.configuration, stagingPath);
        } catch (IOException e) {
            LOG.warn("Failed to delete temporary directory '{}': {}", stagingPath, e.getMessage(), e);
        }
    }

    /**
     * Creates the push dataset used to load a dataset into BigQuery and registers it with this engine.
     *
     * @throws SQLEngineException wrapping any {@link IOException} raised while creating the dataset
     */
    public SQLPushDataset<StructuredRecord, StructuredRecord, NullWritable> getPushProvider(SQLPushRequest sQLPushRequest) throws SQLEngineException {
        try {
            BigQueryPushDataset pushDataset = BigQueryPushDataset.getInstance(sQLPushRequest, this.sqlEngineConfig, this.configuration,
                                                                              this.bigQuery, getDatasetId(), this.bucket, this.runId);
            LOG.info("Executing Push operation for dataset {} stored in table {}", sQLPushRequest.getDatasetName(), pushDataset.getBigQueryTable());
            this.datasets.put(sQLPushRequest.getDatasetName(), pushDataset);
            return pushDataset;
        } catch (IOException e) {
            throw new SQLEngineException(e);
        }
    }

    /**
     * Creates the pull dataset used to read a previously pushed/produced dataset back out of BigQuery.
     *
     * @throws SQLEngineException if the dataset is not known to this engine, or wrapping any
     *     {@link IOException} raised while creating the pull dataset
     */
    public SQLPullDataset<StructuredRecord, LongWritable, GenericData.Record> getPullProvider(SQLPullRequest sQLPullRequest) throws SQLEngineException {
        String pullDatasetName = sQLPullRequest.getDatasetName();
        BigQuerySQLDataset sourceDataset = this.datasets.get(pullDatasetName);
        if (sourceDataset == null) {
            throw new SQLEngineException(String.format("Trying to pull non-existing dataset: '%s'", pullDatasetName));
        }
        String bigQueryTable = sourceDataset.getBigQueryTable();
        LOG.info("Executing Pull operation for dataset {} stored in table {}", pullDatasetName, bigQueryTable);
        try {
            return BigQueryPullDataset.getInstance(sQLPullRequest, this.configuration, this.bigQuery, getDatasetId(), bigQueryTable, this.bucket, this.runId);
        } catch (IOException e) {
            throw new SQLEngineException(e);
        }
    }

    /** Returns whether a dataset with the given name is currently registered with this engine. */
    public boolean exists(String datasetName) throws SQLEngineException {
        return this.datasets.containsKey(datasetName);
    }

    /** Returns whether the given join can be executed in BigQuery, logging the outcome. */
    public boolean canJoin(SQLJoinDefinition sQLJoinDefinition) {
        boolean valid = isValidJoinDefinition(sQLJoinDefinition);
        LOG.info("Validating join for stage '{}' can be executed on BigQuery: {}", sQLJoinDefinition.getDatasetName(), valid);
        return valid;
    }

    /**
     * Checks input stages, output schema and join condition of a join definition, logging a warning
     * with every problem found.
     *
     * @return {@code true} if no validation problems were found
     */
    @VisibleForTesting
    protected static boolean isValidJoinDefinition(SQLJoinDefinition sQLJoinDefinition) {
        List<String> validationProblems = new ArrayList<>();
        JoinDefinition joinDefinition = sQLJoinDefinition.getJoinDefinition();
        for (JoinStage stage : joinDefinition.getStages()) {
            BigQuerySQLEngineUtils.validateInputStage(stage, validationProblems);
        }
        BigQuerySQLEngineUtils.validateOutputSchema(joinDefinition.getOutputSchema(), validationProblems);

        JoinCondition.Op op = joinDefinition.getCondition().getOp();
        if (op == JoinCondition.Op.EXPRESSION) {
            BigQuerySQLEngineUtils.validateOnExpressionJoinCondition(joinDefinition.getCondition(), validationProblems);
        }
        if (op == JoinCondition.Op.KEY_EQUALITY) {
            BigQuerySQLEngineUtils.validateJoinOnKeyStages(joinDefinition, validationProblems);
        }

        if (!validationProblems.isEmpty()) {
            LOG.warn("Join operation for stage '{}' could not be executed in BigQuery. Issues found: {}.",
                     sQLJoinDefinition.getDatasetName(), String.join("; ", validationProblems));
        }
        return validationProblems.isEmpty();
    }

    /** Executes the join as a BigQuery query and registers the resulting dataset. */
    public SQLDataset join(SQLJoinRequest sQLJoinRequest) throws SQLEngineException {
        String query = new BigQueryJoinSQLBuilder(sQLJoinRequest.getJoinDefinition(), getDatasetId(), getStageNameToBQTableNameMap()).getQuery();
        return executeSelect(sQLJoinRequest.getDatasetName(), sQLJoinRequest.getJoinDefinition().getOutputSchema(), BigQueryJobType.JOIN, query);
    }

    /**
     * Returns a Spark dataset producer backed by the BigQuery Storage Read API, or {@code null} when
     * the Storage Read API is disabled, the capability is not {@code SPARK_RDD_PULL}, or the dataset
     * is not known to this engine.
     */
    @Nullable
    public SQLDatasetProducer getProducer(SQLPullRequest sQLPullRequest, PullCapability pullCapability) {
        if (!this.sqlEngineConfig.shouldUseStorageReadAPI().booleanValue() || pullCapability != DefaultPullCapability.SPARK_RDD_PULL) {
            return null;
        }
        BigQuerySQLDataset sourceDataset = this.datasets.get(sQLPullRequest.getDatasetName());
        if (sourceDataset == null) {
            // Let the caller fall back to the regular pull path, which reports the missing dataset.
            LOG.warn("Dataset '{}' is not stored in BigQuery, no Storage Read API producer available", sQLPullRequest.getDatasetName());
            return null;
        }
        return new BigQuerySparkDatasetProducer(this.sqlEngineConfig, this.datasetProject, this.datasetName,
                                                sourceDataset.getBigQueryTable(), sQLPullRequest.getDatasetSchema());
    }

    /** Returns {@code SPARK_RDD_PULL} when the Storage Read API is enabled, otherwise no capabilities. */
    public Set<PullCapability> getPullCapabilities() {
        return !this.sqlEngineConfig.shouldUseStorageReadAPI().booleanValue() ? Collections.emptySet() : Collections.singleton(DefaultPullCapability.SPARK_RDD_PULL);
    }

    /**
     * Reads an input directly into a new BigQuery table when the input targets this engine; the
     * dataset is registered only when the read succeeds.
     */
    public SQLReadResult read(SQLReadRequest sQLReadRequest) throws SQLEngineException {
        String readDatasetName = sQLReadRequest.getDatasetName();
        String inputEngine = sQLReadRequest.getInput().getSqlEngineClassName();
        if (!BigQuerySQLEngine.class.getName().equals(inputEngine)) {
            LOG.debug("Got output for another SQL engine {}, skipping", inputEngine);
            return SQLReadResult.unsupported(readDatasetName);
        }
        TableId destinationTableId = TableId.of(this.datasetProject, this.datasetName, BigQuerySQLEngineUtils.getNewTableName(this.runId));
        BigQueryReadDataset readDataset = BigQueryReadDataset.getInstance(readDatasetName, this.sqlEngineConfig, this.bigQuery,
                                                                          sQLReadRequest, destinationTableId, this.metrics);
        SQLReadResult result = readDataset.read();
        if (result.isSuccessful()) {
            this.datasets.put(readDatasetName, readDataset);
        }
        return result;
    }

    /**
     * Writes a dataset stored in BigQuery directly to the output when the output targets this engine.
     * Returns an unsupported result if the output targets another engine or the dataset is not
     * stored in BigQuery.
     */
    public SQLWriteResult write(SQLWriteRequest sQLWriteRequest) {
        String writeDatasetName = sQLWriteRequest.getDatasetName();
        String outputEngine = sQLWriteRequest.getOutput().getSqlEngineClassName();
        if (!BigQuerySQLEngine.class.getName().equals(outputEngine)) {
            LOG.debug("Got output for another SQL engine {}, skipping", outputEngine);
            return SQLWriteResult.unsupported(writeDatasetName);
        }
        BigQuerySQLDataset sourceDataset = this.datasets.get(writeDatasetName);
        if (sourceDataset == null) {
            LOG.warn("Dataset '{}' is not stored in BigQuery, skipping direct write", writeDatasetName);
            return SQLWriteResult.unsupported(writeDatasetName);
        }
        TableId sourceTableId = TableId.of(this.datasetProject, this.datasetName, sourceDataset.getBigQueryTable());
        return BigQueryWrite.getInstance(writeDatasetName, this.sqlEngineConfig, this.bigQuery, sQLWriteRequest, sourceTableId, this.metrics).write();
    }

    /**
     * Cancels the dataset's job, deletes its table and deletes its GCS temp folder. All three steps
     * are attempted even if earlier ones fail.
     *
     * @throws SQLEngineException wrapping the first failure, with later failures added as suppressed
     */
    public void cleanup(String stageName) throws SQLEngineException {
        BigQuerySQLDataset dataset = this.datasets.get(stageName);
        if (dataset == null) {
            return;
        }
        LOG.info("Cleaning up dataset {}", stageName);
        SQLEngineException failure = null;
        try {
            cancelJob(stageName, dataset);
        } catch (BigQueryException e) {
            LOG.error("Exception when cancelling BigQuery job '{}' for stage '{}': {}", dataset.getJobId(), stageName, e.getMessage());
            failure = recordCleanupFailure(failure, stageName, e);
        }
        try {
            deleteTable(stageName, dataset);
        } catch (BigQueryException e) {
            LOG.error("Exception when deleting BigQuery table '{}' for stage '{}': {}", dataset.getBigQueryTable(), stageName, e.getMessage());
            failure = recordCleanupFailure(failure, stageName, e);
        }
        try {
            deleteTempFolder(dataset);
        } catch (IOException e) {
            LOG.error("Failed to delete temporary directory '{}' for stage '{}': {}", dataset.getGCSPath(), stageName, e.getMessage());
            failure = recordCleanupFailure(failure, stageName, e);
        }
        if (failure != null) {
            throw failure;
        }
    }

    // Wraps the first cleanup failure in a SQLEngineException; attaches later ones as suppressed.
    private static SQLEngineException recordCleanupFailure(@Nullable SQLEngineException failure, String stageName, Exception cause) {
        if (failure == null) {
            return new SQLEngineException(String.format(CLEANUP_ERROR_FORMAT, stageName), cause);
        }
        failure.addSuppressed(cause);
        return failure;
    }

    public Set<Capability> getCapabilities() {
        return Collections.singleton(StringExpressionFactoryType.SQL);
    }

    public List<ExpressionFactory<?>> getExpressionFactories() {
        return Collections.singletonList(new SQLExpressionFactory());
    }

    /** Builds a relation whose columns are the schema's field names, in schema order. */
    public Relation getRelation(SQLRelationDefinition sQLRelationDefinition) {
        Set<String> columnNames = new LinkedHashSet<>();
        List<Schema.Field> fields = sQLRelationDefinition.getSchema().getFields();
        if (fields != null) {
            for (Schema.Field field : fields) {
                columnNames.add(field.getName());
            }
        }
        return BigQueryRelation.getInstance(sQLRelationDefinition.getDatasetName(), columnNames, this.ctx);
    }

    public Engine getRelationalEngine() {
        return this;
    }

    public boolean supportsRelationalTranform() {
        return true;
    }

    public boolean supportsInputSchema(Schema schema) {
        return BigQuerySQLEngineUtils.isSupportedSchema(schema);
    }

    public boolean supportsOutputSchema(Schema schema) {
        return BigQuerySQLEngineUtils.isSupportedSchema(schema);
    }

    public Set<String> getIncludedStageNames() {
        return this.sqlEngineConfig.getIncludedStages();
    }

    public Set<String> getExcludedStageNames() {
        return this.sqlEngineConfig.getExcludedStages();
    }

    /** A transform is supported only when its output relation is a valid {@link BigQueryRelation}. */
    public boolean canTransform(SQLTransformDefinition sQLTransformDefinition) {
        Relation outputRelation = sQLTransformDefinition.getOutputRelation();
        return (outputRelation instanceof BigQueryRelation) && outputRelation.isValid();
    }

    /**
     * Binds the input datasets to the output relation, then executes its SQL statement in BigQuery.
     * Assumes every input dataset is a {@link BigQuerySQLDataset} (a cast failure otherwise).
     */
    public SQLDataset transform(SQLTransformRequest sQLTransformRequest) throws SQLEngineException {
        BigQueryRelation relation = (BigQueryRelation) sQLTransformRequest.getOutputRelation();
        Map<String, BigQuerySQLDataset> inputDatasets = sQLTransformRequest.getInputDataSets().entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> (BigQuerySQLDataset) entry.getValue()));
        relation.setInputDatasets(inputDatasets);
        return executeSelect(sQLTransformRequest.getOutputDatasetName(), sQLTransformRequest.getOutputSchema(),
                             BigQueryJobType.TRANSFORM, relation.getSQLStatement());
    }

    /**
     * Runs a SELECT query into a freshly created table and registers the result under the given name.
     */
    private BigQuerySelectDataset executeSelect(String str, Schema schema, BigQueryJobType bigQueryJobType, String str2) {
        LOG.info("Executing {} operation for dataset {}", bigQueryJobType.getType(), str);
        String jobId = BigQuerySQLEngineUtils.newIdentifier();
        String table = BigQuerySQLEngineUtils.getNewTableName(this.runId);
        // The destination table lives in the configured dataset project (same as every other table
        // this engine creates), not necessarily in the project used to run jobs.
        BigQuerySQLEngineUtils.createEmptyTable(this.sqlEngineConfig, this.bigQuery, this.datasetProject, this.datasetName, table);
        BigQuerySelectDataset selectDataset = BigQuerySelectDataset.getInstance(str, schema, this.sqlEngineConfig, this.bigQuery, this.project,
                                                                                getDatasetId(), table, jobId, bigQueryJobType, str2, this.metrics).execute();
        this.datasets.put(str, selectDataset);
        LOG.info("Executed {} operation for dataset {}", bigQueryJobType.getType(), str);
        return selectDataset;
    }

    /** Maps each registered dataset/stage name to the name of its backing BigQuery table. */
    protected Map<String, String> getStageNameToBQTableNameMap() {
        return this.datasets.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getBigQueryTable()));
    }

    /**
     * Cancels the dataset's BigQuery job, if any. Skipped when tables are retained. A job that
     * cannot be cancelled is logged, not thrown.
     */
    protected void cancelJob(String stageName, BigQuerySQLDataset bigQuerySQLDataset) throws BigQueryException {
        if (this.sqlEngineConfig.shouldRetainTables().booleanValue()) {
            return;
        }
        String jobId = bigQuerySQLDataset.getJobId();
        if (jobId == null) {
            return;
        }
        String bigQueryTable = bigQuerySQLDataset.getBigQueryTable();
        Job job = this.bigQuery.getJob(jobId);
        if (job != null && !job.cancel()) {
            LOG.error("Unable to cancel BigQuery job '{}' for table '{}' and stage '{}'", jobId, bigQueryTable, stageName);
        }
    }

    /** Deletes the dataset's BigQuery table if it exists. Skipped when tables are retained. */
    protected void deleteTable(String stageName, BigQuerySQLDataset bigQuerySQLDataset) throws BigQueryException {
        if (this.sqlEngineConfig.shouldRetainTables().booleanValue()) {
            return;
        }
        TableId tableId = TableId.of(this.datasetProject, this.datasetName, bigQuerySQLDataset.getBigQueryTable());
        if (this.bigQuery.getTable(tableId) != null) {
            this.bigQuery.delete(tableId);
        }
    }

    /** Deletes the dataset's GCS temp folder, if it has one. */
    protected void deleteTempFolder(BigQuerySQLDataset bigQuerySQLDataset) throws IOException {
        String gcsPath = bigQuerySQLDataset.getGCSPath();
        if (gcsPath != null) {
            BigQueryUtil.deleteTemporaryDirectory(this.configuration, gcsPath);
        }
    }

    // BigQuery dataset where all tables created by this engine are stored.
    private DatasetId getDatasetId() {
        return DatasetId.of(this.datasetProject, this.datasetName);
    }
}
