package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.Table;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.engine.sql.SQLEngineOutput;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch sink plugin that writes records to a BigQuery table.
 *
 * <p>On {@code prepareRun} the sink copies the relevant settings of its {@link BigQuerySinkConfig}
 * into the inherited Hadoop configuration, registers the regular output through the base class and,
 * when a schema is available, additionally registers a {@link SQLEngineOutput} targeting
 * {@link BigQuerySQLEngine}. When the run finishes successfully it looks up the BigQuery job created
 * under this sink's job id and emits row-count and byte-count metrics.
 */
@Name(BigQuerySink.NAME)
@Description("This sink writes to a BigQuery table. "
    + "BigQuery is Google's serverless, highly scalable, enterprise data warehouse. "
    + "Data is first written to a temporary location on Google Cloud Storage, then loaded into BigQuery from there.")
@Metadata(properties = {@MetadataProperty(key = "connector", value = BigQueryConnector.NAME)})
@Plugin(type = "batchsink")
public final class BigQuerySink extends AbstractBigQuerySink {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySink.class);
    private static final Gson GSON = new Gson();
    public static final String NAME = "BigQueryTable";

    /**
     * Upper bound applied to the number of {@code Integer.MAX_VALUE}-sized chunks used when emitting
     * the row-count metric, so that the emission loop stays short even for enormous row counts.
     */
    private static final int METRIC_CHUNK_CAP = 10000;

    private final BigQuerySinkConfig config;

    /** Random id generated once per sink instance; written to the job configuration and used to look the job up. */
    private final String jobId = UUID.randomUUID().toString();

    public BigQuerySink(BigQuerySinkConfig bigQuerySinkConfig) {
        this.config = bigQuerySinkConfig;
    }

    /**
     * Validates the plugin configuration at deployment time.
     *
     * <p>The configured schema is only checked against BigQuery when the connection, the project
     * and the service account type are all available, and the service account is usable.
     *
     * @param pipelineConfigurer configurer giving access to the stage's input schema and failure collector
     */
    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
        FailureCollector collector = stageConfigurer.getFailureCollector();
        Schema inputSchema = stageConfigurer.getInputSchema();
        Schema configuredSchema = this.config.getSchema(collector);
        this.config.validate(inputSchema, configuredSchema, collector, Collections.emptyMap());

        // Without a connection, project or service account type there is nothing to validate against.
        if (this.config.connection == null
            || this.config.tryGetProject() == null
            || this.config.getServiceAccountType() == null) {
            return;
        }
        // A file-path service account that cannot be resolved automatically cannot be used here either.
        if (this.config.isServiceAccountFilePath().booleanValue()
            && this.config.connection.autoServiceAccountUnavailable()) {
            return;
        }

        // Prefer the explicitly configured schema; fall back to the stage's input schema.
        Schema effectiveSchema = configuredSchema != null ? configuredSchema : inputSchema;
        if (effectiveSchema != null) {
            validateConfiguredSchema(effectiveSchema, collector);
        }
    }

    /** Returns the configuration this sink was constructed with. */
    @Override // io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySink
    public BigQuerySinkConfig getConfig() {
        return this.config;
    }

    /**
     * Re-validates the configuration at run time, this time with the runtime arguments, and throws
     * if the failure collector has recorded any failure.
     */
    @Override // io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySink
    protected void prepareRunValidation(BatchSinkContext batchSinkContext) {
        FailureCollector collector = batchSinkContext.getFailureCollector();
        Schema configuredSchema = this.config.getSchema(collector);
        this.config.validate(
            batchSinkContext.getInputSchema(), configuredSchema, collector, batchSinkContext.getArguments().asMap());
        collector.getOrThrowException();
    }

    /**
     * Populates the base configuration and registers both the regular output and the SQL engine output.
     *
     * @param batchSinkContext context of the current run
     * @param bigQuery BigQuery client to use
     * @param bucket value forwarded unchanged to {@code initOutput}
     *     (presumably the GCS staging bucket - confirm against {@code AbstractBigQuerySink})
     * @throws IOException declared by the base class contract
     */
    @Override // io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySink
    protected void prepareRunInternal(BatchSinkContext batchSinkContext, BigQuery bigQuery, String bucket)
        throws IOException {
        FailureCollector collector = batchSinkContext.getFailureCollector();
        Schema configuredSchema = this.config.getSchema(collector);
        // Prefer the explicitly configured schema; fall back to the stage's input schema.
        Schema outputSchema = configuredSchema != null ? configuredSchema : batchSinkContext.getInputSchema();

        configureTable(outputSchema);
        configureBigQuerySink();

        String referenceName = this.config.getReferenceName();
        String tableName = this.config.getTable();
        String fqn = BigQueryUtil.getFQN(this.config.getDatasetProject(), this.config.getDataset(), tableName);
        initOutput(batchSinkContext, bigQuery, referenceName, fqn, tableName, outputSchema, bucket, collector, null);
        initSQLEngineOutput(
            batchSinkContext, bigQuery, referenceName, batchSinkContext.getStageName(), tableName, outputSchema,
            collector);
    }

    /**
     * Delegates to the base class and then emits metrics on a best-effort basis: a failure while
     * emitting metrics is logged and never propagated.
     */
    @Override // io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySink
    public void onRunFinish(boolean succeeded, BatchSinkContext batchSinkContext) {
        super.onRunFinish(succeeded, batchSinkContext);
        try {
            recordMetric(succeeded, batchSinkContext);
        } catch (Exception e) {
            LOG.warn("Exception while trying to emit metric. No metric will be emitted for the number of affected rows.", e);
        }
    }

    /**
     * Registers a {@link SQLEngineOutput} for {@link BigQuerySQLEngine} carrying the job id, the
     * serialized sink configuration, the schema and the destination field names as JSON arguments.
     * Does nothing when {@code schema} is null.
     *
     * @param batchSinkContext context to which the output is added
     * @param bigQuery BigQuery client used to resolve the table fields
     * @param outputName first argument passed to the {@code SQLEngineOutput} constructor
     * @param stageName second argument passed to the {@code SQLEngineOutput} constructor
     * @param tableName destination table name
     * @param schema schema of the records to write, or null when unknown
     * @param failureCollector collector receiving failures raised while resolving the table fields
     */
    void initSQLEngineOutput(BatchSinkContext batchSinkContext, BigQuery bigQuery, String outputName,
                             String stageName, String tableName, @Nullable Schema schema,
                             FailureCollector failureCollector) {
        if (schema == null) {
            LOG.debug("BigQuery SQL Engine Output was not initialized. Schema was empty.");
            return;
        }
        List<String> fieldNames = BigQuerySinkUtils.getBigQueryTableFields(
                bigQuery, tableName, schema, getConfig().isAllowSchemaRelaxation(),
                this.config.getDatasetProject(), this.config.getDataset(), this.config.isTruncateTableSet(),
                failureCollector)
            .stream()
            .map(field -> field.getName())
            .collect(Collectors.toList());

        ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
        arguments
            .put(BigQueryWrite.SQL_OUTPUT_JOB_ID, this.jobId + "_write")
            .put("config", GSON.toJson(this.config))
            .put("schema", GSON.toJson(schema))
            .put("fields", GSON.toJson(fieldNames));
        batchSinkContext.addOutput(
            new SQLEngineOutput(outputName, stageName, BigQuerySQLEngine.class.getName(), arguments.build()));
    }

    /**
     * Emits the "records updated" and "bytes.processed" metrics for the BigQuery job of this sink.
     * Nothing is emitted when the run did not succeed or when the job cannot be found.
     *
     * @param succeeded whether the run succeeded
     * @param batchSinkContext context providing the metrics emitter
     */
    void recordMetric(boolean succeeded, BatchSinkContext batchSinkContext) {
        if (!succeeded) {
            return;
        }
        JobId id = getJobId();
        Job job = id == null ? null : this.bigQuery.getJob(id);
        if (job == null) {
            LOG.warn("Unable to find BigQuery job. No metric will be emitted for the number of affected rows.");
            return;
        }

        long remainingRows = getTotalRows(job);
        LOG.info("Job {} affected {} rows", job.getJobId(), remainingRows);

        // The row total is a long but count(...) takes an int, so it is emitted in
        // Integer.MAX_VALUE-sized chunks; the number of chunks is capped to keep the loop short.
        long fullChunks = remainingRows / Integer.MAX_VALUE;
        if (fullChunks > METRIC_CHUNK_CAP) {
            LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly");
        }
        long lastChunkIndex = Math.min(fullChunks, METRIC_CHUNK_CAP);
        for (int i = 0; i <= lastChunkIndex && remainingRows > 0; i++) {
            int chunk = (int) Math.min(remainingRows, Integer.MAX_VALUE);
            batchSinkContext.getMetrics().count(AbstractBigQuerySink.RECORDS_UPDATED_METRIC, chunk);
            remainingRows -= chunk;
        }

        ImmutableMap<String, String> tags = new ImmutableMap.Builder<String, String>()
            .put("aet", "batchsink")
            .put("tpe", NAME)
            .build();
        batchSinkContext.getMetrics().child(tags).countLong("bytes.processed", getTotalBytes(job));
    }

    /**
     * Builds the id of this sink's BigQuery job, located in the configured dataset's location.
     *
     * @return the job id, or null when the dataset or the job itself cannot be found
     */
    @Nullable
    private JobId getJobId() {
        BigQuerySinkConfig cfg = getConfig();
        Dataset dataset = this.bigQuery.getDataset(DatasetId.of(cfg.getDatasetProject(), cfg.getDataset()));
        if (dataset == null) {
            LOG.warn("Dataset {} was not found in project {}", cfg.getDataset(), cfg.getDatasetProject());
            return null;
        }
        String location = dataset.getLocation();
        JobId candidate = JobId.newBuilder().setLocation(location).setJob(this.jobId).build();
        if (this.bigQuery.getJob(candidate) == null) {
            LOG.warn("Job {} was not found in location {}", this.jobId, location);
            return null;
        }
        return candidate;
    }

    /**
     * Returns the number of rows written by a LOAD job or affected by a QUERY job.
     *
     * @return the row count; 0 when the job type is neither LOAD nor QUERY, or when the job does not
     *     report the statistic (the statistic getters return boxed values that may be null)
     */
    private long getTotalRows(Job job) {
        JobConfiguration.Type type = job.getConfiguration().getType();
        if (type == JobConfiguration.Type.LOAD) {
            JobStatistics.LoadStatistics stats = job.getStatistics();
            return stats == null ? 0L : valueOrZero(stats.getOutputRows());
        }
        if (type == JobConfiguration.Type.QUERY) {
            JobStatistics.QueryStatistics stats = job.getStatistics();
            return stats == null ? 0L : valueOrZero(stats.getNumDmlAffectedRows());
        }
        LOG.warn("Unable to identify BigQuery job type. No metric will be emitted for the number of affected rows.");
        return 0L;
    }

    /**
     * Returns the number of bytes loaded by a LOAD job or processed by a QUERY job, logging the value.
     *
     * @return the byte count; 0 when the job type is neither LOAD nor QUERY, or when the job does not
     *     report the statistic
     */
    private long getTotalBytes(Job job) {
        JobConfiguration.Type type = job.getConfiguration().getType();
        if (type == JobConfiguration.Type.LOAD) {
            JobStatistics.LoadStatistics stats = job.getStatistics();
            long loadedBytes = stats == null ? 0L : valueOrZero(stats.getOutputBytes());
            LOG.info("Job {} loaded {} bytes", job.getJobId(), loadedBytes);
            return loadedBytes;
        }
        if (type == JobConfiguration.Type.QUERY) {
            JobStatistics.QueryStatistics stats = job.getStatistics();
            long processedBytes = stats == null ? 0L : valueOrZero(stats.getTotalBytesProcessed());
            LOG.info("Job {} processed {} bytes", job.getJobId(), processedBytes);
            return processedBytes;
        }
        LOG.warn("Unable to identify BigQuery job type. No metric will be emitted for the number of affected bytes.");
        return 0L;
    }

    /** Unboxes a possibly-null statistic, treating a missing value as zero instead of throwing. */
    private static long valueOrZero(@Nullable Long value) {
        return value == null ? 0L : value;
    }

    /**
     * Creates the output format provider for this sink.
     *
     * @param configuration Hadoop configuration handed to the provider
     * @param ignored not used by this implementation
     * @param schema schema handed to the provider
     */
    @Override // io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySink
    protected OutputFormatProvider getOutputFormatProvider(Configuration configuration, String ignored, Schema schema) {
        return new BigQueryOutputFormatProvider(configuration, schema);
    }

    /** Copies the sink settings into the base configuration; optional settings are written only when present. */
    private void configureBigQuerySink() {
        BigQuerySinkConfig cfg = getConfig();
        this.baseConfiguration.set(BigQueryConstants.CONFIG_JOB_ID, this.jobId);
        setIfPresent(BigQueryConstants.CONFIG_PARTITION_BY_FIELD, cfg.getPartitionByField());
        this.baseConfiguration.setBoolean(
            BigQueryConstants.CONFIG_REQUIRE_PARTITION_FILTER, cfg.isPartitionFilterRequired());
        setIfPresent(BigQueryConstants.CONFIG_CLUSTERING_ORDER, cfg.getClusteringOrder());
        this.baseConfiguration.set(BigQueryConstants.CONFIG_OPERATION, cfg.getOperation().name());
        setIfPresent(BigQueryConstants.CONFIG_TABLE_KEY, cfg.getRelationTableKey());
        setIfPresent(BigQueryConstants.CONFIG_DEDUPE_BY, cfg.getDedupeBy());
        setIfPresent(BigQueryConstants.CONFIG_PARTITION_FILTER, cfg.getPartitionFilter());
        this.baseConfiguration.setEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, cfg.getPartitioningType());
        setLongIfPresent(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_START, cfg.getRangeStart());
        setLongIfPresent(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_END, cfg.getRangeEnd());
        setLongIfPresent(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_INTERVAL, cfg.getRangeInterval());
    }

    /** Writes a string property to the base configuration unless the value is null. */
    private void setIfPresent(String key, @Nullable String value) {
        if (value != null) {
            this.baseConfiguration.set(key, value);
        }
    }

    /** Writes a numeric property to the base configuration as a long unless the value is null. */
    private void setLongIfPresent(String key, @Nullable Number value) {
        if (value != null) {
            this.baseConfiguration.setLong(key, value.longValue());
        }
    }

    /**
     * Records in the base configuration whether the destination table exists and the
     * comma-separated list of its field names.
     *
     * <p>The names come from the existing BigQuery table when there is one, otherwise from
     * {@code schema}; when neither is available no field list is written.
     *
     * @param schema schema used as the source of field names when the table does not exist yet
     */
    private void configureTable(@Nullable Schema schema) {
        BigQuerySinkConfig cfg = getConfig();
        Table table = BigQueryUtil.getBigQueryTable(
            cfg.getDatasetProject(), cfg.getDataset(), cfg.getTable(), cfg.getServiceAccount(),
            cfg.isServiceAccountFilePath().booleanValue());
        this.baseConfiguration.setBoolean(BigQueryConstants.CONFIG_DESTINATION_TABLE_EXISTS, table != null);

        List<String> fieldNames = null;
        if (table != null) {
            com.google.cloud.bigquery.Schema tableSchema =
                Objects.requireNonNull(table.getDefinition().getSchema());
            fieldNames = tableSchema.getFields().stream()
                .map(com.google.cloud.bigquery.Field::getName)
                .collect(Collectors.toList());
        } else if (schema != null) {
            fieldNames = schema.getFields().stream()
                .map(Schema.Field::getName)
                .collect(Collectors.toList());
        }
        if (fieldNames != null) {
            this.baseConfiguration.set(BigQueryConstants.CONFIG_TABLE_FIELDS, String.join(",", fieldNames));
        }
    }

    /**
     * Validates {@code schema} against the existing BigQuery table, if the configuration allows
     * connecting and the table exists. Validation against the table is skipped while
     * {@code allowSchemaRelaxation} is still a macro.
     *
     * @param schema schema to validate
     * @param failureCollector collector receiving validation failures
     */
    private void validateConfiguredSchema(Schema schema, FailureCollector failureCollector) {
        if (!this.config.shouldConnect()) {
            return;
        }
        validateRecordDepth(schema, failureCollector);

        String tableName = this.config.getTable();
        Table table = BigQueryUtil.getBigQueryTable(
            this.config.getDatasetProject(), this.config.getDataset(), tableName, this.config.getServiceAccount(),
            this.config.isServiceAccountFilePath(), failureCollector);
        if (table == null || this.config.containsMacro("allowSchemaRelaxation")) {
            return;
        }

        com.google.cloud.bigquery.Schema tableSchema = table.getDefinition().getSchema();
        // Use the accessor (as initSQLEngineOutput does) rather than unboxing the raw
        // allowSchemaRelaxation field, which throws a NullPointerException when the property is unset.
        // NOTE(review): assumes isAllowSchemaRelaxation() tolerates an unset value - confirm in BigQuerySinkConfig.
        boolean allowRelaxation = this.config.isAllowSchemaRelaxation();
        Operation operation = this.config.getOperation();
        if (operation == Operation.INSERT) {
            BigQuerySinkUtils.validateInsertSchema(
                table, schema, allowRelaxation, this.config.isTruncateTableSet(), this.config.getDataset(),
                failureCollector);
        } else if (operation == Operation.UPSERT) {
            BigQuerySinkUtils.validateSchema(
                tableName, tableSchema, schema, allowRelaxation, this.config.isTruncateTableSet(),
                this.config.getDataset(), failureCollector);
        }
    }
}
