package io.cdap.plugin.gcp.dataplex.source;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.dataplex.v1.DataplexServiceClient;
import com.google.cloud.dataplex.v1.Entity;
import com.google.cloud.dataplex.v1.EntityName;
import com.google.cloud.dataplex.v1.LakeName;
import com.google.cloud.dataplex.v1.MetadataServiceClient;
import com.google.cloud.dataplex.v1.StorageSystem;
import com.google.cloud.dataplex.v1.Task;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Input;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.validation.ValidatingInputFormat;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.batch.JobUtils;
import io.cdap.plugin.gcp.bigquery.source.BigQueryAvroToStructuredTransformer;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.dataplex.common.util.DataplexConstants;
import io.cdap.plugin.gcp.dataplex.common.util.DataplexUtil;
import io.cdap.plugin.gcp.dataplex.source.config.DataplexBatchSourceConfig;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Dataplex")
@Description("Dataplex Source")
@Plugin(type = "batchsource")
/* loaded from: input_file:io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.class */
public class DataplexBatchSource extends BatchSource<Object, Object, StructuredRecord> {
    /** Plugin name constant; carries the same value as the {@code @Name} annotation on this class. */
    public static final String NAME = "Dataplex";
    // Not referenced in the visible part of this class; BQ_TEMP_BUCKET_NAME_TEMPLATE repeats the same prefix.
    private static final String BQ_TEMP_BUCKET_NAME_PREFIX = "dataplex-bq-source-bucket-";
    // Format used by the BigQuery createBucket overload to derive a bucket name when none is supplied.
    private static final String BQ_TEMP_BUCKET_NAME_TEMPLATE = "dataplex-bq-source-bucket-%s";
    // Hadoop configuration key holding the temporary BigQuery table name; onRunFinish reads this key.
    private static final String CONFIG_TEMPORARY_TABLE_NAME = "cdap.bq.source.temporary.table.name";
    // Argument key under which createTask stores the output location/format of the Dataplex task.
    private static final String DATAPLEX_TASK_ARGS = "TASK_ARGS";
    // NOTE(review): the five fields below are static yet reassigned from instance methods
    // (configurePipeline, prepareRun, initialize, prepareRunBigQueryDataset,
    // getEntityValuesFromDataPathForBQEntities), so all instances loaded by the same class loader
    // share them — confirm this is intended.
    // Dataplex entity resolved from the plugin configuration.
    private static Entity entity;
    // BigQuery dataset name parsed from the entity data path.
    private static String dataset;
    // Project segment parsed from the entity data path.
    private static String datasetProject;
    // Schema used by transform() for BigQuery entities; may be null, in which case transform() uses the
    // schema-less transformer call.
    private static Schema outputSchema;
    // BigQuery table name parsed from the entity data path.
    private static String tableId;
    // Converts Avro records read from BigQuery into StructuredRecords.
    private final BigQueryAvroToStructuredTransformer transformer = new BigQueryAvroToStructuredTransformer();
    // Plugin configuration handed to the constructor.
    private final DataplexBatchSourceConfig config;
    // Hadoop configuration built in prepareRun*() and read again in onRunFinish().
    private Configuration configuration;
    // Random UUID string generated in prepareRunBigQueryDataset(); used for the temporary GCS path and cleanup.
    private String bucketPath;
    // Temporary GCS bucket used for non-BigQuery entities. The UUID is evaluated once when the class is
    // initialised, not once per run — NOTE(review): confirm reuse across runs in one JVM is acceptable.
    private static final String GCS_TEMP_BUCKET_NAME = "dataplex-cdf-" + UUID.randomUUID();
    private static final Logger LOG = LoggerFactory.getLogger(DataplexBatchSource.class);

    /**
     * Creates the source around its plugin configuration.
     *
     * @param dataplexBatchSourceConfig configuration object consulted by the other methods of this class
     */
    public DataplexBatchSource(DataplexBatchSourceConfig dataplexBatchSourceConfig) {
        config = dataplexBatchSourceConfig;
    }

