package io.cdap.plugin.gcp.publisher;

import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.common.base.Strings;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Topic;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchContext;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPReferenceSinkConfig;
import io.cdap.plugin.gcp.common.GCPUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;

@Name("GooglePublisher")
@Description("Writes to a Google Cloud Pub/Sub topic. Cloud Pub/Sub brings the scalability, flexibility, and reliability of enterprise message-oriented middleware to the cloud. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication between independently written applications")
@Plugin(type = "batchsink")
/* loaded from: input_file:io/cdap/plugin/gcp/publisher/GooglePublisher.class */
public class GooglePublisher extends BatchSink<StructuredRecord, NullWritable, StructuredRecord> {
    private final Config config;

    /* loaded from: input_file:io/cdap/plugin/gcp/publisher/GooglePublisher$Config.class */
    /**
     * Plugin configuration for {@link GooglePublisher}.
     *
     * <p>Holds the target topic, the record format, and the optional batching / retry tuning knobs.
     * Every tuning knob is nullable; the corresponding getter substitutes the documented default
     * when the property was not supplied.
     */
    public static class Config extends GCPReferenceSinkConfig {
        public static final String NAME_MESSAGE_COUNT_BATCH_SIZE = "messageCountBatchSize";
        public static final String NAME_REQUEST_THRESHOLD_KB = "requestThresholdKB";
        public static final String NAME_PUBLISH_DELAY_THRESHOLD_MILLIS = "publishDelayThresholdMillis";
        public static final String NAME_ERROR_THRESHOLD = "errorThreshold";
        public static final String NAME_RETRY_TIMEOUT_SECONDS = "retryTimeoutSeconds";

        // Property names that previously appeared only as inline literals in validate().
        private static final String NAME_FORMAT = "format";
        private static final String NAME_DELIMITER = "delimiter";
        private static final String NAME_CMEK_KEY = "cmekKey";

        private static final String POSITIVE_NUMBER_ACTION = "Ensure the value is a positive number.";

        @Description("Cloud Pub/Sub topic to publish records to")
        @Macro
        private String topic;

        @Macro
        @Description("Format of the data to read. Supported formats are 'avro', 'blob', 'tsv', 'csv', 'delimited', 'json', 'parquet' and 'text'.")
        @Nullable
        private String format;

        @Description("The delimiter to use if the format is 'delimited'. The delimiter will be ignored if the format is anything other than 'delimited'.")
        @Macro
        @Nullable
        private String delimiter;

        @Description("Maximum count of messages in a batch. The default value is 100.")
        @Macro
        @Nullable
        private Long messageCountBatchSize;

        @Description("Maximum size of a batch in kilo bytes. The default value is 1KB.")
        @Macro
        @Nullable
        private Long requestThresholdKB;

        @Description("Maximum delay in milli-seconds for publishing the batched messages. The default value is 1 ms.")
        @Macro
        @Nullable
        private Long publishDelayThresholdMillis;

        @Description("Maximum number of message publishing failures to tolerate per partition before the pipeline will be failed. The default value is 0.")
        @Macro
        @Nullable
        private Long errorThreshold;

        @Description("Maximum amount of time in seconds to spend retrying publishing failures. The default value is 30 seconds.")
        @Macro
        @Nullable
        private Integer retryTimeoutSeconds;

        @Name("cmekKey")
        @Description("The GCP customer managed encryption key (CMEK) name used to encrypt data written to any topic created by the plugin. If the topic already exists, this is ignored. More information can be found at https://cloud.google.com/data-fusion/docs/how-to/customer-managed-encryption-keys")
        @Nullable
        @Macro
        private String cmekKey;

