package io.cdap.plugin.gcp.gcs.sink;

import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
import io.cdap.cdap.api.plugin.PluginProperties;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.format.FileFormat;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.sink.GCSBatchSink;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name(GCSMultiBatchSink.NAME)
@Description("Writes records to one or more Avro, ORC, Parquet or Delimited format files in a directory on Google Cloud Storage.")
@Metadata(properties = {@MetadataProperty(key = "connector", value = "GCS")})
@Plugin(type = "batchsink")
/* loaded from: input_file:io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.class */
public class GCSMultiBatchSink extends BatchSink<StructuredRecord, NullWritable, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(GCSMultiBatchSink.class);
    public static final String NAME = "GCSMultiFiles";
    private static final String TABLE_PREFIX = "multisink.";
    private static final String FORMAT_PLUGIN_ID = "format";
    private static final String SCHEMA_MACRO = "__provided_schema__";
    private final GCSMultiBatchSinkConfig config;

    /* loaded from: input_file:io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink$GCSMultiBatchSinkConfig.class */
    public static class GCSMultiBatchSinkConfig extends GCSBatchSink.GCSBatchSinkConfig {
        private static final String NAME_ALLOW_FLEXIBLE_SCHEMA = "allowFlexibleSchema";

        @Description("The codec to use when writing data. The 'avro' format supports 'snappy' and 'deflate'. The parquet format supports 'snappy' and 'gzip'. Other formats do not support compression.")
        @Nullable
        private String compressionCodec;

        @Description("The name of the field that will be used to determine which directory to write to.")
        private String splitField = "tablename";

        @Name(NAME_ALLOW_FLEXIBLE_SCHEMA)
        @Description("Allow Flexible Schemas in output. If disabled, only records with schemas set as arguments will be processed. If enabled, all records will be written as-is.")
        @Nullable
        @Macro
        private Boolean allowFlexibleSchema;

        protected String getOutputDir(long j, String str) {
            return String.format("%s/%s/%s", getOutputBaseDir(), str, getOutputSuffix(j));
        }

        protected String getOutputBaseDir() {
            return getPath();
        }

        protected String getOutputSuffix(long j) {
            return !Strings.isNullOrEmpty(getSuffix()) ? new SimpleDateFormat(getSuffix()).format(Long.valueOf(j)) : "";
        }

        public Boolean getAllowFlexibleSchema() {
            return Boolean.valueOf(this.allowFlexibleSchema != null ? this.allowFlexibleSchema.booleanValue() : false);
        }
    }

    public GCSMultiBatchSink(GCSMultiBatchSinkConfig gCSMultiBatchSinkConfig) {
        this.config = gCSMultiBatchSinkConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        this.config.validate(failureCollector);
        failureCollector.getOrThrowException();
        PluginProperties.Builder addAll = PluginProperties.builder().addAll(this.config.getProperties().getProperties());
        if (!this.config.getAllowFlexibleSchema().booleanValue()) {
            addAll.add("schema", String.format("${%s}", SCHEMA_MACRO));
        }
        PluginProperties build = addAll.build();
        if (!this.config.containsMacro("format")) {
            String formatName = this.config.getFormatName();
            if (((OutputFormatProvider) pipelineConfigurer.usePlugin("validatingOutputFormat", formatName, "format", build)) == null) {
                failureCollector.addFailure(String.format("Could not find the '%s' output format plugin.", formatName), (String) null).withPluginNotFound("format", formatName, "validatingOutputFormat");
                return;
            }
            return;
        }
        for (FileFormat fileFormat : FileFormat.values()) {
            try {
                pipelineConfigurer.usePlugin("validatingOutputFormat", fileFormat.name().toLowerCase(), fileFormat.name().toLowerCase(), this.config.getRawProperties());
            } catch (InvalidPluginConfigException e) {
                LOG.warn("Failed to register format '{}', which means it cannot be used when the pipeline is run. Missing properties: {}, invalid properties: {}", new Object[]{fileFormat.name(), e.getMissingProperties(), e.getInvalidProperties().stream().map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.toList())});
            }
        }
    }

    public void prepareRun(BatchSinkContext batchSinkContext) throws IOException, InstantiationException {
        FailureCollector failureCollector = batchSinkContext.getFailureCollector();
        this.config.validate(failureCollector, batchSinkContext.getArguments().asMap());
        failureCollector.getOrThrowException();
        Map<String, String> fileSystemProperties = GCPUtils.getFileSystemProperties(this.config.connection, this.config.getPath(), new HashMap());
        HashMap hashMap = new HashMap(batchSinkContext.getArguments().asMap());
        CryptoKeyName cmekKey = CmekUtils.getCmekKey(this.config.cmekKey, batchSinkContext.getArguments().asMap(), failureCollector);
        failureCollector.getOrThrowException();
        Boolean isServiceAccountFilePath = this.config.connection.isServiceAccountFilePath();
        if (isServiceAccountFilePath == null) {
            batchSinkContext.getFailureCollector().addFailure("Service account type is undefined.", "Must be `filePath` or `JSON`");
            batchSinkContext.getFailureCollector().getOrThrowException();
            return;
        }
        Storage storage = GCPUtils.getStorage(this.config.connection.getProject(), this.config.connection.getServiceAccount() == null ? null : GCPUtils.loadServiceAccountCredentials(this.config.connection.getServiceAccount(), isServiceAccountFilePath.booleanValue()));
        try {
            if (storage.get(this.config.getBucket(), new Storage.BucketGetOption[0]) == null) {
                GCPUtils.createBucket(storage, this.config.getBucket(), this.config.getLocation(), cmekKey);
            }
            if (this.config.getAllowFlexibleSchema().booleanValue()) {
                configureSchemalessMultiSink(batchSinkContext, fileSystemProperties, hashMap);
            } else {
                configureMultiSinkWithSchema(batchSinkContext, fileSystemProperties, hashMap);
            }
        } catch (StorageException e) {
            throw new RuntimeException(String.format("Unable to access or create bucket %s. ", this.config.getBucket()) + "Ensure you entered the correct bucket path and have permissions for it.", e);
        }
    }

    public void transform(StructuredRecord structuredRecord, Emitter<KeyValue<NullWritable, StructuredRecord>> emitter) {
        emitter.emit(new KeyValue(NullWritable.get(), structuredRecord));
    }

    private void configureMultiSinkWithSchema(BatchSinkContext batchSinkContext, Map<String, String> map, Map<String, String> map2) throws IOException, InstantiationException {
        for (Map.Entry<String, String> entry : map2.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(TABLE_PREFIX)) {
                String substring = key.substring(TABLE_PREFIX.length());
                Schema parseJson = Schema.parseJson(entry.getValue());
                batchSinkContext.getArguments().set(SCHEMA_MACRO, parseJson.toString());
                ValidatingOutputFormat validatingOutputFormat = (ValidatingOutputFormat) batchSinkContext.newPluginInstance("format");
                HashMap hashMap = new HashMap(map);
                hashMap.putAll(validatingOutputFormat.getOutputFormatConfiguration());
                hashMap.putAll(RecordFilterOutputFormat.configure(validatingOutputFormat.getOutputFormatClassName(), this.config.splitField, substring, parseJson));
                hashMap.put(FileOutputFormat.OUTDIR, this.config.getOutputDir(batchSinkContext.getLogicalStartTime(), substring));
                hashMap.put(GCSBatchSink.CONTENT_TYPE, this.config.getContentType());
                batchSinkContext.addOutput(Output.of(this.config.getReferenceName() + "_" + substring, new SinkOutputFormatProvider(RecordFilterOutputFormat.class.getName(), hashMap)));
            }
        }
    }

    private void configureSchemalessMultiSink(BatchSinkContext batchSinkContext, Map<String, String> map, Map<String, String> map2) throws InstantiationException {
        ValidatingOutputFormat validatingOutputFormat = (ValidatingOutputFormat) batchSinkContext.newPluginInstance("format");
        HashMap hashMap = new HashMap(map);
        hashMap.putAll(validatingOutputFormat.getOutputFormatConfiguration());
        hashMap.putAll(DelegatingGCSOutputFormat.configure(validatingOutputFormat.getOutputFormatClassName(), this.config.splitField, this.config.getOutputBaseDir(), this.config.getOutputSuffix(batchSinkContext.getLogicalStartTime())));
        hashMap.put(GCSBatchSink.CONTENT_TYPE, this.config.getContentType());
        batchSinkContext.addOutput(Output.of(this.config.getReferenceName(), new SinkOutputFormatProvider(DelegatingGCSOutputFormat.class.getName(), hashMap)));
    }

    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((StructuredRecord) obj, (Emitter<KeyValue<NullWritable, StructuredRecord>>) emitter);
    }
}
