package io.cdap.plugin.gcp.spanner.sink;

import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ReplicaInfo;
import com.google.cloud.spanner.Spanner;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.spanner.SpannerConstants;
import io.cdap.plugin.gcp.spanner.common.SpannerUtil;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Spanner")
@Description("Batch sink to write to Cloud Spanner. Cloud Spanner is a fully managed, mission-critical, relational database service that offers transactional consistency at global scale, schemas, SQL (ANSI 2011 with extensions), and automatic, synchronous replication for high availability.")
@Metadata(properties = {@MetadataProperty(key = "connector", value = "Spanner")})
@Plugin(type = "batchsink")
/* loaded from: input_file:io/cdap/plugin/gcp/spanner/sink/SpannerSink.class */
public final class SpannerSink extends BatchSink<StructuredRecord, NullWritable, Mutation> {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerSink.class);
    public static final String NAME = "Spanner";
    private static final String TABLE_NAME = "tablename";
    private final SpannerSinkConfig config;
    private RecordToMutationTransformer transformer;

    public SpannerSink(SpannerSinkConfig spannerSinkConfig) {
        this.config = spannerSinkConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        this.config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
    }

    public void prepareRun(BatchSinkContext batchSinkContext) throws Exception {
        FailureCollector failureCollector = batchSinkContext.getFailureCollector();
        this.config.validate(failureCollector, batchSinkContext.getArguments().asMap());
        failureCollector.getOrThrowException();
        Schema schema = this.config.getSchema(failureCollector);
        Schema inputSchema = schema == null ? batchSinkContext.getInputSchema() : schema;
        CryptoKeyName cmekKey = CmekUtils.getCmekKey(this.config.cmekKey, batchSinkContext.getArguments().asMap(), batchSinkContext.getFailureCollector());
        Configuration configuration = new Configuration();
        configuration.setBoolean(SpannerConstants.IS_PREVIEW_ENABLED, batchSinkContext.isPreviewEnabled());
        String str = null;
        if (cmekKey != null) {
            configuration.set(SpannerConstants.CMEK_KEY, cmekKey.toString());
        }
        Spanner spannerService = SpannerUtil.getSpannerService(this.config.connection.getServiceAccount(), this.config.connection.isServiceAccountFilePath().booleanValue(), this.config.connection.getProject());
        Throwable th = null;
        try {
            try {
                ReplicaInfo replicaInfo = spannerService.getInstanceAdminClient().getInstanceConfig(spannerService.getInstanceAdminClient().getInstance(this.config.getInstance()).getInstanceConfigId().getInstanceConfig()).getReplicas().get(0);
                if (replicaInfo != null) {
                    str = replicaInfo.getLocation();
                }
                if (spannerService != null) {
                    if (0 != 0) {
                        try {
                            spannerService.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        spannerService.close();
                    }
                }
                LineageRecorder lineageRecorder = new LineageRecorder((BatchContext) batchSinkContext, Asset.builder(this.config.getReferenceName()).setFqn(this.config.getFQN()).setLocation(str).build());
                lineageRecorder.createExternalDataset(inputSchema);
                SpannerOutputFormat.configure(configuration, this.config, inputSchema);
                batchSinkContext.addOutput(Output.of(this.config.getReferenceName(), new SinkOutputFormatProvider((Class<? extends OutputFormat>) SpannerOutputFormat.class, configuration)));
                List fields = inputSchema.getFields();
                if (fields == null || fields.isEmpty()) {
                    return;
                }
                lineageRecorder.recordWrite("Write", "Wrote to Spanner table.", (List) fields.stream().map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.toList()));
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (spannerService != null) {
                if (th != null) {
                    try {
                        spannerService.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    spannerService.close();
                }
            }
            throw th4;
        }
    }

    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        FailureCollector failureCollector = batchRuntimeContext.getFailureCollector();
        this.config.validate(failureCollector);
        failureCollector.getOrThrowException();
        this.transformer = new RecordToMutationTransformer(this.config.getTable(), this.config.getSchema(failureCollector));
    }

    public void transform(StructuredRecord structuredRecord, Emitter<KeyValue<NullWritable, Mutation>> emitter) {
        emitter.emit(new KeyValue((Object) null, this.transformer.transform(structuredRecord)));
    }

    public void destroy() {
        super.destroy();
    }

    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((StructuredRecord) obj, (Emitter<KeyValue<NullWritable, Mutation>>) emitter);
    }
}
