/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkKafkaConsumer<T>
extends RichParallelSourceFunction<T>
implements CheckpointNotifier,
CheckpointedAsynchronously<long[]>,
ResultTypeQueryable<T> {
    private static final long serialVersionUID = -6272159445203409112L;
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
    public static final long OFFSET_NOT_SET = -915623761776L;
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
    public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
    private final OffsetStore offsetStore;
    private final FetcherType fetcherType;
    private final String topic;
    private final Properties props;
    private final int[] partitions;
    private final DeserializationSchema<T> valueDeserializer;
    private final LinkedMap pendingCheckpoints = new LinkedMap();
    private transient Fetcher fetcher;
    private transient OffsetHandler offsetHandler;
    private transient List<TopicPartition> subscribedPartitions;
    private transient long[] lastOffsets;
    private transient long[] commitedOffsets;
    private transient long[] restoreToOffset;
    private volatile boolean running = true;

    public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props, OffsetStore offsetStore, FetcherType fetcherType) {
        this.offsetStore = Preconditions.checkNotNull(offsetStore);
        this.fetcherType = Preconditions.checkNotNull(fetcherType);
        if (fetcherType == FetcherType.NEW_HIGH_LEVEL) {
            throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 / 0.9.0 is not yet supported in Flink");
        }
        if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) {
            throw new IllegalArgumentException("The Kafka offset handler cannot be used together with the old low-level fetcher.");
        }
        this.topic = Preconditions.checkNotNull(topic, "topic");
        this.props = Preconditions.checkNotNull(props, "props");
        this.valueDeserializer = Preconditions.checkNotNull(valueDeserializer, "valueDeserializer");
        if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
            FlinkKafkaConsumer.validateZooKeeperConfig(props);
        }
        List<PartitionInfo> partitionInfos = FlinkKafkaConsumer.getPartitionsForTopic(topic, props);
        this.partitions = new int[partitionInfos.size()];
        for (int i = 0; i < partitionInfos.size(); ++i) {
            this.partitions[i] = partitionInfos.get(i).partition();
            if (this.partitions[i] < this.partitions.length) continue;
            throw new RuntimeException("Kafka partition numbers are sparse");
        }
        LOG.info("Topic {} has {} partitions", (Object)topic, (Object)this.partitions.length);
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        int numConsumers = this.getRuntimeContext().getNumberOfParallelSubtasks();
        int thisComsumerIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        this.subscribedPartitions = FlinkKafkaConsumer.assignPartitions(this.partitions, this.topic, numConsumers, thisComsumerIndex);
        if (LOG.isInfoEnabled()) {
            LOG.info("Kafka consumer {} will read partitions {} out of partitions {}", new Object[]{thisComsumerIndex, this.subscribedPartitions, Arrays.toString(this.partitions)});
        }
        if (this.subscribedPartitions.isEmpty()) {
            LOG.info("Kafka consumer {} has no partitions (empty source)", (Object)thisComsumerIndex);
            return;
        }
        switch (this.fetcherType) {
            case NEW_HIGH_LEVEL: {
                throw new UnsupportedOperationException("Currently unsupported");
            }
            case LEGACY_LOW_LEVEL: {
                this.fetcher = new LegacyFetcher(this.topic, this.props, this.getRuntimeContext().getTaskName());
                break;
            }
            default: {
                throw new RuntimeException("Requested unknown fetcher " + this.fetcher);
            }
        }
        this.fetcher.setPartitionsToRead(this.subscribedPartitions);
        switch (this.offsetStore) {
            case FLINK_ZOOKEEPER: {
                this.offsetHandler = new ZookeeperOffsetHandler(this.props);
                break;
            }
            case KAFKA: {
                throw new Exception("Kafka offset handler cannot work with legacy fetcher");
            }
            default: {
                throw new RuntimeException("Requested unknown offset store " + (Object)((Object)this.offsetStore));
            }
        }
        this.lastOffsets = new long[this.partitions.length];
        this.commitedOffsets = new long[this.partitions.length];
        Arrays.fill(this.lastOffsets, -915623761776L);
        Arrays.fill(this.commitedOffsets, -915623761776L);
        if (this.restoreToOffset != null) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Consumer {} found offsets from previous checkpoint: {}", (Object)thisComsumerIndex, (Object)Arrays.toString(this.restoreToOffset));
            }
            for (int i = 0; i < this.restoreToOffset.length; ++i) {
                long restoredOffset = this.restoreToOffset[i];
                if (restoredOffset == -915623761776L) continue;
                this.fetcher.seek(new TopicPartition(this.topic, i), restoredOffset + 1L);
                this.lastOffsets[i] = restoredOffset;
            }
        } else {
            this.offsetHandler.seekFetcherToInitialOffsets(this.subscribedPartitions, this.fetcher);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        if (this.fetcher != null) {
            PeriodicOffsetCommitter offsetCommitter = null;
            StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext)this.getRuntimeContext();
            if (!streamingRuntimeContext.isCheckpointingEnabled()) {
                long commitInterval = Long.valueOf(this.props.getProperty("auto.commit.interval.ms", "60000"));
                offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
                offsetCommitter.start();
                LOG.info("Starting periodic offset committer, with commit interval of {}ms", (Object)commitInterval);
            }
            this.fetcher.run(sourceContext, this.valueDeserializer, this.lastOffsets);
            if (offsetCommitter != null) {
                offsetCommitter.close();
            }
        } else {
            Object waitLock = new Object();
            while (this.running) {
                try {
                    Object object = waitLock;
                    synchronized (object) {
                        waitLock.wait();
                    }
                }
                catch (InterruptedException interruptedException) {
                }
            }
        }
        sourceContext.close();
    }

    public void cancel() {
        this.running = false;
        Fetcher fetcher = this.fetcher;
        this.fetcher = null;
        if (fetcher != null) {
            try {
                fetcher.close();
            }
            catch (IOException e) {
                LOG.warn("Error while closing Kafka connector data fetcher", (Throwable)e);
            }
        }
        OffsetHandler offsetHandler = this.offsetHandler;
        this.offsetHandler = null;
        if (offsetHandler != null) {
            try {
                offsetHandler.close();
            }
            catch (IOException e) {
                LOG.warn("Error while closing Kafka connector offset handler", (Throwable)e);
            }
        }
    }

    public void close() throws Exception {
        this.cancel();
        super.close();
    }

    public TypeInformation<T> getProducedType() {
        return this.valueDeserializer.getProducedType();
    }

    public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        if (this.lastOffsets == null) {
            LOG.debug("snapshotState() requested on not yet opened source; returning null.");
            return null;
        }
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source");
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}", new Object[]{Arrays.toString(this.lastOffsets), checkpointId, checkpointTimestamp});
        }
        long[] currentOffsets = Arrays.copyOf(this.lastOffsets, this.lastOffsets.length);
        this.pendingCheckpoints.put((Object)checkpointId, (Object)currentOffsets);
        while (this.pendingCheckpoints.size() > 100) {
            this.pendingCheckpoints.remove(0);
        }
        return currentOffsets;
    }

    public void restoreState(long[] restoredOffsets) {
        this.restoreToOffset = restoredOffsets;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        block13: {
            if (this.fetcher == null) {
                LOG.debug("notifyCheckpointComplete() called on uninitialized source");
                return;
            }
            if (!this.running) {
                LOG.debug("notifyCheckpointComplete() called on closed source");
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Committing offsets externally for checkpoint {}", (Object)checkpointId);
            }
            try {
                long[] checkpointOffsets;
                LinkedMap linkedMap = this.pendingCheckpoints;
                synchronized (linkedMap) {
                    int posInMap = this.pendingCheckpoints.indexOf((Object)checkpointId);
                    if (posInMap == -1) {
                        LOG.warn("Received confirmation for unknown checkpoint id {}", (Object)checkpointId);
                        return;
                    }
                    checkpointOffsets = (long[])this.pendingCheckpoints.remove(posInMap);
                    for (int i = 0; i < posInMap; ++i) {
                        this.pendingCheckpoints.remove(0);
                    }
                }
                if (LOG.isInfoEnabled()) {
                    LOG.info("Committing offsets {} to offset store: {}", (Object)Arrays.toString(checkpointOffsets), (Object)this.offsetStore);
                }
                HashMap<TopicPartition, Long> offsetsToCommit = new HashMap<TopicPartition, Long>();
                for (TopicPartition tp : this.subscribedPartitions) {
                    int partition = tp.partition();
                    long offset = checkpointOffsets[partition];
                    long lastCommitted = this.commitedOffsets[partition];
                    if (offset == -915623761776L) continue;
                    if (offset > lastCommitted) {
                        offsetsToCommit.put(tp, offset);
                        LOG.debug("Committing offset {} for partition {}", (Object)offset, (Object)partition);
                        continue;
                    }
                    LOG.debug("Ignoring offset {} for partition {} because it is already committed", (Object)offset, (Object)partition);
                }
                this.offsetHandler.commit(offsetsToCommit);
            }
            catch (Exception e) {
                if (!this.running) break block13;
                throw e;
            }
        }
    }

    protected static List<TopicPartition> assignPartitions(int[] partitions, String topicName, int numConsumers, int consumerIndex) {
        Preconditions.checkArgument(numConsumers > 0);
        Preconditions.checkArgument(consumerIndex < numConsumers);
        ArrayList<TopicPartition> partitionsToSub = new ArrayList<TopicPartition>();
        for (int i = 0; i < partitions.length; ++i) {
            if (i % numConsumers != consumerIndex) continue;
            partitionsToSub.add(new TopicPartition(topicName, partitions[i]));
        }
        return partitionsToSub;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static List<PartitionInfo> getPartitionsForTopic(String topic, Properties properties) {
        String seedBrokersConfString = properties.getProperty("bootstrap.servers");
        int numRetries = Integer.valueOf(properties.getProperty(GET_PARTITIONS_RETRIES_KEY, Integer.toString(3)));
        Preconditions.checkNotNull(seedBrokersConfString, "Configuration property bootstrap.servers not set");
        String[] seedBrokers = seedBrokersConfString.split(",");
        ArrayList<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
        Random rnd = new Random();
        block7: for (int retry = 0; retry < numRetries; ++retry) {
            int index = rnd.nextInt(seedBrokers.length);
            block8: for (int arrIdx = 0; arrIdx < seedBrokers.length; ++arrIdx) {
                String seedBroker = seedBrokers[index];
                LOG.info("Trying to get topic metadata from broker {} in try {}/{}", new Object[]{seedBroker, retry, numRetries});
                if (++index == seedBrokers.length) {
                    index = 0;
                }
                URL brokerUrl = NetUtils.getCorrectHostnamePort((String)seedBroker);
                try (SimpleConsumer consumer = null;){
                    String clientId = "flink-kafka-consumer-partition-lookup";
                    int soTimeout = Integer.valueOf(properties.getProperty("socket.timeout.ms", "30000"));
                    int bufferSize = Integer.valueOf(properties.getProperty("socket.receive.buffer.bytes", "65536"));
                    consumer = new SimpleConsumer(brokerUrl.getHost(), brokerUrl.getPort(), soTimeout, bufferSize, "flink-kafka-consumer-partition-lookup");
                    List<String> topics = Collections.singletonList(topic);
                    TopicMetadataRequest req = new TopicMetadataRequest(topics);
                    TopicMetadataResponse resp = consumer.send(req);
                    List metaData = resp.topicsMetadata();
                    partitions.clear();
                    for (TopicMetadata item : metaData) {
                        if (item.errorCode() != ErrorMapping.NoError()) {
                            if (item.errorCode() == ErrorMapping.InvalidTopicCode() || item.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
                                throw new RuntimeException("Requested partitions for unknown topic", ErrorMapping.exceptionFor((short)item.errorCode()));
                            }
                            LOG.warn("Error while getting metadata from broker " + seedBroker + " to find partitions for " + topic, ErrorMapping.exceptionFor((short)item.errorCode()));
                            continue block8;
                        }
                        if (!item.topic().equals(topic)) {
                            LOG.warn("Received metadata from topic " + item.topic() + " even though it was not requested. Skipping ...");
                            continue block8;
                        }
                        for (PartitionMetadata part : item.partitionsMetadata()) {
                            Node leader = FlinkKafkaConsumer.brokerToNode(part.leader());
                            Node[] replicas = new Node[part.replicas().size()];
                            for (int i = 0; i < part.replicas().size(); ++i) {
                                replicas[i] = FlinkKafkaConsumer.brokerToNode((Broker)part.replicas().get(i));
                            }
                            Node[] ISRs = new Node[part.isr().size()];
                            for (int i = 0; i < part.isr().size(); ++i) {
                                ISRs[i] = FlinkKafkaConsumer.brokerToNode((Broker)part.isr().get(i));
                            }
                            PartitionInfo pInfo = new PartitionInfo(topic, part.partitionId(), leader, replicas, ISRs);
                            partitions.add(pInfo);
                        }
                    }
                    break block7;
                }
            }
        }
        return partitions;
    }

    private static Node brokerToNode(Broker broker) {
        return new Node(broker.id(), broker.host(), broker.port());
    }

    protected static void validateZooKeeperConfig(Properties props) {
        if (props.getProperty("zookeeper.connect") == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
        }
        if (props.getProperty("group.id") == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set in the properties");
        }
        try {
            Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
        }
        try {
            Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
        }
    }

    private static class PeriodicOffsetCommitter
    extends Thread {
        private final long commitInterval;
        private final FlinkKafkaConsumer consumer;
        private volatile boolean running = true;

        public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
            this.commitInterval = commitInterval;
            this.consumer = consumer;
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        @Override
        public void run() {
            try {
                while (this.running) {
                    try {
                        Thread.sleep(this.commitInterval);
                        long[] currentOffsets = Arrays.copyOf(this.consumer.lastOffsets, this.consumer.lastOffsets.length);
                        HashMap<TopicPartition, Long> offsetsToCommit = new HashMap<TopicPartition, Long>();
                        for (TopicPartition tp : this.consumer.subscribedPartitions) {
                            int partition = tp.partition();
                            long offset = currentOffsets[partition];
                            long lastCommitted = this.consumer.commitedOffsets[partition];
                            if (offset == -915623761776L) continue;
                            if (offset > lastCommitted) {
                                offsetsToCommit.put(tp, offset);
                                LOG.debug("Committing offset {} for partition {}", (Object)offset, (Object)partition);
                                continue;
                            }
                            LOG.debug("Ignoring offset {} for partition {} because it is already committed", (Object)offset, (Object)partition);
                        }
                        this.consumer.offsetHandler.commit(offsetsToCommit);
                    }
                    catch (InterruptedException e) {
                        if (!this.running) return;
                        throw e;
                        return;
                    }
                }
            }
            catch (Throwable t) {
                LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
                this.consumer.fetcher.stopWithError(t);
            }
        }

        public void close() {
            this.running = false;
            this.interrupt();
        }
    }

    public static enum FetcherType {
        LEGACY_LOW_LEVEL,
        NEW_HIGH_LEVEL;

    }

    public static enum OffsetStore {
        FLINK_ZOOKEEPER,
        KAFKA;

    }
}

