/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.List;
import java.util.Properties;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink function that serializes each incoming element to a byte array and sends it to a
 * single Kafka topic through a {@link KafkaProducer}.
 *
 * <p>The partition ids of the target topic are looked up once, in the constructor, and handed
 * to the {@link KafkaPartitioner} when the sink is opened. Send failures are reported through
 * a {@link Callback}: depending on {@link #setLogFailuresOnly(boolean)} they are either only
 * logged, or remembered and rethrown by the next {@link #invoke(Object)} or {@link #close()}.
 *
 * @param <IN> type of the elements written by this sink
 */
public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
    private static final long serialVersionUID = 1L;

    /** Partition ids of the target topic, fetched in the constructor. */
    private final int[] partitions;
    /** Configuration used for every {@link KafkaProducer} this sink creates. */
    private final Properties producerConfig;
    /** Name of the topic records are written to. */
    private final String topicId;
    /** Turns an element into the byte array used as the record value. */
    private final SerializationSchema<IN, byte[]> schema;
    /** Chooses the target partition for each element. */
    private final KafkaPartitioner partitioner;
    /** When true, send failures are logged instead of being rethrown. */
    private boolean logFailuresOnly;
    /** Producer used for sending; created in {@link #open(Configuration)}. */
    private transient KafkaProducer<byte[], byte[]> producer;
    /** Completion callback passed along with every send; created in {@link #open(Configuration)}. */
    private transient Callback callback;
    /** First failure recorded by the completion callback that has not been rethrown yet. */
    private volatile transient Exception asyncException;

    /**
     * Creates a producer from a comma-separated broker list, using the default partitioner.
     *
     * @param brokerList comma-separated list of brokers, used as {@code bootstrap.servers}
     * @param topicId name of the target topic
     * @param serializationSchema schema that turns elements into byte arrays
     */
    public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
        this(topicId, serializationSchema, FlinkKafkaProducer.getPropertiesFromBrokerList(brokerList), null);
    }

    /**
     * Creates a producer from explicit producer properties, using the default partitioner.
     *
     * @param topicId name of the target topic
     * @param serializationSchema schema that turns elements into byte arrays
     * @param producerConfig properties handed to the {@link KafkaProducer}
     */
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
        this(topicId, serializationSchema, producerConfig, null);
    }

    /**
     * Creates a producer from explicit producer properties and an optional custom partitioner.
     *
     * <p>The given {@code producerConfig} object is stored and modified in place: missing
     * {@code key.serializer} / {@code value.serializer} entries are set to
     * {@link ByteArraySerializer}. A short-lived {@link KafkaProducer} is created here to
     * look up the partitions of the topic.
     *
     * @param topicId name of the target topic; must not be null
     * @param serializationSchema schema that turns elements into byte arrays; must not be null
     * @param producerConfig properties handed to the {@link KafkaProducer}; must not be null
     * @param customPartitioner partitioner to use, or null to use a {@link FixedPartitioner}
     */
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
        Preconditions.checkNotNull(topicId, "TopicID not set");
        Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
        Preconditions.checkNotNull(producerConfig, "producerConfig not set");
        ClosureCleaner.ensureSerializable(customPartitioner);
        ClosureCleaner.ensureSerializable(serializationSchema);
        this.topicId = topicId;
        this.schema = serializationSchema;
        this.producerConfig = producerConfig;

        // Properties.contains(...) is Hashtable.contains and searches the VALUES, so it can
        // never detect a user-supplied serializer entry. containsKey(...) is the key lookup.
        if (!producerConfig.containsKey("key.serializer")) {
            this.producerConfig.put("key.serializer", ByteArraySerializer.class.getCanonicalName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", "key.serializer");
        }
        if (!producerConfig.containsKey("value.serializer")) {
            this.producerConfig.put("value.serializer", ByteArraySerializer.class.getCanonicalName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", "value.serializer");
        }

        // Temporary producer that is only used to discover the topic's partitions.
        // try-with-resources closes it, so no explicit close() call is needed.
        try (KafkaProducer<byte[], byte[]> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
            List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
            this.partitions = new int[partitionsList.size()];
            for (int i = 0; i < this.partitions.length; ++i) {
                this.partitions[i] = partitionsList.get(i).partition();
            }
        }
        this.partitioner = customPartitioner == null ? new FixedPartitioner() : customPartitioner;
    }

    /**
     * Selects how send failures are handled. The flag is read in {@link #open(Configuration)},
     * so it has to be set before the sink is opened to take effect.
     *
     * @param logFailuresOnly true to only log failures, false to rethrow them from
     *     {@link #invoke(Object)} / {@link #close()}
     */
    public void setLogFailuresOnly(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    /**
     * Creates the {@link KafkaProducer}, opens the partitioner with this subtask's index, the
     * number of parallel subtasks and the topic's partition ids, and installs the completion
     * callback that matches the {@code logFailuresOnly} setting.
     *
     * @param configuration not used by this implementation
     */
    public void open(Configuration configuration) {
        this.producer = new KafkaProducer<>(this.producerConfig);
        RuntimeContext ctx = this.getRuntimeContext();
        this.partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), this.partitions);
        LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", new Object[]{ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), this.topicId});
        if (this.logFailuresOnly) {
            // Failures are logged and otherwise ignored.
            this.callback = new Callback(){

                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
                    }
                }
            };
        } else {
            // Only the first failure is kept; checkErroneous() rethrows and clears it.
            this.callback = new Callback(){

                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null && FlinkKafkaProducer.this.asyncException == null) {
                        FlinkKafkaProducer.this.asyncException = exception;
                    }
                }
            };
        }
    }

    /**
     * Serializes the element and hands it to the producer, addressed to the partition chosen
     * by the partitioner. The record is sent without a key.
     *
     * @param next element to write
     * @throws Exception if an earlier send was reported as failed by the completion callback
     */
    public void invoke(IN next) throws Exception {
        this.checkErroneous();
        byte[] serialized = this.schema.serialize(next);
        int targetPartition = this.partitioner.partition(next, this.partitions.length);
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(this.topicId, targetPartition, null, serialized);
        this.producer.send(record, this.callback);
    }

    /**
     * Closes the producer if one was created, then rethrows any send failure that was
     * recorded but not yet reported.
     *
     * @throws Exception if a send failure is still pending
     */
    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.close();
        }
        this.checkErroneous();
    }

    /**
     * Rethrows the failure recorded by the completion callback, if any, wrapped in a new
     * {@link Exception} that keeps the original as its cause. The recorded failure is cleared
     * so that it is reported only once.
     */
    private void checkErroneous() throws Exception {
        Exception e = this.asyncException;
        if (e != null) {
            this.asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    /**
     * Builds producer properties from a comma-separated broker list. Every entry is passed to
     * {@link NetUtils#getCorrectHostnamePort(String)} (presumably to validate the
     * {@code host:port} form - the return value is not used here), and the unmodified list is
     * stored under {@code bootstrap.servers}.
     *
     * @param brokerList comma-separated list of brokers
     * @return new properties containing only the {@code bootstrap.servers} entry
     */
    public static Properties getPropertiesFromBrokerList(String brokerList) {
        for (String broker : brokerList.split(",")) {
            NetUtils.getCorrectHostnamePort(broker);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }
}

