package io.cdap.plugin.gcp.publisher;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.format.StructuredRecordStringConverter;
import io.cdap.plugin.format.avro.StructuredToAvroTransformer;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.publisher.GooglePublisher;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

/* loaded from: input_file:io/cdap/plugin/gcp/publisher/PubSubOutputFormat.class */
public class PubSubOutputFormat extends OutputFormat<NullWritable, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubOutputFormat.class);
    private static final String SERVICE_ACCOUNT = "service.account";
    private static final String SERVICE_ACCOUNT_TYPE = "service.account.type";
    private static final String SERVICE_ACCOUNT_TYPE_JSON = "json";
    private static final String SERVICE_ACCOUNT_TYPE_FILE_PATH = "filePath";
    private static final String PROJECT = "project";
    private static final String TOPIC = "topic";
    private static final String COUNT_BATCH_SIZE = "message.count.batch.size";
    private static final String REQUEST_BYTES_THRESHOLD = "request.bytes.threshold";
    private static final String DELAY_THRESHOLD = "delay.threshold";
    private static final String ERROR_THRESHOLD = "error.threshold";
    private static final String RETRY_TIMEOUT_SECONDS = "retry.timeout";

    /* loaded from: input_file:io/cdap/plugin/gcp/publisher/PubSubOutputFormat$PubSubRecordWriter.class */
    public class PubSubRecordWriter extends RecordWriter<NullWritable, StructuredRecord> {
        private final Publisher publisher;
        private final long errorThreshold;
        private final String format;
        private final String delimiter;
        private final AtomicReference<Throwable> error = new AtomicReference<>();
        private final AtomicLong failures = new AtomicLong(0);
        private final Set<ApiFuture> futures = ConcurrentHashMap.newKeySet();

        public PubSubRecordWriter(Publisher publisher, String str, String str2, long j) {
            this.publisher = publisher;
            this.errorThreshold = j;
            this.format = str;
            this.delimiter = str2;
        }

        @Override // org.apache.hadoop.mapreduce.RecordWriter
        public void write(NullWritable nullWritable, StructuredRecord structuredRecord) throws IOException {
            handleErrorIfAny();
            final ApiFuture<String> publish = this.publisher.publish(getPubSubMessage(structuredRecord));
            this.futures.add(publish);
            ApiFutures.addCallback(publish, new ApiFutureCallback<String>() { // from class: io.cdap.plugin.gcp.publisher.PubSubOutputFormat.PubSubRecordWriter.1
                @Override // com.google.api.core.ApiFutureCallback
                public void onFailure(Throwable th) {
                    PubSubRecordWriter.this.error.set(th);
                    PubSubRecordWriter.this.failures.incrementAndGet();
                    PubSubRecordWriter.this.futures.remove(publish);
                }

                @Override // com.google.api.core.ApiFutureCallback
                public void onSuccess(String str) {
                    PubSubRecordWriter.this.futures.remove(publish);
                }
            });
        }

        private PubsubMessage getPubSubMessage(StructuredRecord structuredRecord) throws IOException {
            PubsubMessage pubsubMessage = null;
            String str = this.format;
            boolean z = -1;
            switch (str.hashCode()) {
                case -793011724:
                    if (str.equals(PubSubConstants.PARQUET)) {
                        z = true;
                        break;
                    }
                    break;
                case -250518023:
                    if (str.equals(PubSubConstants.DELIMITED)) {
                        z = 6;
                        break;
                    }
                    break;
                case 98822:
                    if (str.equals(PubSubConstants.CSV)) {
                        z = 5;
                        break;
                    }
                    break;
                case 115159:
                    if (str.equals(PubSubConstants.TSV)) {
                        z = 7;
                        break;
                    }
                    break;
                case 3006770:
                    if (str.equals("avro")) {
                        z = false;
                        break;
                    }
                    break;
                case 3026845:
                    if (str.equals(PubSubConstants.BLOB)) {
                        z = 3;
                        break;
                    }
                    break;
                case 3271912:
                    if (str.equals("json")) {
                        z = 4;
                        break;
                    }
                    break;
                case 3556653:
                    if (str.equals("text")) {
                        z = 2;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                case true:
                    GenericRecord transform = new StructuredToAvroTransformer(structuredRecord.getSchema()).transform(structuredRecord);
                    GenericDatumWriter genericDatumWriter = new GenericDatumWriter(new Schema.Parser().parse(String.valueOf(structuredRecord.getSchema())));
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
                    genericDatumWriter.write(transform, binaryEncoder);
                    binaryEncoder.flush();
                    byteArrayOutputStream.close();
                    pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom(byteArrayOutputStream.toByteArray())).build();
                    break;
                case true:
                case true:
                case true:
                    pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(StructuredRecordStringConverter.toJsonString(structuredRecord))).build();
                    break;
                case true:
                    pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(StructuredRecordStringConverter.toDelimitedString(structuredRecord, ","))).build();
                    break;
                case true:
                    pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(StructuredRecordStringConverter.toDelimitedString(structuredRecord, this.delimiter))).build();
                    break;
                case true:
                    pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(StructuredRecordStringConverter.toDelimitedString(structuredRecord, "\t"))).build();
                    break;
            }
            return pubsubMessage;
        }

        private void handleErrorIfAny() throws IOException {
            if (this.failures.get() > this.errorThreshold) {
                throw new IOException(String.format("Failed to publish %s records", Long.valueOf(this.failures.get())), this.error.get());
            }
        }

        @Override // org.apache.hadoop.mapreduce.RecordWriter
        public void close(TaskAttemptContext taskAttemptContext) throws IOException {
            try {
                try {
                    this.publisher.publishAllOutstanding();
                    Iterator<ApiFuture> it = this.futures.iterator();
                    while (it.hasNext()) {
                        it.next().get();
                        handleErrorIfAny();
                    }
                } finally {
                    try {
                        this.publisher.shutdown();
                    } catch (Exception e) {
                        PubSubOutputFormat.LOG.debug("Exception while shutting down publisher ", e);
                    }
                }
            } catch (InterruptedException | ExecutionException e2) {
                throw new IOException("Error publishing records to PubSub", e2);
            }
        }

        public String getFormat() {
            return this.format;
        }

        public String getDelimiter() {
            return this.delimiter;
        }
    }

    public static void configure(Configuration configuration, GooglePublisher.Config config) {
        String serviceAccount = config.getServiceAccount();
        String format = config.getFormat();
        String delimiter = config.getDelimiter();
        if (serviceAccount != null) {
            configuration.set("service.account.type", config.getServiceAccountType());
            configuration.set(SERVICE_ACCOUNT, config.getServiceAccount());
        }
        configuration.set("project", config.getProject());
        configuration.set(TOPIC, config.getTopic());
        configuration.set(COUNT_BATCH_SIZE, String.valueOf(config.getMessageCountBatchSize()));
        configuration.set(REQUEST_BYTES_THRESHOLD, String.valueOf(config.getRequestBytesThreshold()));
        configuration.set(DELAY_THRESHOLD, String.valueOf(config.getPublishDelayThresholdMillis()));
        configuration.set(ERROR_THRESHOLD, String.valueOf(config.getErrorThreshold()));
        configuration.set(RETRY_TIMEOUT_SECONDS, String.valueOf(config.getRetryTimeoutSeconds()));
        configuration.set("format", format);
        if (delimiter != null) {
            configuration.set("delimiter", config.getDelimiter());
        }
    }

    @Override // org.apache.hadoop.mapreduce.OutputFormat
    public RecordWriter<NullWritable, StructuredRecord> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        Configuration configuration = taskAttemptContext.getConfiguration();
        String str = configuration.get(SERVICE_ACCOUNT);
        boolean equals = "filePath".equals(configuration.get("service.account.type"));
        String str2 = configuration.get("project");
        String str3 = configuration.get(TOPIC);
        String str4 = configuration.get("format");
        String str5 = configuration.get("delimiter");
        long parseLong = Long.parseLong(configuration.get(COUNT_BATCH_SIZE));
        long parseLong2 = Long.parseLong(configuration.get(REQUEST_BYTES_THRESHOLD));
        long parseLong3 = Long.parseLong(configuration.get(DELAY_THRESHOLD));
        long parseLong4 = Long.parseLong(configuration.get(ERROR_THRESHOLD));
        Publisher.Builder retrySettings = Publisher.newBuilder(ProjectTopicName.of(str2, str3)).setBatchingSettings(getBatchingSettings(parseLong, parseLong2, parseLong3)).setRetrySettings(getRetrySettings(Integer.parseInt(configuration.get(RETRY_TIMEOUT_SECONDS))));
        if (str != null) {
            retrySettings.setCredentialsProvider(() -> {
                return GCPUtils.loadServiceAccountCredentials(str, equals);
            });
        }
        return new PubSubRecordWriter(retrySettings.build(), str4, str5, parseLong4);
    }

    private RetrySettings getRetrySettings(int i) {
        return RetrySettings.newBuilder().setInitialRetryDelay(Duration.ofMillis(100L)).setRetryDelayMultiplier(2.0d).setMaxRetryDelay(Duration.ofSeconds(2L)).setTotalTimeout(Duration.ofSeconds(i)).setInitialRpcTimeout(Duration.ofSeconds(1L)).setMaxRpcTimeout(Duration.ofSeconds(5L)).build();
    }

    private BatchingSettings getBatchingSettings(long j, long j2, long j3) {
        return BatchingSettings.newBuilder().setElementCountThreshold(Long.valueOf(j)).setRequestByteThreshold(Long.valueOf(j2)).setDelayThreshold(Duration.ofMillis(j3)).build();
    }

    @Override // org.apache.hadoop.mapreduce.OutputFormat
    public void checkOutputSpecs(JobContext jobContext) {
    }

    @Override // org.apache.hadoop.mapreduce.OutputFormat
    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) {
        return new OutputCommitter() { // from class: io.cdap.plugin.gcp.publisher.PubSubOutputFormat.1
            @Override // org.apache.hadoop.mapreduce.OutputCommitter
            public void setupJob(JobContext jobContext) {
            }

            @Override // org.apache.hadoop.mapreduce.OutputCommitter
            public void setupTask(TaskAttemptContext taskAttemptContext2) {
            }

            @Override // org.apache.hadoop.mapreduce.OutputCommitter
            public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext2) {
                return false;
            }

            @Override // org.apache.hadoop.mapreduce.OutputCommitter
            public void commitTask(TaskAttemptContext taskAttemptContext2) {
            }

            @Override // org.apache.hadoop.mapreduce.OutputCommitter
            public void abortTask(TaskAttemptContext taskAttemptContext2) {
            }
        };
    }
}
