/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.Map;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.SerializableKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.internals.PartitionerWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink that publishes every incoming record to a Kafka topic through the
 * Kafka {@link Producer} API.
 *
 * <p>Each record is turned into a byte array by the supplied
 * {@link SerializationSchema} and sent to the configured topic. The record
 * itself is handed to Kafka alongside the serialized payload (as the
 * partitioning key, per the Kafka 0.8 {@code KeyedMessage(topic, key, partKey,
 * message)} constructor), so a custom partitioner can route on it.
 *
 * <p>The producer is created in {@link #open(Configuration)} and released in
 * {@link #close()}.
 *
 * @param <IN> type of the records consumed by this sink
 */
public class KafkaSink<IN> extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);

    /** Kafka producer; {@code null} until {@link #open(Configuration)} has run. */
    private Producer<IN, byte[]> producer;

    /** User-supplied producer settings; applied on top of the defaults set in {@link #open(Configuration)}. */
    private final Properties userDefinedProperties;

    /** Name of the topic records are written to. */
    private final String topicId;

    /** Comma-separated broker addresses, passed to Kafka as {@code metadata.broker.list}. */
    private final String brokerList;

    /** Converts each record into the byte array that is sent as the message payload. */
    private final SerializationSchema<IN, byte[]> schema;

    /** Optional partitioner instance, handed to Kafka through {@link PartitionerWrapper}. */
    private SerializableKafkaPartitioner partitioner;

    /** Optional partitioner class, handed to Kafka by name. */
    private Class<? extends SerializableKafkaPartitioner> partitionerClass;

    /**
     * Creates a sink with no additional producer settings and no custom partitioner.
     *
     * @param brokerList comma-separated list of broker addresses
     * @param topicId topic to write to
     * @param serializationSchema schema used to turn records into byte arrays
     */
    public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
        this(brokerList, topicId, new Properties(), serializationSchema);
    }

    /**
     * Creates a sink with additional producer settings and no custom partitioner.
     *
     * @param brokerList comma-separated list of broker addresses; every entry is
     *        checked with {@link NetUtils#getHostnamePort(String)}
     * @param topicId topic to write to
     * @param producerConfig extra producer settings; entries override the defaults
     *        this sink sets in {@link #open(Configuration)}
     * @param serializationSchema schema used to turn records into byte arrays
     * @throws NullPointerException if any argument is {@code null}
     */
    public KafkaSink(String brokerList, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema) {
        // Fail here with a descriptive message instead of an anonymous NPE
        // from split() below or, much later, from open()/invoke().
        Preconditions.checkNotNull(brokerList, "Broker list not set");
        Preconditions.checkNotNull(topicId, "TopicID not set");
        Preconditions.checkNotNull(producerConfig, "Producer config not set");
        Preconditions.checkNotNull(serializationSchema, "Serialization schema not set");

        // Called only for validation: the parsed result is discarded.
        for (String broker : brokerList.split(",")) {
            NetUtils.getHostnamePort(broker);
        }

        this.brokerList = brokerList;
        this.topicId = topicId;
        this.schema = serializationSchema;
        this.userDefinedProperties = producerConfig;
    }

    /**
     * Creates a sink that partitions records with the given partitioner instance.
     *
     * @param brokerList comma-separated list of broker addresses
     * @param topicId topic to write to
     * @param serializationSchema schema used to turn records into byte arrays
     * @param partitioner partitioner instance; checked with
     *        {@link ClosureCleaner#ensureSerializable(Object)}
     */
    public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
        this(brokerList, topicId, serializationSchema);
        ClosureCleaner.ensureSerializable(partitioner);
        this.partitioner = partitioner;
    }

    /**
     * Creates a sink that partitions records with a partitioner of the given class.
     *
     * @param brokerList comma-separated list of broker addresses
     * @param topicId topic to write to
     * @param serializationSchema schema used to turn records into byte arrays
     * @param partitioner partitioner class, passed to Kafka by name as
     *        {@code partitioner.class}
     */
    public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitioner) {
        this(brokerList, topicId, serializationSchema);
        this.partitionerClass = partitioner;
    }

    /**
     * Builds the producer configuration and creates the Kafka producer.
     *
     * <p>Settings are applied in this order, later ones overriding earlier ones:
     * the defaults of this sink, the user-defined properties, and finally the
     * partitioner settings.
     *
     * @param configuration not used by this method
     * @throws RuntimeException if creating the producer fails with a
     *         {@link NullPointerException}
     */
    public void open(Configuration configuration) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", this.brokerList);
        properties.put("request.required.acks", "-1");
        properties.put("message.send.max.retries", "10");
        properties.put("serializer.class", DefaultEncoder.class.getName());
        properties.put("key.serializer.class", DefaultEncoder.class.getName());

        // User settings go on top of the defaults above.
        properties.putAll(this.userDefinedProperties);

        if (this.partitioner != null) {
            properties.put("partitioner.class", PartitionerWrapper.class.getName());
            // The partitioner instance travels as an object value next to the
            // wrapper's class name (presumably picked up by PartitionerWrapper).
            properties.put("flink.kafka.wrapper.serialized", this.partitioner);
        }
        if (this.partitionerClass != null) {
            // Must be a String: Properties#getProperty returns null for any
            // non-String value, so a Class object stored here is invisible to a
            // string-based config lookup. Class#getName is the form accepted by
            // Class.forName, also for nested classes.
            properties.put("partitioner.class", this.partitionerClass.getName());
        }

        ProducerConfig config = new ProducerConfig(properties);
        try {
            this.producer = new Producer<IN, byte[]>(config);
        }
        catch (NullPointerException e) {
            // NOTE(review): presumably the Kafka producer surfaces an unreachable
            // broker as an NPE; kept so callers see the same RuntimeException.
            throw new RuntimeException("Cannot connect to Kafka broker " + this.brokerList, e);
        }
    }

    /**
     * Serializes the record and sends it to the configured topic.
     *
     * @param next record to publish
     */
    public void invoke(IN next) {
        byte[] serialized = this.schema.serialize(next);
        this.producer.send(new KeyedMessage<IN, byte[]>(this.topicId, null, next, serialized));
    }

    /**
     * Closes the Kafka producer if one was created.
     */
    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
    }
}

