/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.stream.source.kafka.consumer;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kylin.stream.core.consumer.ConsumerStartMode;
import org.apache.kylin.stream.core.consumer.IStreamingConnector;
import org.apache.kylin.stream.core.model.StreamingMessage;
import org.apache.kylin.stream.core.source.IStreamingMessageParser;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.source.kafka.KafkaSource;

/**
 * {@link IStreamingConnector} that reads raw {@code byte[]} key/value records from a
 * single Kafka topic and hands them, one at a time, to an {@link IStreamingMessageParser}.
 *
 * <p>Usage order as implied by the code: {@link #setStartPartition} must be called with a
 * non-empty partition list before {@link #open()}; {@link #nextEvent()} then pulls records.
 *
 * <p>NOTE(review): no synchronization is performed in this class; presumably it is driven
 * by a single consumer thread (with {@link #wakeup()} called from another) - confirm
 * against the caller.
 */
public class KafkaConnector
implements IStreamingConnector {
    /** Timeout passed to {@code KafkaConsumer.poll}; milliseconds per the Kafka client API. */
    private static final long POLL_TIMEOUT_MS = 100L;

    private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final String topic;
    private final IStreamingMessageParser parser;
    private ConsumerStartMode startMode = ConsumerStartMode.EARLIEST;
    /** Records returned by the last poll that have not been handed out yet. */
    private List<ConsumerRecord<byte[], byte[]>> buffer = Lists.newLinkedList();
    private List<Partition> partitions;
    private Map<Integer, Long> partitionOffsets;
    private KafkaSource kafkaSource;

    /**
     * Creates the connector and the underlying Kafka consumer.
     *
     * @param conf        configuration passed straight to the {@link KafkaConsumer} constructor
     * @param topic       topic whose partitions will be assigned in {@link #open()}
     * @param parser      converts each consumed record into a {@link StreamingMessage}
     * @param kafkaSource source object returned by {@link #getSource()}
     */
    public KafkaConnector(Map<String, Object> conf, String topic, IStreamingMessageParser parser, KafkaSource kafkaSource) {
        this.kafkaConsumer = new KafkaConsumer<byte[], byte[]>(conf);
        this.topic = topic;
        this.parser = parser;
        this.kafkaSource = kafkaSource;
    }

    /**
     * Records which partitions to consume and where to start; takes effect on {@link #open()}.
     *
     * @param partitions       partitions to assign
     * @param startMode        {@code EARLIEST}, {@code LATEST}, or any other value to start from
     *                         {@code partitionOffsets}
     * @param partitionOffsets partition id to start offset; only consulted when
     *                         {@code startMode} is neither {@code EARLIEST} nor {@code LATEST}.
     *                         May be {@code null} or incomplete - partitions without an entry
     *                         start from the beginning
     */
    public void setStartPartition(List<Partition> partitions, ConsumerStartMode startMode, Map<Integer, Long> partitionOffsets) {
        this.partitions = partitions;
        this.startMode = startMode;
        this.partitionOffsets = partitionOffsets;
    }

    /**
     * @return the partition list given to {@link #setStartPartition}, or {@code null} if it
     *         has not been called
     */
    public List<Partition> getConsumePartitions() {
        return this.partitions;
    }

    /**
     * Assigns the configured partitions to the consumer and positions each one according
     * to the start mode.
     *
     * @throws IllegalStateException if no partitions have been set
     */
    public void open() {
        if (this.partitions == null || this.partitions.isEmpty()) {
            throw new IllegalStateException("not assign partitions");
        }
        List<TopicPartition> topicPartitions = Lists.newArrayList();
        for (Partition partition : this.partitions) {
            topicPartitions.add(new TopicPartition(this.topic, partition.getPartitionId()));
        }
        this.kafkaConsumer.assign(topicPartitions);

        if (this.startMode == ConsumerStartMode.EARLIEST) {
            this.kafkaConsumer.seekToBeginning(topicPartitions);
        } else if (this.startMode == ConsumerStartMode.LATEST) {
            this.kafkaConsumer.seekToEnd(topicPartitions);
        } else {
            this.seekToStoredOffsets(topicPartitions);
        }
    }

    /**
     * Seeks every partition that has a stored offset to that offset and the remaining
     * ones to the beginning.
     */
    private void seekToStoredOffsets(List<TopicPartition> topicPartitions) {
        Map<Integer, Long> offsets = this.partitionOffsets;
        List<TopicPartition> withoutOffset = Lists.newArrayList();
        for (TopicPartition topicPartition : topicPartitions) {
            // A missing map is handled like a missing entry: fall back to the beginning.
            Long offset = offsets == null ? null : offsets.get(topicPartition.partition());
            if (offset != null) {
                this.kafkaConsumer.seek(topicPartition, offset.longValue());
            } else {
                withoutOffset.add(topicPartition);
            }
        }
        // KafkaConsumer.seekToBeginning treats an empty collection as "all assigned
        // partitions", which would discard the explicit seeks made above. Only call it
        // when there really are partitions lacking a stored offset.
        if (!withoutOffset.isEmpty()) {
            this.kafkaConsumer.seekToBeginning(withoutOffset);
        }
    }

    /** Closes the underlying Kafka consumer. */
    public void close() {
        this.kafkaConsumer.close();
    }

    /** Delegates to {@code KafkaConsumer.wakeup()}. */
    public void wakeup() {
        this.kafkaConsumer.wakeup();
    }

    /**
     * Returns the next parsed message, polling Kafka once if the local buffer is empty.
     *
     * @return the parsed message, or {@code null} if the poll returned no records
     */
    public StreamingMessage nextEvent() {
        if (this.buffer.isEmpty()) {
            this.fillBuffer();
        }
        if (this.buffer.isEmpty()) {
            return null;
        }
        ConsumerRecord<byte[], byte[]> record = this.buffer.remove(0);
        return this.parser.parse(record);
    }

    /**
     * Polls the consumer once and replaces the buffer with the returned records,
     * grouped partition by partition in the order reported by the poll result.
     */
    private void fillBuffer() {
        ConsumerRecords<byte[], byte[]> records = this.kafkaConsumer.poll(POLL_TIMEOUT_MS);
        List<ConsumerRecord<byte[], byte[]>> newBuffer = Lists.newLinkedList();
        for (TopicPartition topicPartition : records.partitions()) {
            newBuffer.addAll(records.records(topicPartition));
        }
        this.buffer = newBuffer;
    }

    /** @return the {@link KafkaSource} supplied at construction time */
    public IStreamingSource getSource() {
        return this.kafkaSource;
    }
}

