package io.cdap.plugin.gcp.bigquery.sink;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.common.base.Strings;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceUtils;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.class */
public abstract class AbstractBigQuerySink extends BatchSink<StructuredRecord, StructuredRecord, NullWritable> {
    /** Logger shared by all instances of this sink. */
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBigQuerySink.class);
    /**
     * Format for a GCS path built from two parts (bucket name, then a path inside the bucket).
     * NOTE(review): not referenced by name in this decompiled text; the same literal appears
     * inline in onRunFinish, presumably because the compiler inlined the constant.
     */
    private static final String gcsPathFormat = "gs://%s/%s";
    /**
     * Metric name for updated records. Not used inside this class; presumably emitted by
     * subclasses or related output classes (verify against callers).
     */
    public static final String RECORDS_UPDATED_METRIC = "records.updated";
    /**
     * Random identifier created once per sink instance. Its string form is used when configuring
     * the staging bucket, when building the temporary GCS path for an output, and when deleting
     * the temporary directory at the end of the run.
     */
    private final UUID runUUID = UUID.randomUUID();
    /** Hadoop configuration assigned in prepareRun; copied (never handed out directly) for each output. */
    protected Configuration baseConfiguration;
    /** BigQuery client assigned in prepareRun. */
    protected BigQuery bigQuery;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySink$1, reason: invalid class name */
    /* loaded from: input_file:io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink$1.class */
    /**
     * Decompiler rendering of a compiler-generated switch map for {@link Schema.Type}.
     *
     * <p>The array is indexed by {@code Schema.Type.ordinal()} and holds the case label used by
     * the {@code switch} in the private {@code validateRecordDepth} overload:
     * {@code 1} for {@code RECORD}, {@code 2} for {@code ARRAY}, and {@code 0} (the array's
     * default value) for every other type.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per Schema.Type constant; slots that are never assigned stay 0 (no matching case).
        static final /* synthetic */ int[] $SwitchMap$io$cdap$cdap$api$data$schema$Schema$Type = new int[Schema.Type.values().length];

        static {
            try {
                // RECORD -> case 1
                $SwitchMap$io$cdap$cdap$api$data$schema$Schema$Type[Schema.Type.RECORD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored. Presumably the usual javac switch-map guard that tolerates
                // an enum constant missing from the Schema.Type version loaded at runtime (confirm).
            }
            try {
                // ARRAY -> case 2
                $SwitchMap$io$cdap$cdap$api$data$schema$Schema$Type[Schema.Type.ARRAY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored; same reasoning as for RECORD above.
            }
        }
    }

    /**
     * Prepares the sink for a pipeline run.
     *
     * <p>Steps, in order: run the subclass validation hook, build the BigQuery client (with
     * service-account credentials when a service account is configured), resolve the CMEK key
     * and abort if the failure collector holds errors, build the base Hadoop configuration,
     * look up the target dataset and staging bucket, create the backing resources unless the
     * pipeline is running in preview mode, and finally hand over to the subclass hook.
     *
     * @param batchSinkContext context of the current run; supplies runtime arguments, the
     *                         failure collector and the preview flag
     * @throws Exception if validation, credential loading or resource setup fails
     */
    public final void prepareRun(BatchSinkContext batchSinkContext) throws Exception {
        prepareRunValidation(batchSinkContext);

        AbstractBigQuerySinkConfig sinkConfig = getConfig();

        // Credentials stay null when no service account is configured.
        String serviceAccount = sinkConfig.getServiceAccount();
        GoogleCredentials credentials = null;
        if (serviceAccount != null) {
            credentials = GCPUtils.loadServiceAccountCredentials(serviceAccount, sinkConfig.isServiceAccountFilePath().booleanValue());
        }

        String projectId = sinkConfig.getProject();
        this.bigQuery = GCPUtils.getBigQuery(projectId, credentials);

        // Resolve the encryption key and stop here if any failure has been collected so far.
        FailureCollector collector = batchSinkContext.getFailureCollector();
        CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(sinkConfig.cmekKey, batchSinkContext.getArguments().asMap(), collector);
        collector.getOrThrowException();

        this.baseConfiguration = getBaseConfiguration(cmekKeyName);

        // Target dataset id and the dataset instance looked up for it.
        DatasetId datasetId = DatasetId.of(sinkConfig.getDatasetProject(), sinkConfig.getDataset());
        Dataset dataset = this.bigQuery.getDataset(datasetId);

        // Staging bucket name and the bucket instance looked up for it.
        Storage storage = GCPUtils.getStorage(projectId, credentials);
        String stagingBucketName = BigQueryUtil.getStagingBucketName(batchSinkContext.getArguments().asMap(), sinkConfig.getLocation(), dataset, sinkConfig.getBucket());
        String bucketName = BigQuerySinkUtils.configureBucket(this.baseConfiguration, stagingBucketName, this.runUUID.toString());
        Bucket bucket = storage.get(bucketName);

        // Resource creation is skipped for preview runs.
        boolean previewRun = batchSinkContext.isPreviewEnabled();
        if (!previewRun) {
            BigQuerySinkUtils.createResources(this.bigQuery, dataset, datasetId, storage, bucket, bucketName, sinkConfig.getLocation(), cmekKeyName);
        }

        prepareRunInternal(batchSinkContext, this.bigQuery, bucketName);
    }

    /**
     * Cleans up the temporary GCS directory used by this run.
     *
     * <p>The path is derived from the configured bucket and this instance's run UUID. When no
     * bucket is configured, the path is built from
     * {@code BigQuerySourceUtils.GCS_BUCKET_FORMAT} and the run UUID alone. Cleanup is
     * best-effort: an {@link IOException} is logged as a warning and never propagated.
     *
     * @param z                not read by this method (presumably the run's success flag — confirm
     *                         against the overridden declaration)
     * @param batchSinkContext context of the finished run; not read by this method
     */
    @Override
    public void onRunFinish(boolean z, BatchSinkContext batchSinkContext) {
        String bucket = getConfig().getBucket();
        String runId = this.runUUID.toString();
        String temporaryPath = bucket == null
                ? String.format(BigQuerySourceUtils.GCS_BUCKET_FORMAT, runId)
                : String.format(gcsPathFormat, bucket, runId);
        try {
            BigQueryUtil.deleteTemporaryDirectory(this.baseConfiguration, temporaryPath);
        } catch (IOException e) {
            // Best-effort cleanup: do not fail the run. The exception is passed as the trailing
            // argument so the logger records the full stack trace and cause, not just the message.
            LOG.warn("Failed to delete temporary directory '{}': {}", temporaryPath, e.getMessage(), e);
        }
    }

    /**
     * Emits each incoming record unchanged as the key of a key/value pair, with
     * {@link NullWritable} as the value.
     *
     * @param structuredRecord record to write
     * @param emitter          receives the resulting key/value pair
     */
    @Override
    public void transform(StructuredRecord structuredRecord, Emitter<KeyValue<StructuredRecord, NullWritable>> emitter) {
        // Diamond instead of the raw KeyValue type, so the pair is type-checked against the emitter.
        emitter.emit(new KeyValue<>(structuredRecord, NullWritable.get()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Configures and registers one BigQuery output on the sink context.
     *
     * <p>Resolves the BigQuery field list for the table, configures a copy of the base Hadoop
     * configuration for that table and its temporary GCS path, records lineage for the written
     * fields, and adds the output to the context.
     *
     * @param batchSinkContext context the output is registered on
     * @param bigQuery         client used to resolve table fields and look up the dataset
     * @param str              name used both for the lineage asset and for the registered output
     * @param str2             fully-qualified name set on the lineage asset
     * @param str3             table name
     * @param schema           schema of the records to write; may be {@code null}
     * @param str4             first argument of the temporary GCS path helper (presumably the
     *                         staging bucket name — confirm against callers)
     * @param failureCollector collector passed to field resolution
     * @param str5             optional marker set on the lineage asset; skipped when {@code null}
     * @throws IOException declared by the original signature
     */
    public final void initOutput(BatchSinkContext batchSinkContext, BigQuery bigQuery, String str, String str2, String str3, @Nullable Schema schema, String str4, FailureCollector failureCollector, @Nullable String str5) throws IOException {
        LOG.debug("Init output for table '{}' with schema: {}", str3, schema);

        AbstractBigQuerySinkConfig config = getConfig();
        List<BigQueryTableFieldSchema> bigQueryTableFields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, str3, schema, config.isAllowSchemaRelaxation(), config.getDatasetProject(), config.getDataset(), config.isTruncateTableSet(), failureCollector);

        // Each output gets its own copy so per-table settings never leak into the shared base configuration.
        Configuration configuration = new Configuration(this.baseConfiguration);
        DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
        String temporaryGcsPath = BigQuerySinkUtils.getTemporaryGcsPath(str4, this.runUUID.toString(), str3);
        BigQuerySinkUtils.configureOutput(configuration, datasetId, str3, temporaryGcsPath, bigQueryTableFields);

        List<String> fieldNames = bigQueryTableFields.stream()
                .map(BigQueryTableFieldSchema::getName)
                .collect(Collectors.toList());

        // Use the existing dataset's location when the lookup returns one; otherwise the configured location.
        Dataset dataset = bigQuery.getDataset(datasetId);
        String location = dataset != null ? dataset.getLocation() : config.getLocation();

        Asset asset = str5 == null
                ? Asset.builder(str).setFqn(str2).setLocation(location).build()
                : Asset.builder(str).setFqn(str2).setLocation(location).setMarker(str5).build();
        BigQuerySinkUtils.recordLineage(batchSinkContext, asset, schema, fieldNames, str3);

        batchSinkContext.addOutput(Output.of(str, getOutputFormatProvider(configuration, str3, schema)));
    }

    /**
     * Returns the configuration of the concrete sink. Read throughout this class for project,
     * dataset, bucket, location, service-account and write-disposition settings.
     */
    protected abstract AbstractBigQuerySinkConfig getConfig();

    /**
     * Validation hook invoked as the very first step of {@code prepareRun}, before any client
     * or configuration is created.
     *
     * @param batchSinkContext context of the current run
     */
    protected abstract void prepareRunValidation(BatchSinkContext batchSinkContext);

    /**
     * Hook invoked as the last step of {@code prepareRun}, after the base configuration has been
     * built and (outside preview mode) resources have been created.
     *
     * @param batchSinkContext context of the current run
     * @param bigQuery         the BigQuery client created by {@code prepareRun}
     * @param str              the bucket name returned by the bucket-configuration step of
     *                         {@code prepareRun}
     * @throws IOException if the subclass fails while setting up its outputs
     */
    protected abstract void prepareRunInternal(BatchSinkContext batchSinkContext, BigQuery bigQuery, String str) throws IOException;

    /**
     * Creates the output format provider for one table. Called from {@code initOutput}.
     *
     * @param configuration per-output copy of the base configuration, already configured for the table
     * @param str           table name
     * @param schema        schema passed through from {@code initOutput}; may be {@code null} there
     */
    protected abstract OutputFormatProvider getOutputFormatProvider(Configuration configuration, String str, Schema schema);

    /**
     * Builds the Hadoop configuration shared by every output of this sink.
     *
     * <p>Starts from the BigQuery configuration for the configured service account and project,
     * then sets the schema-relaxation flag, the table write disposition and the GCS upload chunk
     * size (falling back to {@code 8388608} when none is configured).
     *
     * @param cryptoKeyName CMEK key to apply, or {@code null} for none
     * @return the newly built configuration
     * @throws IOException declared by the original signature
     */
    private Configuration getBaseConfiguration(@Nullable CryptoKeyName cryptoKeyName) throws IOException {
        final String chunkSizeKey = "fs.gs.outputstream.upload.chunk.size";
        final String defaultChunkSize = "8388608";

        AbstractBigQuerySinkConfig sinkConfig = getConfig();
        Configuration hadoopConf = BigQueryUtil.getBigQueryConfig(sinkConfig.getServiceAccount(), sinkConfig.getProject(), cryptoKeyName, sinkConfig.getServiceAccountType());

        hadoopConf.setBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION, sinkConfig.isAllowSchemaRelaxation());
        hadoopConf.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY, sinkConfig.getWriteDisposition().name());

        // A missing or empty chunk size falls back to the default.
        String chunkSize = sinkConfig.getGcsChunkSize();
        if (Strings.isNullOrEmpty(chunkSize)) {
            chunkSize = defaultChunkSize;
        }
        hadoopConf.set(chunkSizeKey, chunkSize);

        return hadoopConf;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks that no field of the given schema is nested too deeply, reporting each offending
     * field to the failure collector. Starts the recursive check at depth 0 with no name prefix.
     *
     * @param schema           schema to check; {@code null} is accepted and skips validation
     * @param failureCollector collector that receives one failure per offending field
     */
    public void validateRecordDepth(@Nullable Schema schema, FailureCollector failureCollector) {
        final int rootDepth = 0;
        final String noPrefix = null;
        validateRecordDepth(schema, failureCollector, rootDepth, noPrefix);
    }

    /** Nesting depth at which a field is reported as exceeding BigQuery's maximum allowed depth. */
    private static final int MAX_RECORD_DEPTH = 15;

    /**
     * Recursively walks the fields of {@code schema} and reports every field found at
     * {@link #MAX_RECORD_DEPTH}.
     *
     * <p>Descending into a record, or into an array whose component is a record, increases the
     * depth by one. An array with any other component type is visited at the same depth.
     * Schemas without fields end the recursion.
     *
     * @param schema           schema whose fields are checked; {@code null} skips validation
     * @param failureCollector collector that receives one failure per offending field
     * @param i                depth of the fields of {@code schema} (0 for the top-level schema)
     * @param str              dotted name prefix of the enclosing fields; {@code null} is treated
     *                         as empty
     */
    private void validateRecordDepth(@Nullable Schema schema, FailureCollector failureCollector, int i, String str) {
        if (schema == null) {
            return;
        }
        List<Schema.Field> fields = schema.getFields();
        if (fields == null) {
            return;
        }
        String prefix = str == null ? "" : str;

        for (Schema.Field field : fields) {
            String fieldPath = prefix + field.getName();

            // At the limit, every field on this level is reported and not descended into.
            if (i == MAX_RECORD_DEPTH) {
                failureCollector.addFailure(String.format("Field '%s' exceeds BigQuery maximum allowed depth of %d.", fieldPath, MAX_RECORD_DEPTH), "Please flatten the schema to contain fewer levels.");
                continue;
            }

            Schema fieldSchema = BigQueryUtil.getNonNullableSchema(field.getSchema());
            // Switch on the enum itself rather than on an ordinal-indexed lookup table.
            switch (fieldSchema.getType()) {
                case RECORD: {
                    validateRecordDepth(fieldSchema, failureCollector, i + 1, fieldPath + ".");
                    break;
                }
                case ARRAY: {
                    Schema rawComponent = fieldSchema.getComponentSchema();
                    if (rawComponent != null) {
                        Schema componentSchema = BigQueryUtil.getNonNullableSchema(rawComponent);
                        // Only a record component adds a nesting level.
                        int nextDepth = componentSchema.getType() == Schema.Type.RECORD ? i + 1 : i;
                        validateRecordDepth(componentSchema, failureCollector, nextDepth, fieldPath + ".");
                    }
                    break;
                }
                default:
                    // Other types cannot contain nested fields.
                    break;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns a fresh copy of the base configuration, so the caller can modify it without
     * affecting the configuration held by this sink.
     *
     * @return a new {@link Configuration} initialised from the base configuration
     * @throws IOException declared by the original signature
     */
    public Configuration getOutputConfiguration() throws IOException {
        Configuration outputConfiguration = new Configuration(this.baseConfiguration);
        return outputConfiguration;
    }

    /**
     * Bridge overload as rendered by the decompiler: accepts the erased parameter types and
     * forwards to {@code transform(StructuredRecord, Emitter)}.
     *
     * @param obj     the input record; cast to {@link StructuredRecord}
     * @param emitter the emitter; cast to the typed key/value emitter
     * @throws Exception declared by the erased signature
     */
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        // The casts select the typed overload; without them this call would resolve to itself.
        transform((StructuredRecord) obj, (Emitter<KeyValue<StructuredRecord, NullWritable>>) emitter);
    }
}
