/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.api.persistent;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import kafka.common.TopicAndPartition;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.immutable.List;

public class PersistentKafkaSource<OUT>
extends RichParallelSourceFunction<OUT>
implements ResultTypeQueryable<OUT>,
CheckpointCommitter,
CheckpointedAsynchronously<long[]> {
    private static final long serialVersionUID = 287845877188312621L;
    private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
    private final String topicName;
    private final DeserializationSchema<OUT> deserializationSchema;
    private final LinkedMap pendingCheckpoints = new LinkedMap();
    private transient ConsumerConfig consumerConfig;
    private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
    private transient ConsumerConnector consumer;
    private transient ZkClient zkClient;
    private transient long[] lastOffsets;
    private transient long[] commitedOffsets;
    private transient long[] restoreState;
    private volatile boolean running;

    public PersistentKafkaSource(String topicName, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) {
        Preconditions.checkNotNull(topicName);
        Preconditions.checkNotNull(deserializationSchema);
        Preconditions.checkNotNull(consumerConfig);
        this.topicName = topicName;
        this.deserializationSchema = deserializationSchema;
        this.consumerConfig = consumerConfig;
        if (consumerConfig.autoCommitEnable()) {
            throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. This source can only be used with auto commit disabled because the source is committing to zookeeper by itself (not using the KafkaConsumer).");
        }
        if (!consumerConfig.offsetsStorage().equals("zookeeper")) {
            throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
        }
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector((ConsumerConfig)this.consumerConfig);
        Map<String, Integer> topicCountMap = Collections.singletonMap(this.topicName, 1);
        Map streams = consumer.createMessageStreams(topicCountMap);
        if (streams.size() != 1) {
            throw new RuntimeException("Expected only one message stream but got " + streams.size());
        }
        java.util.List kafkaStreams = (java.util.List)streams.get(this.topicName);
        if (kafkaStreams == null) {
            throw new RuntimeException("Requested stream not available. Available streams: " + streams.toString());
        }
        if (kafkaStreams.size() != 1) {
            throw new RuntimeException("Requested 1 stream from Kafka, bot got " + kafkaStreams.size() + " streams");
        }
        LOG.info("Opening Consumer instance for topic '{}' on group '{}'", (Object)this.topicName, (Object)this.consumerConfig.groupId());
        this.iteratorToRead = ((KafkaStream)kafkaStreams.get(0)).iterator();
        this.consumer = consumer;
        this.zkClient = new ZkClient(this.consumerConfig.zkConnect(), this.consumerConfig.zkSessionTimeoutMs(), this.consumerConfig.zkConnectionTimeoutMs(), (ZkSerializer)new KafkaZKStringSerializer());
        int numPartitions = this.getNumberOfPartitions();
        LOG.debug("The topic {} has {} partitions", (Object)this.topicName, (Object)numPartitions);
        this.lastOffsets = new long[numPartitions];
        this.commitedOffsets = new long[numPartitions];
        if (this.restoreState != null) {
            if (this.restoreState.length != numPartitions) {
                throw new IllegalStateException("There are " + this.restoreState.length + " offsets to restore for topic " + this.topicName + " but " + "there are only " + numPartitions + " in the topic");
            }
            LOG.info("Setting restored offsets {} in ZooKeeper", (Object)Arrays.toString(this.restoreState));
            this.setOffsetsInZooKeeper(this.restoreState);
            this.lastOffsets = this.restoreState;
        } else {
            Arrays.fill(this.lastOffsets, -1L);
        }
        Arrays.fill(this.commitedOffsets, 0L);
        this.pendingCheckpoints.clear();
        this.running = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<OUT> ctx) throws Exception {
        if (this.iteratorToRead == null) {
            throw new IllegalStateException("Kafka iterator not initialized properly.");
        }
        Object checkpointLock = ctx.getCheckpointLock();
        while (this.running && this.iteratorToRead.hasNext()) {
            MessageAndMetadata message = this.iteratorToRead.next();
            if (this.lastOffsets[message.partition()] >= message.offset()) {
                LOG.info("Skipping message with offset {} from partition {}", (Object)message.offset(), (Object)message.partition());
                continue;
            }
            Object next = this.deserializationSchema.deserialize((byte[])message.message());
            if (this.deserializationSchema.isEndOfStream(next)) {
                LOG.info("DeserializationSchema signaled end of stream for this source");
                break;
            }
            Object object = checkpointLock;
            synchronized (object) {
                this.lastOffsets[message.partition()] = message.offset();
                ctx.collect(next);
            }
            if (!LOG.isTraceEnabled()) continue;
            LOG.trace("Processed record with offset {} from partition {}", (Object)message.offset(), (Object)message.partition());
        }
    }

    public void cancel() {
        this.running = false;
    }

    public void close() {
        LOG.info("Closing Kafka consumer");
        this.consumer.shutdown();
        this.zkClient.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        if (this.lastOffsets == null) {
            LOG.warn("State snapshot requested on not yet opened source. Returning null");
            return null;
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", new Object[]{Arrays.toString(this.lastOffsets), checkpointId, checkpointTimestamp});
        }
        long[] currentOffsets = Arrays.copyOf(this.lastOffsets, this.lastOffsets.length);
        LinkedMap linkedMap = this.pendingCheckpoints;
        synchronized (linkedMap) {
            this.pendingCheckpoints.put((Object)checkpointId, (Object)currentOffsets);
        }
        return currentOffsets;
    }

    public void restoreState(long[] state) {
        LOG.info("The state will be restored to {} in the open() method", (Object)Arrays.toString(state));
        this.restoreState = Arrays.copyOf(state, state.length);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void commitCheckpoint(long checkpointId) {
        long[] checkpointOffsets;
        LOG.info("Commit checkpoint {}", (Object)checkpointId);
        LinkedMap linkedMap = this.pendingCheckpoints;
        synchronized (linkedMap) {
            int posInMap = this.pendingCheckpoints.indexOf((Object)checkpointId);
            if (posInMap == -1) {
                LOG.warn("Unable to find pending checkpoint for id {}", (Object)checkpointId);
                return;
            }
            checkpointOffsets = (long[])this.pendingCheckpoints.remove(posInMap);
            if (!this.pendingCheckpoints.isEmpty()) {
                for (int i = 0; i < posInMap; ++i) {
                    this.pendingCheckpoints.remove(0);
                }
            }
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Committing offsets {} to ZooKeeper", (Object)Arrays.toString(checkpointOffsets));
        }
        this.setOffsetsInZooKeeper(checkpointOffsets);
    }

    private void setOffsetsInZooKeeper(long[] offsets) {
        for (int partition = 0; partition < offsets.length; ++partition) {
            long offset = offsets[partition];
            if (offset == -1L) continue;
            this.setOffset(partition, offset);
        }
    }

    private int getNumberOfPartitions() {
        List scalaSeq = JavaConversions.asScalaBuffer(Collections.singletonList(this.topicName)).toList();
        scala.collection.mutable.Map list = ZkUtils.getPartitionsForTopics((ZkClient)this.zkClient, (Seq)scalaSeq);
        Option topicOption = list.get((Object)this.topicName);
        if (topicOption.isEmpty()) {
            throw new IllegalArgumentException("Unable to get number of partitions for topic " + this.topicName + " from " + list.toString());
        }
        Seq topic = (Seq)topicOption.get();
        return topic.size();
    }

    protected void setOffset(int partition, long offset) {
        if (this.commitedOffsets[partition] < offset) {
            PersistentKafkaSource.setOffset(this.zkClient, this.consumerConfig.groupId(), this.topicName, partition, offset);
            this.commitedOffsets[partition] = offset;
        } else {
            LOG.debug("Ignoring offset {} for partition {} because it is already committed", (Object)offset, (Object)partition);
        }
    }

    public static void setOffset(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
        LOG.info("Setting offset for partition {} of topic {} in group {} to offset {}", new Object[]{partition, topic, groupId, offset});
        TopicAndPartition tap = new TopicAndPartition(topic, partition);
        ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
        ZkUtils.updatePersistentPath((ZkClient)zkClient, (String)(topicDirs.consumerOffsetDir() + "/" + tap.partition()), (String)Long.toString(offset));
    }

    public static long getOffset(ZkClient zkClient, String groupId, String topic, int partition) {
        TopicAndPartition tap = new TopicAndPartition(topic, partition);
        ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
        Tuple2 data = ZkUtils.readData((ZkClient)zkClient, (String)(topicDirs.consumerOffsetDir() + "/" + tap.partition()));
        return Long.valueOf((String)data._1());
    }

    private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException {
        out.defaultWriteObject();
        out.writeObject(this.consumerConfig.props().props());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        Properties props = (Properties)in.readObject();
        this.consumerConfig = new ConsumerConfig(props);
    }

    public TypeInformation<OUT> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    public static class KafkaZKStringSerializer
    implements ZkSerializer {
        public byte[] serialize(Object data) throws ZkMarshallingError {
            try {
                return ((String)data).getBytes("UTF-8");
            }
            catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }

        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            if (bytes == null) {
                return null;
            }
            try {
                return new String(bytes, "UTF-8");
            }
            catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