    /**
     * Deployment-time validation of the stage.
     *
     * <p>When the connection cannot be used yet (cannot connect, no service account type, an unavailable
     * auto-detected service account file, or no project), only the validating input format is set up and
     * no remote call is made. Otherwise the entity is resolved; non-BigQuery entities are checked against
     * the metastore, while BigQuery entities have their dataset validated and, if the configuration carries
     * no schema, the output schema is derived from the entity schema.
     *
     * @param pipelineConfigurer configurer providing the stage configurer and its failure collector
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector failureCollector = stageConfigurer.getFailureCollector();
        if (!this.config.getConnection().canConnect()
                || this.config.getServiceAccountType() == null
                || (this.config.isServiceAccountFilePath().booleanValue() && this.config.autoServiceAccountUnavailable())
                || this.config.tryGetProject() == null) {
            this.config.setupValidatingInputFormat(pipelineConfigurer, failureCollector, null);
            return;
        }
        GoogleCredentials credentials = this.config.validateAndGetServiceAccountCredentials(failureCollector);
        failureCollector.getOrThrowException();
        try {
            entity = this.config.getAndValidateEntityConfiguration(failureCollector, credentials);
            if (entity == null) {
                this.config.setupValidatingInputFormat(pipelineConfigurer, failureCollector, null);
                return;
            }
            if (!entity.getSystem().equals(StorageSystem.BIGQUERY)) {
                this.config.checkMetastoreForGCSEntity(failureCollector, credentials);
                this.config.setupValidatingInputFormat(pipelineConfigurer, failureCollector, entity);
                return;
            }
            getEntityValuesFromDataPathForBQEntities(entity.getDataPath());
            this.config.validateBigQueryDataset(failureCollector, datasetProject, dataset, tableId);
            if (this.config.getSchema(failureCollector) == null) {
                stageConfigurer.setOutputSchema(DataplexUtil.getTableSchema(entity.getSchema(), failureCollector));
            }
        } catch (IOException e) {
            // An IOException is not guaranteed to carry a cause; dereferencing it blindly would replace the
            // real validation failure with a NullPointerException. Prefer the cause's message, then the
            // exception's own message, then its string form.
            Throwable cause = e.getCause();
            String message = cause != null ? cause.getMessage() : null;
            if (message == null) {
                message = e.getMessage();
            }
            if (message == null) {
                message = e.toString();
            }
            failureCollector.addFailure(message, "Please check credentials");
        }
    }

    /**
     * Resolves the configured entity and prepares the run for either a BigQuery-backed or a
     * storage-backed entity.
     *
     * @param batchSourceContext context of the current run
     * @throws IOException if the configured entity cannot be resolved
     * @throws Exception   propagated from credential validation or from the storage-specific preparation
     */
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        FailureCollector collector = batchSourceContext.getFailureCollector();
        GoogleCredentials credentials = this.config.validateAndGetServiceAccountCredentials(collector);
        collector.getOrThrowException();

        entity = this.config.getAndValidateEntityConfiguration(collector, credentials);
        if (entity == null) {
            throw new IOException(String.format("Pipeline failed. Entity %s does not exist", this.config.getEntity()));
        }

        if (entity.getSystem().equals(StorageSystem.BIGQUERY)) {
            // BigQuery entity: derive project/dataset/table from the data path before validating them.
            getEntityValuesFromDataPathForBQEntities(entity.getDataPath());
            this.config.validateBigQueryDataset(collector, datasetProject, dataset, tableId);
            prepareRunBigQueryDataset(batchSourceContext);
            return;
        }

