/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LegacyFetcher
implements Fetcher {
    // Shared logger; note that it is registered under FlinkKafkaConsumer's name, not LegacyFetcher's.
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
    // Topic given to the constructor; passed on to the partition lookup and to every consumer thread.
    private final String topic;
    // Configuration given to the constructor; passed on to the partition lookup and to every consumer thread.
    private final Properties config;
    // Only used to build the names of the consumer threads started by run().
    private final String taskName;
    // First error reported through onErrorInFetchThread(); stays null while no fetch thread has failed.
    private final AtomicReference<Throwable> error;
    // Partition -> offset to start reading from. Created by setPartitionsToRead(), updated by seek();
    // null until setPartitionsToRead() has been called.
    private Map<TopicPartition, Long> partitionsToRead;
    // Thread that is executing run(); interrupted by onErrorInFetchThread() to wake it from join().
    private volatile Thread mainThread;
    // Cleared by close(); checked by run() before starting the consumer threads and while waiting for them.
    private volatile boolean running = true;

    public LegacyFetcher(String topic, Properties props, String taskName) {
        this.config = Preconditions.checkNotNull(props, "The config properties cannot be null");
        this.topic = Preconditions.checkNotNull(topic, "The topic cannot be null");
        this.taskName = taskName;
        this.error = new AtomicReference();
    }

    /**
     * Replaces the set of partitions this fetcher reads from.
     *
     * <p>Every partition is registered with a marker offset; the consumer threads treat that
     * marker as "no start offset known" and ask the broker for one. Use {@link #seek} afterwards
     * to set a concrete start offset for a partition.
     *
     * @param partitions the partitions to read; must not be null
     */
    @Override
    public void setPartitionsToRead(List<TopicPartition> partitions) {
        // Marker value the consumer threads compare against to detect "offset not set".
        final long offsetNotSet = -915623761776L;

        this.partitionsToRead = new HashMap<TopicPartition, Long>(partitions.size());
        for (TopicPartition partition : partitions) {
            this.partitionsToRead.put(partition, Long.valueOf(offsetNotSet));
        }
    }

    /**
     * Sets the offset from which the given partition will be read.
     *
     * @param topicPartition a partition previously registered through {@code setPartitionsToRead}
     * @param offsetToRead the offset to start reading from
     * @throws IllegalArgumentException if no partitions were registered yet, or if the given
     *         partition is not among the registered ones
     */
    @Override
    public void seek(TopicPartition topicPartition, long offsetToRead) {
        final Map<TopicPartition, Long> offsets = this.partitionsToRead;
        if (offsets == null) {
            throw new IllegalArgumentException("No partitions to read set");
        }
        if (offsets.containsKey(topicPartition)) {
            offsets.put(topicPartition, offsetToRead);
            return;
        }
        throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition + ") we are not going to read. Partitions to read " + offsets);
    }

    /**
     * Requests shutdown by clearing the running flag.
     *
     * <p>The flag is checked by {@code run}, which is also where the consumer threads are
     * cancelled. This method only sets the flag; it does not interrupt any thread itself.
     */
    @Override
    public void close() {
        running = false;
    }

    /**
     * Runs the fetcher on the calling thread until it is closed, a fetch thread reports an
     * error, or all fetch threads have terminated.
     *
     * <p>The method looks up the topic's partitions on a helper thread (guarded by a 60 second
     * watchdog), groups the partitions to read by their leader broker, starts one daemon
     * consumer thread per broker and then waits for those threads. Threads that are still
     * alive when the method leaves the wait loop are cancelled.
     *
     * @param sourceContext the context the consumer threads emit records to
     * @param valueDeserializer turns the raw message bytes into records
     * @param lastOffsets array the consumer threads write the offset of each emitted record to,
     *        indexed by partition number
     * @throws IllegalArgumentException if no partitions to read have been set
     * @throws RuntimeException if a partition of the topic has no leader, or if not every
     *         partition to read could be matched to a partition reported for the topic
     * @throws Exception if the partition lookup fails, or wrapping the first error reported
     *         by a fetch thread
     */
    @Override
    public <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets) throws Exception {
        if (this.partitionsToRead == null || this.partitionsToRead.isEmpty()) {
            throw new IllegalArgumentException("No partitions set");
        }
        // Remember the calling thread so that onErrorInFetchThread() can interrupt it.
        this.mainThread = Thread.currentThread();
        LOG.info("Reading from partitions " + this.partitionsToRead + " using the legacy fetcher");

        // The lookup runs on its own thread; the watchdog stops that thread if it is
        // still alive after 60 seconds.
        PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(this.topic, this.config);
        infoFetcher.start();
        KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000L);
        watchDog.start();
        List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();

        // Group the partitions we want to read by the broker that leads them.
        int fetchPartitionsCount = 0;
        Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<Node, List<FetchPartition>>();
        for (PartitionInfo partitionInfo : allPartitionsInTopic) {
            if (partitionInfo.leader() == null) {
                throw new RuntimeException("Unable to consume partition " + partitionInfo.partition() + " from topic " + partitionInfo.topic() + " because it does not have a leader");
            }
            for (Map.Entry<TopicPartition, Long> entry : this.partitionsToRead.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                if (topicPartition.partition() != partitionInfo.partition()) {
                    continue;
                }
                long offset = entry.getValue();
                List<FetchPartition> partitions = fetchBrokers.get(partitionInfo.leader());
                if (partitions == null) {
                    partitions = new ArrayList<FetchPartition>();
                    fetchBrokers.put(partitionInfo.leader(), partitions);
                }
                partitions.add(new FetchPartition(topicPartition.partition(), offset));
                ++fetchPartitionsCount;
            }
        }
        if (this.partitionsToRead.size() != fetchPartitionsCount) {
            throw new RuntimeException(this.partitionsToRead.size() + " partitions to read, but got only " + fetchPartitionsCount + " partition infos with lead brokers.");
        }

        // One consumer thread per leader broker.
        List<SimpleConsumerThread<T>> consumers = new ArrayList<SimpleConsumerThread<T>>(fetchBrokers.size());
        for (Map.Entry<Node, List<FetchPartition>> brokerEntry : fetchBrokers.entrySet()) {
            Node node = brokerEntry.getKey();
            List<FetchPartition> partitionsList = brokerEntry.getValue();
            FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
            SimpleConsumerThread<T> thread = new SimpleConsumerThread<T>(this, this.config, this.topic, node, partitions, sourceContext, valueDeserializer, lastOffsets);
            thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", this.taskName, node.id(), node.host(), node.port()));
            thread.setDaemon(true);
            consumers.add(thread);
        }

        // close() may have been called while we were setting up; do not start anything then.
        if (!this.running) {
            return;
        }
        for (SimpleConsumerThread<T> consumer : consumers) {
            LOG.info("Starting thread {}", consumer.getName());
            consumer.start();
        }

        try {
            boolean someConsumersRunning = true;
            while (this.running && this.error.get() == null && someConsumersRunning) {
                try {
                    for (SimpleConsumerThread<T> consumer : consumers) {
                        consumer.join();
                    }
                    someConsumersRunning = false;
                    for (SimpleConsumerThread<T> consumer : consumers) {
                        someConsumersRunning |= consumer.isAlive();
                    }
                }
                catch (InterruptedException ignored) {
                    // Deliberately ignored: onErrorInFetchThread() interrupts this thread to
                    // wake it up, and the loop condition re-checks the running flag and the error.
                }
            }
            Throwable failure = this.error.get();
            if (failure != null) {
                throw new Exception(failure.getMessage(), failure);
            }
        }
        finally {
            // Whatever the reason for leaving the loop, stop the threads that are still alive.
            for (SimpleConsumerThread<T> consumer : consumers) {
                if (consumer.isAlive()) {
                    consumer.cancel();
                }
            }
        }
    }

    /**
     * Records an error reported by a fetch thread and wakes up the thread executing {@code run}.
     *
     * <p>Only the first reported error is stored; later reports leave the stored error
     * untouched and do not interrupt anything.
     *
     * @param error the error that occurred in the fetch thread
     */
    void onErrorInFetchThread(Throwable error) {
        if (!this.error.compareAndSet(null, error)) {
            // An earlier error has already been recorded.
            return;
        }
        Thread runner = this.mainThread;
        if (runner != null) {
            runner.interrupt();
        }
    }

    /**
     * Daemon thread that waits for another thread to finish and forcibly stops it if it is
     * still alive once the timeout has elapsed.
     *
     * <p>NOTE(review): the forced stop uses the deprecated {@link Thread#stop()}; confirm it
     * is supported on the JVM versions this code is deployed on.
     */
    private static class KillerWatchDog
    extends Thread {
        private final Thread toKill;
        private final long timeout;

        /**
         * @param toKill the thread to watch
         * @param timeout how long, in milliseconds, the watched thread may stay alive after
         *        this watchdog starts running
         */
        private KillerWatchDog(Thread toKill, long timeout) {
            super("KillerWatchDog");
            this.setDaemon(true);
            this.toKill = toKill;
            this.timeout = timeout;
        }

        @Override
        public void run() {
            final long deadline = System.currentTimeMillis() + timeout;
            while (toKill.isAlive()) {
                long now = System.currentTimeMillis();
                if (now >= deadline) {
                    break;
                }
                try {
                    toKill.join(deadline - now);
                }
                catch (InterruptedException interruptedException) {
                    // Interrupts do not end the watch; keep waiting until the deadline.
                }
            }
            if (toKill.isAlive()) {
                toKill.stop();
            }
        }
    }

    /**
     * Thread that looks up the partitions of a topic through
     * {@code FlinkKafkaConsumer.getPartitionsForTopic} and keeps either the result or the
     * error for the caller of {@link #getPartitions()}.
     */
    private static class PartitionInfoFetcher
    extends Thread {
        private final String topic;
        private final Properties properties;
        // Written by this thread in run(), read by the thread calling getPartitions().
        private volatile List<PartitionInfo> result;
        private volatile Throwable error;

        /**
         * @param topic the topic whose partitions are looked up
         * @param properties the configuration passed to the lookup
         */
        PartitionInfoFetcher(String topic, Properties properties) {
            this.topic = topic;
            this.properties = properties;
        }

        @Override
        public void run() {
            try {
                this.result = FlinkKafkaConsumer.getPartitionsForTopic(this.topic, this.properties);
            }
            catch (Throwable t) {
                // Keep every failure so that getPartitions() can report it to the caller.
                this.error = t;
            }
        }

        /**
         * Waits for this thread to finish and returns the partitions it found.
         *
         * @return the partitions reported for the topic
         * @throws Exception if the calling thread is interrupted while waiting, if the lookup
         *         threw (the failure is attached as the cause), or if the thread ended
         *         without producing a result
         */
        public List<PartitionInfo> getPartitions() throws Exception {
            try {
                this.join();
            }
            catch (InterruptedException e) {
                // Keep the interruption as the cause instead of discarding it.
                throw new Exception("Partition fetching was cancelled before completion", e);
            }
            Throwable failure = this.error;
            if (failure != null) {
                throw new Exception("Failed to fetch partitions for topic " + this.topic, failure);
            }
            List<PartitionInfo> partitions = this.result;
            if (partitions != null) {
                return partitions;
            }
            throw new Exception("Partition fetching failed");
        }
    }

    /**
     * Thread that reads a fixed set of partitions of one topic from a single broker through a
     * Kafka {@code SimpleConsumer}, deserializes each message and emits it to the source context.
     *
     * <p>Errors are reported to the owning {@code LegacyFetcher} through
     * {@code onErrorInFetchThread}, except for errors that occur after {@link #cancel()} was
     * called.
     *
     * @param <T> the type of the emitted records
     */
    private static class SimpleConsumerThread<T>
    extends Thread {
        /** Marker offset meaning that no start offset is known for a partition yet. */
        private static final long OFFSET_NOT_SET = -915623761776L;

        private final SourceFunction.SourceContext<T> sourceContext;
        private final DeserializationSchema<T> valueDeserializer;
        private final long[] offsetsState;
        private final FetchPartition[] partitions;
        private final Node broker;
        private final String topic;
        private final Properties config;
        private final LegacyFetcher owner;
        // Written by this thread in run(), read by the thread calling cancel().
        private volatile SimpleConsumer consumer;
        private volatile boolean running = true;

        /**
         * @param owner the fetcher that errors are reported to
         * @param config consumer configuration (socket and fetch settings, offset reset mode)
         * @param topic the topic to read from
         * @param broker the broker to connect to
         * @param partitions the partitions to read, with their start offsets
         * @param sourceContext the context records are emitted to; must not be null
         * @param valueDeserializer turns message bytes into records; must not be null
         * @param offsetsState array that receives the offset of each emitted record, indexed
         *        by partition number; must not be null
         */
        public SimpleConsumerThread(LegacyFetcher owner, Properties config, String topic, Node broker, FetchPartition[] partitions, SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] offsetsState) {
            this.owner = owner;
            this.config = config;
            this.topic = topic;
            this.broker = broker;
            this.partitions = partitions;
            this.sourceContext = Preconditions.checkNotNull(sourceContext);
            this.valueDeserializer = Preconditions.checkNotNull(valueDeserializer);
            this.offsetsState = Preconditions.checkNotNull(offsetsState);
        }

        /**
         * Connects to the broker, resolves start offsets for partitions that have none, and
         * then fetches and emits messages until the thread is cancelled or an error occurs.
         * The consumer is closed when the method exits.
         */
        @Override
        public void run() {
            try {
                String clientId = "flink-kafka-consumer-legacy-" + this.broker.id();
                int soTimeout = Integer.parseInt(this.config.getProperty("socket.timeout.ms", "30000"));
                int bufferSize = Integer.parseInt(this.config.getProperty("socket.receive.buffer.bytes", "65536"));
                int fetchSize = Integer.parseInt(this.config.getProperty("fetch.message.max.bytes", "1048576"));
                int maxWait = Integer.parseInt(this.config.getProperty("fetch.wait.max.ms", "100"));
                int minBytes = Integer.parseInt(this.config.getProperty("fetch.min.bytes", "1"));

                SimpleConsumer simpleConsumer = new SimpleConsumer(this.broker.host(), this.broker.port(), soTimeout, bufferSize, clientId);
                this.consumer = simpleConsumer;

                // Partitions that still carry the marker offset need a start offset from the broker.
                List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<FetchPartition>();
                for (FetchPartition fp : this.partitions) {
                    if (fp.nextOffsetToRead == OFFSET_NOT_SET) {
                        partitionsToGetOffsetsFor.add(fp);
                    }
                }
                if (!partitionsToGetOffsetsFor.isEmpty()) {
                    long timeType = this.config.getProperty("auto.offset.reset", "latest").equals("latest") ? kafka.api.OffsetRequest.LatestTime() : kafka.api.OffsetRequest.EarliestTime();
                    SimpleConsumerThread.getLastOffset(simpleConsumer, this.topic, partitionsToGetOffsetsFor, timeType);
                    LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}", this.topic, partitionsToGetOffsetsFor);
                }

                while (this.running) {
                    FetchRequestBuilder frb = new FetchRequestBuilder();
                    frb.clientId(clientId);
                    frb.maxWait(maxWait);
                    frb.minBytes(minBytes);
                    for (FetchPartition fp : this.partitions) {
                        frb.addFetch(this.topic, fp.partition, fp.nextOffsetToRead, fetchSize);
                    }
                    FetchRequest fetchRequest = frb.build();
                    LOG.debug("Issuing fetch request {}", fetchRequest);
                    FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);

                    if (fetchResponse.hasError()) {
                        StringBuilder exception = new StringBuilder();
                        for (FetchPartition fp : this.partitions) {
                            short code = fetchResponse.errorCode(this.topic, fp.partition);
                            if (code == ErrorMapping.NoError()) {
                                continue;
                            }
                            exception.append("\nException for partition ").append(fp.partition).append(": ").append(StringUtils.stringifyException((Throwable)ErrorMapping.exceptionFor(code)));
                        }
                        throw new IOException("Error while fetching from broker: " + exception);
                    }

                    int messagesInFetch = 0;
                    for (FetchPartition fp : this.partitions) {
                        ByteBufferMessageSet messageSet = fetchResponse.messageSet(this.topic, fp.partition);
                        int partition = fp.partition;
                        for (MessageAndOffset msg : messageSet) {
                            if (!this.running) {
                                // Cancelled in the middle of a fetch; the finally block closes the consumer.
                                return;
                            }
                            ++messagesInFetch;
                            if (msg.offset() < fp.nextOffsetToRead) {
                                LOG.info("Skipping message with offset " + msg.offset() + " because we have seen messages until " + fp.nextOffsetToRead + " from partition " + fp.partition + " already");
                                continue;
                            }
                            ByteBuffer payload = msg.message().payload();
                            byte[] valueByte = new byte[payload.remaining()];
                            payload.get(valueByte);
                            T value = this.valueDeserializer.deserialize(valueByte);
                            long offset = msg.offset();
                            // Emit the record and record its offset under the checkpoint lock,
                            // so that both happen together with respect to that lock.
                            synchronized (this.sourceContext.getCheckpointLock()) {
                                this.sourceContext.collect(value);
                                this.offsetsState[partition] = offset;
                            }
                            fp.nextOffsetToRead = offset + 1L;
                        }
                    }
                    LOG.debug("This fetch contained {} messages", messagesInFetch);
                }
            }
            catch (Throwable t) {
                if (this.running) {
                    this.owner.onErrorInFetchThread(t);
                } else {
                    // cancel() closes the consumer and interrupts this thread, which makes an
                    // in-flight call fail. That is part of shutting down, not a fetch error.
                    LOG.debug("Ignoring error in cancelled consumer thread", t);
                }
            }
            finally {
                SimpleConsumer toClose = this.consumer;
                if (toClose != null) {
                    try {
                        toClose.close();
                    }
                    catch (Throwable t) {
                        LOG.error("Error while closing the Kafka simple consumer", t);
                    }
                }
            }
        }

        /**
         * Stops this thread: clears the running flag, closes the consumer if one has been
         * created, and interrupts the thread.
         */
        public void cancel() {
            this.running = false;
            SimpleConsumer current = this.consumer;
            if (current != null) {
                current.close();
            }
            this.interrupt();
        }

        /**
         * Asks the broker for one offset per partition and stores it as the partition's next
         * offset to read.
         *
         * @param consumer the consumer used to send the offset request
         * @param topic the topic the partitions belong to
         * @param partitions the partitions to resolve; their {@code nextOffsetToRead} is updated
         * @param whichTime the time value sent in the request (latest or earliest time)
         * @throws RuntimeException if the broker's response reports an error
         */
        private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
            for (FetchPartition fp : partitions) {
                TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
                requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
            }
            OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
            OffsetResponse response = consumer.getOffsetsBefore(request);
            if (response.hasError()) {
                StringBuilder exception = new StringBuilder();
                for (FetchPartition fp : partitions) {
                    short code = response.errorCode(topic, fp.partition);
                    if (code == ErrorMapping.NoError()) {
                        continue;
                    }
                    exception.append("\nException for partition ").append(fp.partition).append(": ").append(StringUtils.stringifyException((Throwable)ErrorMapping.exceptionFor(code)));
                }
                throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions + ". " + exception);
            }
            for (FetchPartition fp : partitions) {
                fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
            }
        }
    }

    /**
     * Mutable pair of a partition number and the next offset to read from that partition.
     * The offset is advanced by the consumer thread as messages are emitted.
     */
    private static class FetchPartition {
        // Partition number within the topic.
        int partition;
        // Offset of the next message to request for this partition.
        long nextOffsetToRead;

        FetchPartition(int partition, long nextOffsetToRead) {
            this.partition = partition;
            this.nextOffsetToRead = nextOffsetToRead;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("FetchPartition {partition=");
            text.append(partition);
            text.append(", offset=");
            text.append(nextOffsetToRead);
            text.append('}');
            return text.toString();
        }
    }
}