        /**
         * Creates a configuration with explicit values. The format, delimiter and CMEK key are not
         * set by this constructor and therefore stay {@code null}.
         *
         * @param referenceName reference name stored on the parent config
         * @param topic Pub/Sub topic to publish to
         * @param messageCountBatchSize maximum messages per batch, or {@code null} for the default
         * @param requestThresholdKB maximum batch size in KB, or {@code null} for the default
         * @param publishDelayThresholdMillis maximum publish delay in ms, or {@code null} for the default
         * @param errorThreshold tolerated publish failures, or {@code null} for the default
         * @param retryTimeoutSeconds retry budget in seconds, or {@code null} for the default
         */
        public Config(String referenceName, String topic, @Nullable Long messageCountBatchSize,
                      @Nullable Long requestThresholdKB, @Nullable Long publishDelayThresholdMillis,
                      @Nullable Long errorThreshold, @Nullable Integer retryTimeoutSeconds) {
            this.referenceName = referenceName;
            this.topic = topic;
            this.messageCountBatchSize = messageCountBatchSize;
            this.requestThresholdKB = requestThresholdKB;
            this.publishDelayThresholdMillis = publishDelayThresholdMillis;
            this.errorThreshold = errorThreshold;
            this.retryTimeoutSeconds = retryTimeoutSeconds;
        }

        /**
         * Validates every property that is not supplied through a macro, recording one failure per
         * invalid property, and finally calls {@code getOrThrowException()} on the collector.
         *
         * @param failureCollector collector the failures are added to
         * @param arguments arguments forwarded to the parent validation and to the CMEK key lookup
         */
        @Override // io.cdap.plugin.gcp.common.GCPReferenceSinkConfig
        public void validate(FailureCollector failureCollector, Map<String, String> arguments) {
            super.validate(failureCollector, arguments);

            if (!containsMacro(NAME_MESSAGE_COUNT_BATCH_SIZE) && messageCountBatchSize != null && messageCountBatchSize < 1) {
                failureCollector.addFailure("Invalid maximum count of messages in a batch.", POSITIVE_NUMBER_ACTION)
                    .withConfigProperty(NAME_MESSAGE_COUNT_BATCH_SIZE);
            }
            if (!containsMacro(NAME_REQUEST_THRESHOLD_KB) && requestThresholdKB != null && requestThresholdKB < 1) {
                failureCollector.addFailure("Invalid maximum batch size.", POSITIVE_NUMBER_ACTION)
                    .withConfigProperty(NAME_REQUEST_THRESHOLD_KB);
            }
            if (!containsMacro(NAME_PUBLISH_DELAY_THRESHOLD_MILLIS) && publishDelayThresholdMillis != null
                && publishDelayThresholdMillis < 1) {
                failureCollector.addFailure("Invalid delay threshold for publishing a batch.", POSITIVE_NUMBER_ACTION)
                    .withConfigProperty(NAME_PUBLISH_DELAY_THRESHOLD_MILLIS);
            }
            // Unlike the other thresholds, zero is accepted here; only negative values are rejected.
            if (!containsMacro(NAME_ERROR_THRESHOLD) && errorThreshold != null && errorThreshold < 0) {
                failureCollector.addFailure("Invalid error threshold for publishing.", POSITIVE_NUMBER_ACTION)
                    .withConfigProperty(NAME_ERROR_THRESHOLD);
            }
            if (!containsMacro(NAME_RETRY_TIMEOUT_SECONDS) && retryTimeoutSeconds != null && retryTimeoutSeconds < 1) {
                failureCollector.addFailure("Invalid max retry timeout for retrying failed publish.", POSITIVE_NUMBER_ACTION)
                    .withConfigProperty(NAME_RETRY_TIMEOUT_SECONDS);
            }
            if (!containsMacro(NAME_DELIMITER) && !containsMacro(NAME_FORMAT)
                && getFormat().equalsIgnoreCase(PubSubConstants.DELIMITED) && delimiter == null) {
                // Attach the failure to the property NAME. The delimiter VALUE is null in this
                // branch, so passing it (as the code used to) tagged the failure with a null property.
                failureCollector.addFailure(String.format("Delimiter is required when format is set to %s.", getFormat()),
                                            "Ensure the delimiter is provided.")
                    .withConfigProperty(NAME_DELIMITER);
            }
            if (!containsMacro(NAME_CMEK_KEY)) {
                // Return value intentionally unused: the call is made with the collector so that
                // CmekUtils can report problems with the key (presumably by adding failures to it).
                CmekUtils.getCmekKey(cmekKey, arguments, failureCollector);
            }
            failureCollector.getOrThrowException();
        }

