package io.cdap.plugin.gcp.bigquery.source;

import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.time.DateTimeException;
import java.time.LocalDate;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("BigQueryTable")
@Description("This source reads the entire contents of a BigQuery table. BigQuery is Google's serverless, highly scalable, enterprise data warehouse.Data is first written to a temporary location on Google Cloud Storage, then read into the pipeline from there.")
@Metadata(properties = {@MetadataProperty(key = "connector", value = BigQueryConnector.NAME)})
@Plugin(type = "batchsource")
/* loaded from: input_file:io/cdap/plugin/gcp/bigquery/source/BigQuerySource.class */
public final class BigQuerySource extends BatchSource<LongWritable, GenericData.Record, StructuredRecord> {
    // Class-wide logger; used by setInputFormat to report that the SQL engine input was skipped.
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySource.class);
    // Serializes the config, the schema and the field-name list into the SQL engine input arguments.
    private static final Gson GSON = new Gson();
    // Plugin name; same literal as the value of the @Name annotation on this class.
    public static final String NAME = "BigQueryTable";
    // Stage configuration. It is never assigned in this class — presumably injected by the framework (TODO confirm).
    private BigQuerySourceConfig config;
    // Schema read from the config in initialize(); transform() falls back to schema-less conversion when it is null.
    private Schema outputSchema;
    // Hadoop configuration built in prepareRun() and reused by onRunFinish() for cleanup.
    private Configuration configuration;
    // Converts the Avro records handed to transform() into StructuredRecords.
    private final BigQueryAvroToStructuredTransformer transformer = new BigQueryAvroToStructuredTransformer();
    // Random UUID generated per run in prepareRun(); identifies the temporary GCS location removed in onRunFinish().
    private String bucketPath;

    /**
     * Validates the stage configuration at deployment time and publishes the stage's output schema.
     *
     * <p>If the live table cannot be inspected (see {@link #cannotInspectTable()}), the schema taken
     * from the configuration is published as-is. Otherwise the table schema is fetched and the
     * partition properties are validated; a configured schema, when present, is additionally
     * validated against the table and takes precedence over the fetched one.
     *
     * @param pipelineConfigurer source of the stage configurer and its failure collector
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        StageConfigurer stage = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = stage.getFailureCollector();
        this.config.validate(collector);
        Schema configuredSchema = this.config.getSchema(collector);

        if (cannotInspectTable()) {
            stage.setOutputSchema(configuredSchema);
            return;
        }

        // The table schema is fetched before the partition checks, exactly as in the original flow.
        Schema tableSchema = getSchema(collector);
        validatePartitionProperties(collector);

        if (configuredSchema != null) {
            validateConfiguredSchema(configuredSchema, collector);
            stage.setOutputSchema(configuredSchema);
            return;
        }
        stage.setOutputSchema(tableSchema);
    }

    /**
     * Decides whether validation against the live table has to be skipped.
     *
     * <p>The checks run in order and stop at the first one that matches.
     *
     * @return {@code true} when the config cannot connect, when a service-account file path is used
     *     while the automatic service account is unavailable, or when neither a project nor a
     *     dataset project is available
     */
    private boolean cannotInspectTable() {
        if (!this.config.canConnect()) {
            return true;
        }
        if (this.config.isServiceAccountFilePath() && this.config.autoServiceAccountUnavailable()) {
            return true;
        }
        return this.config.tryGetProject() == null && this.config.getDatasetProject() == null;
    }

    /**
     * Prepares a pipeline run: validates the configuration against the runtime arguments, resolves
     * the output schema, builds the Hadoop configuration, obtains the staging bucket, records
     * lineage and registers the inputs.
     *
     * <p>Side effects: assigns {@code bucketPath} (a fresh random UUID) and {@code configuration},
     * both of which {@code onRunFinish} reads later.
     *
     * @param batchSourceContext run context providing the failure collector and runtime arguments
     * @throws Exception propagated from any of the validation or setup steps
     */
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        FailureCollector collector = batchSourceContext.getFailureCollector();
        this.config.validate(collector, batchSourceContext.getArguments().asMap());

        // A table whose schema has no fields cannot be read; report it and abort.
        boolean tableHasNoFields = getBQSchema(collector).getFields().isEmpty();
        if (tableHasNoFields) {
            String message = String.format(
                "BigQuery table %s.%s does not have a schema.", this.config.getDataset(), this.config.getTable());
            collector.addFailure(message, "Please edit the table to add a schema.");
            collector.getOrThrowException();
        }

        Schema resolvedSchema = getOutputSchema(collector);

        // Clients for BigQuery and Cloud Storage share the same credentials.
        String serviceAccount = this.config.getServiceAccount();
        Credentials credentials = BigQuerySourceUtils.getCredentials(this.config.getConnection());
        BigQuery bigQuery = GCPUtils.getBigQuery(this.config.getProject(), credentials);
        DatasetId sourceDatasetId = DatasetId.of(this.config.getDatasetProject(), this.config.getDataset());
        Dataset dataset = bigQuery.getDataset(sourceDatasetId);
        Storage storage = GCPUtils.getStorage(this.config.getProject(), credentials);

        // Per-run identifier for the temporary GCS location.
        this.bucketPath = UUID.randomUUID().toString();

        CryptoKeyName cmekKeyName =
            CmekUtils.getCmekKey(this.config.cmekKey, batchSourceContext.getArguments().asMap(), collector);
        // Stop here if resolving the CMEK key registered any failure.
        collector.getOrThrowException();

        this.configuration = BigQueryUtil.getBigQueryConfig(
            serviceAccount, this.config.getProject(), cmekKeyName, this.config.getServiceAccountType());

        String stagingBucketName = BigQueryUtil.getStagingBucketName(
            batchSourceContext.getArguments().asMap(), null, dataset, this.config.getBucket());
        String bucket = BigQuerySourceUtils.getOrCreateBucket(
            this.configuration, storage, stagingBucketName, dataset, this.bucketPath, cmekKeyName);

        BigQuerySourceUtils.configureServiceAccount(this.configuration, this.config.getConnection());
        configureBigQuerySource();

        BigQuerySourceUtils.configureBigQueryInput(
            this.configuration,
            DatasetId.of(this.config.getDatasetProject(), this.config.getDataset()),
            this.config.getTable(),
            BigQuerySourceUtils.getTemporaryGcsPath(bucket, this.bucketPath, this.bucketPath));

        emitLineage(
            batchSourceContext,
            resolvedSchema,
            this.config.getSourceTableType(),
            this.config.getTable(),
            Asset.builder(this.config.getReferenceName())
                .setFqn(BigQueryUtil.getFQN(
                    this.config.getDatasetProject(), this.config.getDataset(), this.config.getTable()))
                .setLocation(dataset.getLocation())
                .build());

        setInputFormat(batchSourceContext, resolvedSchema);
    }

    /**
     * Initializes the stage at runtime and caches the configured schema for {@code transform}.
     *
     * @param batchRuntimeContext runtime context providing the failure collector
     * @throws Exception propagated from the superclass initialization
     */
    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        FailureCollector collector = batchRuntimeContext.getFailureCollector();
        // May be null; transform() then converts records without a target schema.
        this.outputSchema = this.config.getSchema(collector);
    }

    /**
     * Converts one Avro record read from BigQuery into a {@link StructuredRecord} and emits it.
     *
     * @param keyValue input pair; only its value (the Avro record) is used
     * @param emitter receiver of the converted record
     * @throws Exception propagated from the record transformer
     */
    public void transform(KeyValue<LongWritable, GenericData.Record> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        GenericRecord avroRecord = (GenericRecord) keyValue.getValue();
        StructuredRecord converted;
        if (this.outputSchema == null) {
            // No configured schema: use the transformer's single-argument overload.
            converted = this.transformer.transform(avroRecord);
        } else {
            converted = this.transformer.transform(avroRecord, this.outputSchema);
        }
        emitter.emit(converted);
    }

    /**
     * Cleans up after a run by deleting the temporary GCS directory and the temporary BigQuery table.
     *
     * <p>Both cleanup calls are made regardless of the value of {@code z}; neither parameter is read.
     *
     * @param z run outcome flag (unused)
     * @param batchSourceContext run context (unused)
     */
    public void onRunFinish(boolean z, BatchSourceContext batchSourceContext) {
        String configuredBucket = this.config.getBucket();
        BigQuerySourceUtils.deleteGcsTemporaryDirectory(this.configuration, configuredBucket, this.bucketPath);
        BigQuerySourceUtils.deleteBigQueryTemporaryTable(this.configuration, this.config);
    }

    /**
     * Copies the optional source settings (partition range, filter, view materialization project
     * and dataset) from the plugin config into the Hadoop configuration. Settings that are
     * {@code null} are left unset.
     */
    private void configureBigQuerySource() {
        setIfPresent(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, this.config.getPartitionFrom());
        setIfPresent(BigQueryConstants.CONFIG_PARTITION_TO_DATE, this.config.getPartitionTo());
        setIfPresent(BigQueryConstants.CONFIG_FILTER, this.config.getFilter());
        setIfPresent(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_PROJECT, this.config.getViewMaterializationProject());
        setIfPresent(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, this.config.getViewMaterializationDataset());
    }

    /**
     * Stores {@code value} under {@code key} in the Hadoop configuration unless it is {@code null}.
     *
     * @param key configuration key
     * @param value value to store; ignored when {@code null}
     */
    private void setIfPresent(String key, String value) {
        if (value != null) {
            this.configuration.set(key, value);
        }
    }

    /**
     * Fetches the BigQuery schema of the configured table and converts it to a CDAP schema.
     *
     * @param failureCollector collector that receives lookup and conversion failures
     * @return the schema produced by {@code BigQueryUtil.getTableSchema} for the table's schema
     */
    public Schema getSchema(FailureCollector failureCollector) {
        return BigQueryUtil.getTableSchema(
            getBQSchema(failureCollector),
            failureCollector);
    }

    /**
     * Checks every field of the given schema against the fields of the BigQuery table.
     *
     * <p>A field missing from the table and a field whose type does not match are both recorded
     * on the collector and tagged with the offending output schema field. After all fields are
     * checked, the collector is asked to throw if any failure was recorded.
     *
     * @param schema schema whose fields are validated
     * @param failureCollector collector receiving the failures
     */
    private void validateConfiguredSchema(Schema schema, FailureCollector failureCollector) {
        String datasetName = this.config.getDataset();
        String tableName = this.config.getTable();
        String projectId = this.config.getDatasetProject();
        FieldList tableFields = getBQSchema(failureCollector).getFields();

        for (Schema.Field configuredField : schema.getFields()) {
            String fieldName = configuredField.getName();
            try {
                ValidationFailure mismatch = BigQueryUtil.validateFieldSchemaMatches(
                    tableFields.get(fieldName),
                    configuredField,
                    datasetName,
                    tableName,
                    BigQuerySourceConfig.SUPPORTED_TYPES,
                    failureCollector);
                if (mismatch != null) {
                    mismatch.withOutputSchemaField(fieldName);
                }
            } catch (IllegalArgumentException e) {
                // Any IllegalArgumentException raised inside the try is reported as a missing field.
                String message = String.format(
                    "Field '%s' is not present in table '%s:%s.%s'.", fieldName, projectId, datasetName, tableName);
                String correctiveAction = String.format("Remove field '%s' from the output schema.", fieldName);
                failureCollector.addFailure(message, correctiveAction).withOutputSchemaField(fieldName);
            }
        }
        failureCollector.getOrThrowException();
    }

    /**
     * Looks up the configured BigQuery table and returns its schema.
     *
     * <p>If the table cannot be found, or it has no schema, a failure tied to the {@code table}
     * config property is recorded and the collector's exception is thrown.
     *
     * @param failureCollector collector that receives the failures
     * @return the table's BigQuery schema, never {@code null}
     */
    private com.google.cloud.bigquery.Schema getBQSchema(FailureCollector failureCollector) {
        String serviceAccount = this.config.getServiceAccount();
        String dataset = this.config.getDataset();
        String tableName = this.config.getTable();
        String datasetProject = this.config.getDatasetProject();

        Table bigQueryTable = BigQueryUtil.getBigQueryTable(datasetProject, dataset, tableName, serviceAccount, this.config.isServiceAccountFilePath(), failureCollector);
        if (bigQueryTable == null) {
            failureCollector.addFailure(String.format("BigQuery table '%s:%s.%s' does not exist.", datasetProject, dataset, tableName), "Ensure correct table name is provided.").withConfigProperty("table");
            throw failureCollector.getOrThrowException();
        }

        com.google.cloud.bigquery.Schema schema = bigQueryTable.getDefinition().getSchema();
        if (schema == null) {
            // Format the table NAME here: passing the Table handle would print its toString() in the message.
            failureCollector.addFailure(String.format("Cannot read from table '%s:%s.%s' because it has no schema.", datasetProject, dataset, tableName), "Alter the table to have a schema.").withConfigProperty("table");
            throw failureCollector.getOrThrowException();
        }
        return schema;
    }

    /**
     * Resolves the schema used for the run: the configured schema when one is set, otherwise the
     * schema derived from the BigQuery table. The partition properties and the resolved schema
     * are validated before it is returned.
     *
     * @param failureCollector collector receiving validation failures
     * @return the resolved schema
     */
    @Nullable
    private Schema getOutputSchema(FailureCollector failureCollector) {
        Schema resolved = this.config.getSchema(failureCollector);
        if (resolved == null) {
            resolved = getSchema(failureCollector);
        }
        validatePartitionProperties(failureCollector);
        validateConfiguredSchema(resolved, failureCollector);
        return resolved;
    }

    /**
     * Validates the "partition from" and "partition to" dates of the configuration.
     *
     * <p>Nothing is checked when the table cannot be found, when it is a standard table without
     * time partitioning, or when neither date is configured. Otherwise each configured date must
     * parse as an ISO local date, and the "from" date must not be after the "to" date.
     *
     * @param failureCollector collector receiving the validation failures
     */
    private void validatePartitionProperties(FailureCollector failureCollector) {
        Table sourceTable = BigQueryUtil.getBigQueryTable(
            this.config.getDatasetProject(),
            this.config.getDataset(),
            this.config.getTable(),
            this.config.getServiceAccount(),
            this.config.isServiceAccountFilePath(),
            failureCollector);
        if (sourceTable == null) {
            return;
        }
        if (sourceTable.getDefinition() instanceof StandardTableDefinition) {
            StandardTableDefinition standardDefinition = (StandardTableDefinition) sourceTable.getDefinition();
            if (standardDefinition.getTimePartitioning() == null) {
                return;
            }
        }

        String fromText = this.config.getPartitionFrom();
        String toText = this.config.getPartitionTo();
        if (fromText == null && toText == null) {
            return;
        }

        LocalDate fromDate = parsePartitionDate(
            fromText,
            "Invalid partition from date format.",
            "Ensure partition from date is of format 'yyyy-MM-dd'.",
            BigQuerySourceConfig.NAME_PARTITION_FROM,
            failureCollector);
        LocalDate toDate = parsePartitionDate(
            toText,
            "Invalid partition to date format.",
            "Ensure partition to date is of format 'yyyy-MM-dd'.",
            BigQuerySourceConfig.NAME_PARTITION_TO,
            failureCollector);

        // The range is only compared when both dates were supplied and parsed successfully.
        if (fromDate != null && toDate != null && fromDate.isAfter(toDate) && !fromDate.isEqual(toDate)) {
            failureCollector.addFailure("'Partition From Date' must be before or equal 'Partition To Date'.", (String) null)
                .withConfigProperty(BigQuerySourceConfig.NAME_PARTITION_FROM)
                .withConfigProperty(BigQuerySourceConfig.NAME_PARTITION_TO);
        }
    }

    /**
     * Parses a partition date, recording a failure on the given config property when the text is
     * not a valid date.
     *
     * @param text date text to parse; may be {@code null}
     * @param message failure message used when parsing fails
     * @param correctiveAction corrective action used when parsing fails
     * @param property config property the failure is attached to
     * @param failureCollector collector receiving the failure
     * @return the parsed date, or {@code null} when {@code text} is {@code null} or unparsable
     */
    private static LocalDate parsePartitionDate(String text, String message, String correctiveAction, String property, FailureCollector failureCollector) {
        if (text == null) {
            return null;
        }
        try {
            return LocalDate.parse(text);
        } catch (DateTimeException e) {
            failureCollector.addFailure(message, correctiveAction).withConfigProperty(property);
            return null;
        }
    }

    /**
     * Registers the inputs for this stage.
     *
     * <p>The input backed by {@code BigQueryInputFormatProvider} is always registered. When a
     * schema is available, a {@code SQLEngineInput} carrying the JSON-serialized config, schema
     * and field names is registered as well; otherwise a debug message is logged and the method
     * returns.
     *
     * @param batchSourceContext context on which the inputs are set
     * @param schema output schema, or {@code null} to skip the SQL engine input
     */
    private void setInputFormat(BatchSourceContext batchSourceContext, Schema schema) {
        String referenceName = this.config.getReferenceName();
        BigQueryInputFormatProvider formatProvider = new BigQueryInputFormatProvider(this.configuration);
        batchSourceContext.setInput(Input.of(referenceName, formatProvider));

        if (schema == null) {
            LOG.debug("BigQuery SQL Engine Input was not initialized. Schema was empty.");
            return;
        }

        ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
        arguments.put("config", GSON.toJson(this.config));
        arguments.put("schema", GSON.toJson(schema));
        List<String> fieldNames = schema.getFields().stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toList());
        arguments.put("fields", GSON.toJson(fieldNames));

        SQLEngineInput sqlEngineInput = new SQLEngineInput(
            this.config.referenceName,
            batchSourceContext.getStageName(),
            BigQuerySQLEngine.class.getName(),
            arguments.build());
        batchSourceContext.setInput(sqlEngineInput);
    }

    /**
     * Records lineage for the source: creates the external dataset for the asset and, when the
     * schema exposes fields, records a read operation listing the field names.
     *
     * @param batchSourceContext context handed to the lineage recorder
     * @param schema schema registered with the external dataset
     * @param type table type; views and materialized views are named as such in the description
     * @param str name of the table, used in the read description
     * @param asset asset the lineage is recorded against
     */
    private void emitLineage(BatchSourceContext batchSourceContext, Schema schema, TableDefinition.Type type, String str, Asset asset) {
        LineageRecorder recorder = new LineageRecorder((BatchContext) batchSourceContext, asset);
        recorder.createExternalDataset(schema);

        String sourceKind;
        if (TableDefinition.Type.VIEW == type) {
            sourceKind = "view";
        } else if (TableDefinition.Type.MATERIALIZED_VIEW == type) {
            sourceKind = "materialized view";
        } else {
            sourceKind = "table";
        }

        if (schema.getFields() == null) {
            return;
        }
        String description = String.format("Read from BigQuery %s '%s'.", sourceKind, str);
        List<String> fieldNames = schema.getFields().stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toList());
        recorder.recordRead("Read", description, fieldNames);
    }

    // Type-erased overload marked bridge/synthetic: it only casts its arguments and delegates to
    // the typed transform(KeyValue, Emitter) above.
    // NOTE(review): this looks like a compiler-generated bridge surfaced by the decompiler; if this
    // file is ever compiled from source it is presumably redundant — confirm before removing.
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<LongWritable, GenericData.Record>) obj, (Emitter<StructuredRecord>) emitter);
    }
}
