/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.api;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.functions.source.ConnectorSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSource<OUT>
extends ConnectorSource<OUT> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
    private static final String DEFAULT_GROUP_ID = "flink-group";
    private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200L;
    private final String zookeeperAddress;
    private final String groupId;
    private final String topicId;
    private final Properties customProperties;
    private final long zookeeperSyncTimeMillis;
    private transient ConsumerConnector consumer;
    private transient ConsumerIterator<byte[], byte[]> consumerIterator;
    private volatile boolean isRunning;

    public KafkaSource(String zookeeperAddress, String topicId, String groupId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
        this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
    }

    public KafkaSource(String zookeeperAddress, String topicId, String groupId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis, Properties customProperties) {
        super(deserializationSchema);
        Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
        Preconditions.checkNotNull(topicId, "Topic ID is null");
        Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
        Preconditions.checkArgument(zookeeperSyncTimeMillis >= 0L, "The ZK sync time must be positive");
        this.zookeeperAddress = zookeeperAddress;
        this.groupId = groupId;
        this.topicId = topicId;
        this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
        this.customProperties = customProperties;
    }

    public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
        this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis, null);
    }

    public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema) {
        this(zookeeperAddress, topicId, deserializationSchema, 200L);
    }

    private void initializeConnection() {
        Properties props = new Properties();
        props.put("zookeeper.connect", this.zookeeperAddress);
        props.put("group.id", this.groupId);
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", Long.toString(this.zookeeperSyncTimeMillis));
        props.put("auto.commit.interval.ms", "1000");
        if (this.customProperties != null) {
            for (Map.Entry<Object, Object> e : props.entrySet()) {
                if (props.contains(e.getKey())) {
                    LOG.warn("Overwriting property " + e.getKey() + " with value " + e.getValue());
                }
                props.put(e.getKey(), e.getValue());
            }
        }
        this.consumer = Consumer.createJavaConsumerConnector((ConsumerConfig)new ConsumerConfig(props));
        Map consumerMap = this.consumer.createMessageStreams(Collections.singletonMap(this.topicId, 1));
        List streams = (List)consumerMap.get(this.topicId);
        KafkaStream stream = (KafkaStream)streams.get(0);
        this.consumer.commitOffsets();
        this.consumerIterator = stream.iterator();
    }

    public void run(SourceFunction.SourceContext<OUT> ctx) throws Exception {
        try {
            while (this.isRunning && this.consumerIterator.hasNext()) {
                Object out = this.schema.deserialize((byte[])this.consumerIterator.next().message());
                if (this.schema.isEndOfStream(out)) {
                    break;
                }
                ctx.collect(out);
            }
        }
        finally {
            this.consumer.shutdown();
        }
    }

    public void open(Configuration config) throws Exception {
        this.initializeConnection();
        this.isRunning = true;
    }

    public void cancel() {
        this.isRunning = false;
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
    }
}