        /** Returns the batch size limit in bytes: the configured KB value times 1024, or 1024 when unset. */
        public long getRequestBytesThreshold() {
            return requestThresholdKB == null ? 1024L : requestThresholdKB * 1024;
        }

        /** Returns the maximum number of messages per batch, or 100 when unset. */
        public long getMessageCountBatchSize() {
            return messageCountBatchSize == null ? 100L : messageCountBatchSize;
        }

        /** Returns the maximum publish delay in milliseconds, or 1 when unset. */
        public long getPublishDelayThresholdMillis() {
            return publishDelayThresholdMillis == null ? 1L : publishDelayThresholdMillis;
        }

        /** Returns the number of tolerated publish failures, or 0 when unset. */
        public long getErrorThreshold() {
            return errorThreshold == null ? 0L : errorThreshold;
        }

        /** Returns the retry time budget in seconds, or 30 when unset. */
        public int getRetryTimeoutSeconds() {
            return retryTimeoutSeconds == null ? 30 : retryTimeoutSeconds;
        }

        /** Returns the configured topic name. */
        public String getTopic() {
            return topic;
        }

        /** Returns the configured format, or {@code "text"} when the format is null or empty. */
        public String getFormat() {
            return Strings.isNullOrEmpty(format) ? "text" : format;
        }

        /** Returns the configured delimiter; may be {@code null}. */
        public String getDelimiter() {
            return delimiter;
        }

        /** Returns the raw configured batch size limit in KB, or {@code null} when unset. */
        @Nullable
        public Long getRequestThresholdKB() {
            return requestThresholdKB;
        }
    }

    /**
     * Creates the sink around the given plugin configuration.
     *
     * @param config configuration this sink validates and reads its settings from
     */
    public GooglePublisher(Config config) {
        this.config = config;
    }

    /**
     * Runs the parent's pipeline configuration and then validates this sink's configuration
     * against the stage's failure collector.
     *
     * @param pipelineConfigurer configurer supplying the stage configurer and its failure collector
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        FailureCollector stageCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
        this.config.validate(stageCollector);
    }

    /**
     * Prepares a pipeline run: validates the configuration, makes sure the target topic exists
     * (skipped when preview is enabled), registers the Pub/Sub output format, and records lineage.
     *
     * @param batchSinkContext context of the current run
     * @throws IOException if the topic admin client cannot be created or the topic cannot be auto-created
     */
    public void prepareRun(BatchSinkContext batchSinkContext) throws IOException {
        FailureCollector failureCollector = batchSinkContext.getFailureCollector();
        this.config.validate(failureCollector);

        TopicAdminSettings.Builder adminSettings = TopicAdminSettings.newBuilder();
        Boolean isServiceAccountFilePath = this.config.isServiceAccountFilePath();
        if (isServiceAccountFilePath == null) {
            batchSinkContext.getFailureCollector().addFailure("Service account type is undefined.", "Must be `filePath` or `JSON`");
            failureCollector.getOrThrowException();
        }
        String serviceAccount = this.config.getServiceAccount();
        if (!Strings.isNullOrEmpty(serviceAccount)) {
            // Only install an explicit credentials provider when a service account was configured.
            adminSettings.setCredentialsProvider(
                () -> GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath.booleanValue()));
        }

