/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.sink.writer;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContextImpl;
import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage;
import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
import org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener;
import org.apache.flink.connector.pulsar.sink.writer.topic.ProducerRegister;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class PulsarWriter<IN>
implements TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, PulsarCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
    private final PulsarSerializationSchema<IN> serializationSchema;
    private final MetadataListener metadataListener;
    private final TopicRouter<IN> topicRouter;
    private final MessageDelayer<IN> messageDelayer;
    private final DeliveryGuarantee deliveryGuarantee;
    private final PulsarSinkContext sinkContext;
    private final ProducerRegister producerRegister;
    private final MailboxExecutor mailboxExecutor;
    private final AtomicLong pendingMessages;

    public PulsarWriter(SinkConfiguration sinkConfiguration, PulsarSerializationSchema<IN> serializationSchema, MetadataListener metadataListener, TopicRouter<IN> topicRouter, MessageDelayer<IN> messageDelayer, PulsarCrypto pulsarCrypto, Sink.InitContext initContext) throws PulsarClientException {
        Preconditions.checkNotNull((Object)((Object)sinkConfiguration));
        this.serializationSchema = (PulsarSerializationSchema)Preconditions.checkNotNull(serializationSchema);
        this.metadataListener = (MetadataListener)Preconditions.checkNotNull((Object)metadataListener);
        this.topicRouter = (TopicRouter)Preconditions.checkNotNull(topicRouter);
        this.messageDelayer = (MessageDelayer)Preconditions.checkNotNull(messageDelayer);
        Preconditions.checkNotNull((Object)initContext);
        this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
        this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration, metadataListener);
        LOG.debug("Initialize topic metadata after creating Pulsar writer.");
        ProcessingTimeService timeService = initContext.getProcessingTimeService();
        this.metadataListener.open(sinkConfiguration, timeService);
        this.topicRouter.open(sinkConfiguration);
        try {
            SerializationSchema.InitializationContext initializationContext = initContext.asSerializationSchemaInitializationContext();
            this.serializationSchema.open(initializationContext, this.sinkContext, sinkConfiguration);
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Cannot initialize schema.", (Throwable)e);
        }
        SinkWriterMetricGroup metricGroup = initContext.metricGroup();
        this.producerRegister = new ProducerRegister(sinkConfiguration, pulsarCrypto, metricGroup);
        this.mailboxExecutor = initContext.getMailboxExecutor();
        this.pendingMessages = new AtomicLong(0L);
    }

    public void write(IN element, SinkWriter.Context context) throws IOException, InterruptedException {
        PulsarMessage<?> message = this.serializationSchema.serialize(element, this.sinkContext);
        String key = message.getKey();
        List<TopicPartition> partitions = this.metadataListener.availablePartitions();
        TopicPartition partition = this.topicRouter.route(element, key, partitions, this.sinkContext);
        String topic = partition.getFullTopicName();
        TypedMessageBuilder<?> builder = this.createMessageBuilder(topic, context, message);
        long deliverAt = this.messageDelayer.deliverAt(element, this.sinkContext);
        if (deliverAt > 0L) {
            builder.deliverAt(deliverAt);
        }
        if (this.deliveryGuarantee == DeliveryGuarantee.NONE) {
            builder.sendAsync();
        } else {
            this.pendingMessages.incrementAndGet();
            CompletableFuture future = builder.sendAsync();
            future.whenComplete((id, ex) -> {
                this.pendingMessages.decrementAndGet();
                if (ex != null) {
                    this.mailboxExecutor.execute(() -> this.throwSendingException(topic, (Throwable)ex), "Failed to send data to Pulsar");
                } else {
                    LOG.debug("Sent message to Pulsar {} with message id {}", (Object)topic, id);
                }
            });
        }
    }

    private void throwSendingException(String topic, Throwable ex) {
        throw new FlinkRuntimeException("Failed to send data to Pulsar: " + topic, ex);
    }

    private TypedMessageBuilder<?> createMessageBuilder(String topic, SinkWriter.Context context, PulsarMessage<?> message) throws PulsarClientException {
        List<String> clusters;
        Long sequenceId;
        long eventTime;
        String key;
        Schema<?> schema = message.getSchema();
        TypedMessageBuilder builder = this.producerRegister.createMessageBuilder(topic, schema);
        byte[] orderingKey = message.getOrderingKey();
        if (orderingKey != null && orderingKey.length > 0) {
            builder.orderingKey(orderingKey);
        }
        if (!Strings.isNullOrEmpty((String)(key = message.getKey()))) {
            builder.key(key);
        }
        if (message.isBase64EncodedKey()) {
            ((TypedMessageBuilderImpl)builder).getMetadataBuilder().setPartitionKeyB64Encoded(true);
        }
        if ((eventTime = message.getEventTime()) > 0L) {
            builder.eventTime(eventTime);
        } else {
            Long timestamp = context.timestamp();
            if (timestamp != null && timestamp > 0L) {
                builder.eventTime(timestamp.longValue());
            }
        }
        Object value = message.getValue();
        if (value == null) {
            LOG.warn("Send a message with empty payloads, this is a tombstone message in Pulsar.");
        }
        builder.value(value);
        Map<String, String> properties = message.getProperties();
        if (properties != null && !properties.isEmpty()) {
            builder.properties(properties);
        }
        if ((sequenceId = message.getSequenceId()) != null) {
            builder.sequenceId(sequenceId.longValue());
        }
        if ((clusters = message.getReplicationClusters()) != null && !clusters.isEmpty()) {
            builder.replicationClusters(clusters);
        }
        if (message.isDisableReplication()) {
            builder.disableReplication();
        }
        return builder;
    }

    public void flush(boolean endOfInput) throws IOException {
        if (endOfInput || this.deliveryGuarantee != DeliveryGuarantee.NONE) {
            LOG.info("Flush the pending messages to Pulsar.");
            this.producerRegister.flush();
            while (this.pendingMessages.longValue() > 0L) {
                this.producerRegister.flush();
            }
        }
    }

    public Collection<PulsarCommittable> prepareCommit() {
        if (this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            return this.producerRegister.prepareCommit();
        }
        return Collections.emptyList();
    }

    public void close() throws Exception {
        IOUtils.closeAll((AutoCloseable[])new AutoCloseable[]{this.metadataListener, this.producerRegister});
    }
}