        // Any other storage system goes through the metastore check and the GCS preparation path.
        this.config.checkMetastoreForGCSEntity(collector, credentials);
        prepareRunStorageBucket(batchSourceContext);
    }

    /**
     * Splits a BigQuery entity data path on {@code '/'} and stores the project (second segment), the
     * dataset (third segment from the end) and the table (last segment) in the corresponding static
     * fields. Paths with fewer than three segments leave the fields untouched.
     *
     * <p>Presumably the path looks like {@code projects/<project>/datasets/<dataset>/tables/<table>} —
     * TODO confirm against the Dataplex entity documentation.
     *
     * @param dataPath data path of the entity
     */
    private void getEntityValuesFromDataPathForBQEntities(String dataPath) {
        final String[] segments = dataPath.split("/");
        final int count = segments.length;
        if (count < 3) {
            return;
        }
        datasetProject = segments[1];
        dataset = segments[count - 3];
        tableId = segments[count - 1];
    }

    /**
     * Runtime initialisation: reads the configured schema into {@code outputSchema} and fetches the
     * Dataplex entity named by the configuration (project, location, lake, zone, entity).
     *
     * @param batchRuntimeContext runtime context supplying the failure collector
     * @throws Exception if the superclass initialisation, client creation or the entity lookup fails
     */
    @Override
    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        outputSchema = this.config.getSchema(batchRuntimeContext.getFailureCollector());
        // try-with-resources closes the client on every path and, unlike a manual close in a catch block,
        // keeps the lookup failure as the primary exception if close() fails as well.
        try (MetadataServiceClient metadataServiceClient = DataplexUtil.getMetadataServiceClient(
                this.config.getCredentials(batchRuntimeContext.getFailureCollector()))) {
            EntityName entityName = EntityName.newBuilder()
                .setProject(this.config.tryGetProject())
                .setLocation(this.config.getLocation())
                .setLake(this.config.getLake())
                .setZone(this.config.getZone())
                .setEntity(this.config.getEntity())
                .build();
            entity = metadataServiceClient.getEntity(entityName);
        }
    }

    /**
     * Prepares a run that reads a BigQuery-backed entity: derives the output schema from the entity,
     * builds the BigQuery Hadoop configuration, ensures a staging bucket exists, configures the BigQuery
     * input, emits lineage and registers the input on the context.
     *
     * @param batchSourceContext context of the current run
     * @throws Exception propagated from any of the configuration or bucket-creation steps
     */
    private void prepareRunBigQueryDataset(BatchSourceContext batchSourceContext) throws Exception {
        FailureCollector collector = batchSourceContext.getFailureCollector();
        outputSchema = DataplexUtil.getTableSchema(entity.getSchema(), collector);

        String serviceAccount = this.config.getServiceAccount();
        GoogleCredentials credentials = this.config.getCredentials(collector);
        BigQuery bigQuery = GCPUtils.getBigQuery(datasetProject, credentials);
        // The same random value names both the temporary directory and, if needed, the generated bucket.
        this.bucketPath = UUID.randomUUID().toString();
        this.configuration = BigQueryUtil.getBigQueryConfig(
            serviceAccount, this.config.getProject(), null, this.config.getServiceAccountType());

        String project = this.config.getProject();
        DatasetId datasetId = DatasetId.of(datasetProject, dataset);
        String stagingBucket = BigQueryUtil.getStagingBucketName(
            batchSourceContext.getArguments().asMap(),
            this.config.getLocation(),
            bigQuery.getDataset(datasetId),
            null);
        String bucketName = createBucket(
            this.configuration, project, bigQuery, credentials, stagingBucket, this.bucketPath);

        configureServiceAccount(this.configuration, this.config.getConnection());
        configureBigQuerySource();

        String temporaryGcsPath =
            BigQuerySourceUtils.getTemporaryGcsPath(bucketName, this.bucketPath, this.bucketPath);
        BigQuerySourceUtils.configureBigQueryInput(this.configuration, datasetId, tableId, temporaryGcsPath);
        this.configuration.set(DataplexConstants.DATAPLEX_ENTITY_TYPE, entity.getSystem().toString());

        TableDefinition.Type tableType = this.config.getSourceTableType(datasetProject, dataset, tableId);
        emitLineage(batchSourceContext, outputSchema, tableType);

        String referenceName = this.config.getReferenceName(BigQueryUtil.getFQN(datasetProject, dataset, tableId));
        batchSourceContext.setInput(Input.of(referenceName, new DataplexInputFormatProvider(this.configuration)));
    }

    /**
     * Copies the optional partition range and filter from the plugin configuration into the Hadoop
     * configuration; a property whose value is {@code null} is not set.
     */
    private void configureBigQuerySource() {
        String partitionFrom = this.config.getPartitionFrom();
        if (partitionFrom != null) {
            this.configuration.set(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, partitionFrom);
        }

        String partitionTo = this.config.getPartitionTo();
        if (partitionTo != null) {
            this.configuration.set(BigQueryConstants.CONFIG_PARTITION_TO_DATE, partitionTo);
        }

        String filter = this.config.getFilter();
        if (filter != null) {
            this.configuration.set(BigQueryConstants.CONFIG_FILTER, filter);
        }
    }

    /**
     * Registers the BigQuery entity as an external dataset and, when the schema has fields, records a
     * read operation listing every field name.
     *
     * @param batchSourceContext context of the current run
     * @param schema             schema of the data being read
     * @param type               BigQuery table type; selects the wording ("table", "view" or
     *                           "materialized view") of the lineage description
     */
    private void emitLineage(BatchSourceContext batchSourceContext, Schema schema, TableDefinition.Type type) {
        getEntityValuesFromDataPathForBQEntities(entity.getDataPath());
        String fqn = BigQueryUtil.getFQN(datasetProject, dataset, tableId);
        Asset asset = Asset.builder(this.config.getReferenceName(fqn))
            .setFqn(fqn)
            .setLocation(this.config.getLocation())
            .build();
        LineageRecorder recorder = new LineageRecorder(batchSourceContext, asset);
        recorder.createExternalDataset(schema);

        String kind;
        if (type == TableDefinition.Type.VIEW) {
            kind = "view";
        } else if (type == TableDefinition.Type.MATERIALIZED_VIEW) {
            kind = "materialized view";
        } else {
            kind = "table";
        }

        if (schema.getFields() == null) {
            return;
        }
        List<String> fieldNames = schema.getFields().stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toList());
        recordLineage(recorder, fieldNames,
            String.format("Read from BigQuery Entity %s '%s' from Dataplex.", kind, tableId));
    }

    /**
     * Prepares a run that reads a non-BigQuery entity: ensures the temporary GCS bucket exists, creates a
     * Dataplex task whose output location is that bucket, records lineage, and configures the Hadoop job
     * to read recursively from the bucket.
     *
     * @param batchSourceContext context of the current run
     * @throws IOException if the bucket cannot be created or the input path does not exist
     * @throws InstantiationException, ExecutionException, InterruptedException propagated from the
     *         validating input format lookup and the Dataplex task creation
     */
    private void prepareRunStorageBucket(BatchSourceContext batchSourceContext) throws InstantiationException, IOException, ExecutionException, InterruptedException {
        FailureCollector collector = batchSourceContext.getFailureCollector();
        Job job = JobUtils.createInstance();
        this.configuration = job.getConfiguration();

        Storage storage = GCPUtils.getStorage(this.config.getProject(), this.config.getCredentials(collector));
        createBucket(this.configuration, storage, this.config.getLocation(), GCS_TEMP_BUCKET_NAME);
        String outputLocation = "gs://" + GCS_TEMP_BUCKET_NAME;

        String query = formatQuery(entity, batchSourceContext.isPreviewEnabled());
        String taskId = createTask(outputLocation, query, collector);
        setConfigurationForDataplex(taskId);

        ValidatingInputFormat inputFormat = this.config.getValidatingInputFormat(batchSourceContext);
        FileInputFormat.setInputDirRecursive(job, true);

        Schema entitySchema = DataplexUtil.getTableSchema(entity.getSchema(), collector);
        Asset asset = Asset.builder(this.config.getReferenceName(entity.getDataPath()))
            .setFqn(entity.getDataPath())
            .setLocation(this.config.getLocation())
            .build();
        LineageRecorder recorder = new LineageRecorder(batchSourceContext, asset);
        recorder.createExternalDataset(entitySchema);
        if (entitySchema != null && entitySchema.getFields() != null) {
            List<String> fieldNames = entitySchema.getFields().stream()
                .map(Schema.Field::getName)
                .collect(Collectors.toList());
            recordLineage(recorder, fieldNames, "Read from GCS entity in Dataplex.");
        }

        // File-system properties must be in place before the FileSystem lookup below.
        for (Map.Entry<String, String> property : this.config.getFileSystemProperties(outputLocation).entrySet()) {
            this.configuration.set(property.getKey(), property.getValue());
        }

        Path inputPath = new Path(outputLocation);
        FileSystem fileSystem = FileSystem.get(inputPath.toUri(), this.configuration);
        if (fileSystem.globStatus(inputPath) == null) {
            throw new IOException(String.format("Input path %s does not exist", inputPath));
        }
        FileInputFormat.addInputPath(job, inputPath);

        for (Map.Entry<String, String> property : inputFormat.getInputFormatConfiguration().entrySet()) {
            this.configuration.set(property.getKey(), property.getValue());
        }
        this.configuration.set(DataplexConstants.DATAPLEX_ENTITY_TYPE, entity.getSystem().toString());

        String referenceName = this.config.getReferenceName(entity.getDataPath());
        batchSourceContext.setInput(Input.of(referenceName, new DataplexInputFormatProvider(this.configuration)));
    }

    /**
     * Stores the Dataplex task id together with project, location, lake and service-account settings in
     * the Hadoop configuration. A missing service-account file path is written as {@code "none"}.
     *
     * @param taskId identifier of the Dataplex task created for this run
     */
    private void setConfigurationForDataplex(String taskId) {
        Configuration conf = this.configuration;
        conf.set(DataplexConstants.DATAPLEX_TASK_ID, taskId);
        conf.set(DataplexConstants.DATAPLEX_PROJECT_ID, this.config.tryGetProject());
        conf.set(DataplexConstants.DATAPLEX_LOCATION, this.config.getLocation());
        conf.set(DataplexConstants.DATAPLEX_LAKE, this.config.getLake());
        conf.set(DataplexConstants.SERVICE_ACCOUNT_TYPE, this.config.getServiceAccountType());

        String serviceAccountFilePath = this.config.getServiceAccountFilePath();
        if (serviceAccountFilePath == null) {
            serviceAccountFilePath = "none";
        }
        conf.set(DataplexConstants.SERVICE_ACCOUNT_FILEPATH, serviceAccountFilePath);
    }

    /**
     * Records a read operation named {@code "Read"} on the given lineage recorder.
     *
     * @param lineageRecorder recorder receiving the operation
     * @param fieldNames      names of the fields that were read
     * @param description     human-readable description of the operation
     */
    private void recordLineage(LineageRecorder lineageRecorder, List<String> fieldNames, String description) {
        final String operationName = "Read";
        lineageRecorder.recordRead(operationName, description, fieldNames);
    }

    /**
     * Cleans up temporary resources after a run. For BigQuery entities the temporary GCS directory and
     * the temporary table named in the configuration are deleted; for other entities the temporary GCS
     * directory and the temporary bucket are deleted.
     *
     * @param z                  run outcome flag supplied by the framework; not consulted here
     * @param batchSourceContext context of the finished run
     */
    public void onRunFinish(boolean z, BatchSourceContext batchSourceContext) {
        if (entity.getSystem().equals(StorageSystem.BIGQUERY)) {
            BigQuerySourceUtils.deleteGcsTemporaryDirectory(this.configuration, null, this.bucketPath);
            String temporaryTable = this.configuration.get(CONFIG_TEMPORARY_TABLE_NAME);
            BigQuery bigQuery = GCPUtils.getBigQuery(
                this.config.getProject(), this.config.getCredentials(batchSourceContext.getFailureCollector()));
            bigQuery.delete(TableId.of(datasetProject, dataset, temporaryTable));
            LOG.debug("Deleted temporary table '{}'", temporaryTable);
            return;
        }

        Storage storage = GCPUtils.getStorage(
            this.config.tryGetProject(), this.config.getCredentials(batchSourceContext.getFailureCollector()));
        BigQuerySourceUtils.deleteGcsTemporaryDirectory(this.configuration, GCS_TEMP_BUCKET_NAME, "projects");
        storage.delete(GCS_TEMP_BUCKET_NAME, new Storage.BucketSourceOption[0]);
        LOG.debug("Deleted temporary bucket '{}'.", GCS_TEMP_BUCKET_NAME);
    }

    /**
     * Creates an on-demand Dataplex Spark SQL task in the configured lake and waits for the creation to
     * complete.
     *
     * <p>The task description is a freshly generated {@code "task-<uuid>"} string which is also passed as
     * the task id; the description of the created task is returned.
     *
     * @param outputLocation   location passed to the task as {@code --output_location}
     * @param sqlScript        SQL script the task executes
     * @param failureCollector collector used while obtaining credentials
     * @return description of the created task
     * @throws IOException          if the Dataplex client cannot be created
     * @throws ExecutionException   if the asynchronous task creation fails
     * @throws InterruptedException if the thread is interrupted while waiting for the creation
     */
    private String createTask(String outputLocation, String sqlScript, FailureCollector failureCollector) throws IOException, ExecutionException, InterruptedException {
        Task.TriggerSpec triggerSpec = Task.TriggerSpec.newBuilder()
            .setType(Task.TriggerSpec.Type.ON_DEMAND)
            .build();
        Task.ExecutionSpec executionSpec = Task.ExecutionSpec.newBuilder()
            .setServiceAccount(this.config.getServiceAccountEmail())
            .putArgs(DATAPLEX_TASK_ARGS,
                String.format("--output_location,%s, --output_format, %s", outputLocation, "avro"))
            .build();
        Task taskSpec = Task.newBuilder()
            .setTriggerSpec(triggerSpec)
            .setDescription("task-" + UUID.randomUUID())
            .setExecutionSpec(executionSpec)
            .setSpark(Task.SparkTaskConfig.newBuilder().setSqlScript(sqlScript).build())
            .build();

        // try-with-resources closes the client on success and failure alike and keeps the creation error
        // as the primary exception should close() fail too.
        try (DataplexServiceClient dataplexServiceClient =
                 DataplexUtil.getDataplexServiceClient(this.config.getCredentials(failureCollector))) {
            LakeName lakeName = LakeName.newBuilder()
                .setLake(this.config.getLake())
                .setProject(this.config.tryGetProject())
                .setLocation(this.config.getLocation())
                .build();
            Task createdTask =
                dataplexServiceClient.createTaskAsync(lakeName, taskSpec, taskSpec.getDescription()).get();
            return createdTask.getDescription();
        }
    }

    /**
     * Builds the SQL statement executed by the Dataplex task:
     * {@code select * from <zone>.<entityId> [where <filter>][ LIMIT 1000];}.
     *
     * @param sourceEntity   entity whose id names the table to select from
     * @param previewEnabled when {@code true} the result is capped with {@code LIMIT 1000}
     * @return the formatted query, always terminated by a semicolon
     */
    private String formatQuery(Entity sourceEntity, boolean previewEnabled) {
        StringBuilder suffix = new StringBuilder();
        String filter = this.config.getFilter();
        if (!Strings.isNullOrEmpty(filter)) {
            suffix.append("where ").append(filter);
        }
        // Plain SQL statement terminator; spelled out rather than borrowed from an unrelated Hadoop NFS
        // constant that merely happens to hold the same character.
        suffix.append(previewEnabled ? " LIMIT 1000;" : ";");
        return String.format("select * from %s.%s %s", this.config.getZone(), sourceEntity.getId(), suffix);
    }

    /**
     * Emits one structured record per input pair. Values of BigQuery entities are Avro records converted
     * by the transformer (with {@code outputSchema} when one is set); values of other entities are
     * emitted as they are.
     *
     * @param keyValue input pair; only the value is used
     * @param emitter  receiver of the resulting record
     * @throws IOException if the Avro conversion fails
     */
    public void transform(KeyValue<Object, Object> keyValue, Emitter<StructuredRecord> emitter) throws IOException {
        boolean bigQueryEntity = entity.getSystem().equals(StorageSystem.BIGQUERY);
        Object value = keyValue.getValue();
        if (!bigQueryEntity) {
            emitter.emit((StructuredRecord) value);
            return;
        }

        StructuredRecord record;
        if (outputSchema == null) {
            record = this.transformer.transform((GenericData.Record) value);
        } else {
            record = this.transformer.transform((GenericRecord) value, outputSchema);
        }
        emitter.emit(record);
    }

    /**
     * Ensures the staging bucket for a BigQuery read exists, in the location of the source dataset.
     * When no bucket name is supplied, one is derived from the bucket path and bucket deletion is
     * enabled in the configuration.
     *
     * @param configuration Hadoop configuration to update
     * @param project       project used to obtain the Storage client
     * @param bigQuery      client used to look up the dataset location
     * @param credentials   credentials used to obtain the Storage client
     * @param bucket        bucket name to use, or {@code null} to generate one
     * @param bucketPath    value inserted into the generated bucket name
     * @return the name of the bucket that is used
     * @throws IOException if the bucket cannot be created
     */
    private String createBucket(Configuration configuration, String project, BigQuery bigQuery, Credentials credentials, @Nullable String bucket, String bucketPath) throws IOException {
        String bucketName = bucket;
        if (bucketName == null) {
            bucketName = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, bucketPath);
            configuration.setBoolean("fs.gs.bucket.delete.enable", true);
        }
        Storage storage = GCPUtils.getStorage(project, credentials);
        String datasetLocation = bigQuery.getDataset(DatasetId.of(datasetProject, dataset)).getLocation();
        createBucket(configuration, storage, datasetLocation, bucketName);
        return bucketName;
    }

    /**
     * Copies the connection's service account, and whether it is a file path, into the configuration.
     * Nothing is written when the connection has no service account.
     *
     * @param configuration      Hadoop configuration to update
     * @param gCPConnectorConfig connection configuration providing the service account
     */
    private void configureServiceAccount(Configuration configuration, GCPConnectorConfig gCPConnectorConfig) {
        String serviceAccount = gCPConnectorConfig.getServiceAccount();
        if (serviceAccount == null) {
            return;
        }
        configuration.set(BigQueryConstants.CONFIG_SERVICE_ACCOUNT, serviceAccount);
        boolean isFilePath = gCPConnectorConfig.isServiceAccountFilePath();
        configuration.setBoolean(BigQueryConstants.CONFIG_SERVICE_ACCOUNT_IS_FILE, isFilePath);
    }

    /**
     * Creates the named bucket when a Storage client is available and the bucket does not exist yet.
     * Bucket deletion is enabled in the configuration before creation. A {@code 409} response from the
     * creation call is not treated as a failure.
     *
     * @param configuration Hadoop configuration to update
     * @param storage       Storage client; when {@code null} nothing is done
     * @param location      location for the new bucket
     * @param bucketName    name of the bucket
     * @return {@code bucketName}, unchanged
     * @throws IOException if bucket creation fails with any code other than 409
     */
    private String createBucket(Configuration configuration, Storage storage, String location, @Nullable String bucketName) throws IOException {
        if (storage == null || storage.get(bucketName, new Storage.BucketGetOption[0]) != null) {
            return bucketName;
        }
        try {
            configuration.setBoolean("fs.gs.bucket.delete.enable", true);
            GCPUtils.createBucket(storage, bucketName, location, null);
        } catch (StorageException e) {
            if (e.getCode() != 409) {
                throw new IOException(String.format("Unable to create Cloud Storage bucket '%s'. ", bucketName), e);
            }
        }
        return bucketName;
    }

    // Bridge overload (marked bridge/synthetic) that narrows the erased argument types with unchecked
    // casts and delegates to the typed transform(KeyValue, Emitter) method of this class.
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<Object, Object>) obj, (Emitter<StructuredRecord>) emitter);
    }
}