        String project = this.config.getProject();
        ProjectTopicName topicName = ProjectTopicName.of(project, this.config.topic);
        if (!batchSinkContext.isPreviewEnabled()) {
            // try-with-resources guarantees the admin client is closed on every path.
            try (TopicAdminClient topicAdminClient = TopicAdminClient.create(adminSettings.build())) {
                createTopicIfMissing(topicAdminClient, topicName, project, batchSinkContext);
            }
        }

        Schema inputSchema = batchSinkContext.getInputSchema();
        LineageRecorder lineageRecorder = new LineageRecorder((BatchContext) batchSinkContext, this.config.getReferenceName());
        lineageRecorder.createExternalDataset(inputSchema);

        Configuration configuration = new Configuration();
        PubSubOutputFormat.configure(configuration, this.config);
        batchSinkContext.addOutput(Output.of(this.config.getReferenceName(), new SinkOutputFormatProvider((Class<? extends OutputFormat>) PubSubOutputFormat.class, configuration)));

        // A write operation is only recorded when there are fields to attribute it to.
        if (inputSchema == null || inputSchema.getFields() == null || inputSchema.getFields().isEmpty()) {
            return;
        }
        List<String> fieldNames = inputSchema.getFields().stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toList());
        lineageRecorder.recordWrite("Write", "Wrote to Google Cloud Pub/Sub.", fieldNames);
    }

    /**
     * Looks the topic up and creates it, optionally with the configured CMEK key, when the lookup
     * reports that it does not exist.
     *
     * @throws IOException if creating the topic fails for any reason other than it already existing
     */
    private void createTopicIfMissing(TopicAdminClient topicAdminClient, ProjectTopicName topicName, String project,
                                      BatchSinkContext batchSinkContext) throws IOException {
        try {
            topicAdminClient.getTopic(topicName);
            return;
        } catch (NotFoundException ignored) {
            // The topic does not exist yet; fall through and create it.
        }

        try {
            Topic.Builder topicBuilder = Topic.newBuilder().setName(topicName.toString());
            CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(this.config.cmekKey, batchSinkContext.getArguments().asMap(),
                                                             batchSinkContext.getFailureCollector());
            batchSinkContext.getFailureCollector().getOrThrowException();
            if (cmekKeyName != null) {
                topicBuilder.setKmsKeyName(cmekKeyName.toString());
            }
            topicAdminClient.createTopic(topicBuilder.build());
        } catch (AlreadyExistsException ignored) {
            // Something else created the topic between the lookup and the create call.
            // All that matters is that the topic exists, so this is not an error.
        } catch (ApiException createFailure) {
            // Chain the creation failure itself. The earlier code chained the NotFoundException
            // from the lookup, which hid the real reason (e.g. a permission error).
            throw new IOException(String.format("Could not auto-create topic '%s' in project '%s'. Please ensure it is created before running the pipeline, or ensure that the service account has permission to create the topic.", this.config.topic, project), createFailure);
        }
    }

    /**
     * Emits the incoming record unchanged, paired with the {@link NullWritable} singleton as key.
     *
     * @param structuredRecord record to pass through
     * @param emitter emitter receiving the key/value pair
     */
    public void transform(StructuredRecord structuredRecord, Emitter<KeyValue<NullWritable, StructuredRecord>> emitter) throws Exception {
        KeyValue<NullWritable, StructuredRecord> keyedRecord = new KeyValue<>(NullWritable.get(), structuredRecord);
        emitter.emit(keyedRecord);
    }

    /**
     * Untyped overload that casts its arguments and delegates to the typed
     * {@code transform(StructuredRecord, Emitter)}.
     *
     * <p>NOTE(review): the bridge/synthetic markers suggest this is the decompiler's rendering of a
     * compiler-generated bridge method rather than hand-written code; confirm before keeping it
     * in real source.
     */
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        StructuredRecord typedRecord = (StructuredRecord) obj;
        Emitter<KeyValue<NullWritable, StructuredRecord>> typedEmitter = (Emitter<KeyValue<NullWritable, StructuredRecord>>) emitter;
        transform(typedRecord, typedEmitter);
    }
}
