package org.apache.druid.firehose.kafka;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.net.HostAndPort;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.logger.Logger;

@Deprecated
/* loaded from: input_file:org/apache/druid/firehose/kafka/KafkaSimpleConsumer.class */
public class KafkaSimpleConsumer {
    /** Every broker given to the constructor, parsed and wrapped in an unmodifiable list. */
    private final List<HostAndPort> allBrokers;
    /** Topic this consumer reads from. */
    private final String topic;
    /** The single partition of {@link #topic} this consumer reads from. */
    private final int partitionId;
    /** Client id used for fetch and offset requests; built in the constructor from topic, partition and the caller's id. */
    private final String clientId;
    /** Client id used by the short-lived consumers that look up partition metadata. */
    private final String leaderLookupClientId;
    /** When the fetch offset is out of range: {@code true} resets to the earliest offset, {@code false} to the latest. */
    private final boolean earliest;
    /** Current partition leader; assigned in {@code ensureConsumer} and set to {@code null} by {@code fetch} when a new leader is needed. */
    private volatile Broker leaderBroker;
    /** Brokers queried for metadata; starts as a copy of {@link #allBrokers} and is replaced by the partition's replicas once metadata is found. */
    private List<HostAndPort> replicaBrokers;
    /** Connection to the current leader; {@code null} until {@code ensureConsumer} creates it and after {@code stopConsumer}. */
    private SimpleConsumer consumer = null;
    /** Buffer size passed to every {@link SimpleConsumer} created by this class. */
    private static final int BUFFER_SIZE = 65536;
    /** Fetch size passed to {@code FetchRequestBuilder.addFetch}. */
    private static final int FETCH_SIZE = 100000000;
    /** Shared empty message list. NOTE(review): the list is mutable; callers must not add to it. */
    public static final List<BytesMessageWithOffset> EMPTY_MSGS = new ArrayList<>();
    private static final Logger log = new Logger(KafkaSimpleConsumer.class);
    /** Socket timeout passed to every {@link SimpleConsumer} created by this class (30 seconds). */
    private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);
    /** Sleep between leader lookup attempts (1 minute). */
    private static final long RETRY_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1);

    /* loaded from: input_file:org/apache/druid/firehose/kafka/KafkaSimpleConsumer$BytesMessageWithOffset.class */
    /**
     * Immutable holder pairing a raw message payload with an offset and the
     * partition it belongs to.
     */
    public static class BytesMessageWithOffset {
        /** Raw payload bytes; the array is stored and returned without copying. */
        final byte[] msg;
        /** Offset value supplied at construction time. */
        final long offset;
        /** Partition id supplied at construction time. */
        final int partition;

        /**
         * @param bArr payload bytes (kept by reference)
         * @param j    offset associated with the payload
         * @param i    partition id
         */
        public BytesMessageWithOffset(byte[] bArr, long j, int i) {
            msg = bArr;
            offset = j;
            partition = i;
        }

        /** @return the partition id given to the constructor */
        public int getPartition() {
            return partition;
        }

        /** @return the same payload array given to the constructor */
        public byte[] message() {
            return msg;
        }

        /** @return the offset given to the constructor */
        public long offset() {
            return offset;
        }
    }

    public KafkaSimpleConsumer(String str, int i, String str2, List<String> list, boolean z) {
        ArrayList arrayList = new ArrayList();
        for (String str3 : list) {
            HostAndPort fromString = HostAndPort.fromString(str3);
            Preconditions.checkArgument((fromString.getHostText() == null || fromString.getHostText().isEmpty() || !fromString.hasPort()) ? false : true, "kafka broker [%s] is not valid, must be <host>:<port>", new Object[]{str3});
            arrayList.add(fromString);
        }
        this.allBrokers = Collections.unmodifiableList(arrayList);
        this.topic = str;
        this.partitionId = i;
        this.clientId = StringUtils.format("%s_%d_%s", new Object[]{str, Integer.valueOf(i), str2});
        this.leaderLookupClientId = str2 + "leaderLookup";
        this.replicaBrokers = new ArrayList();
        this.replicaBrokers.addAll(this.allBrokers);
        this.earliest = z;
        log.info("KafkaSimpleConsumer initialized with clientId [%s] for message consumption and clientId [%s] for leader lookup", new Object[]{this.clientId, this.leaderLookupClientId});
    }

    /**
     * Makes sure {@code consumer} is connected to a partition leader, looking
     * one up first when {@code leaderBroker} is not set. Does nothing when a
     * consumer already exists.
     *
     * @param broker the previously known leader (may be {@code null}); handed to
     *               {@code findNewLeader}
     * @throws InterruptedException propagated from {@code findNewLeader}
     */
    private void ensureConsumer(Broker broker) throws InterruptedException {
        if (this.consumer != null) {
            return;
        }
        // Keep asking until a leader is actually returned.
        while (this.leaderBroker == null) {
            this.leaderBroker = findNewLeader(broker);
        }
        log.info(
            "making SimpleConsumer[%s][%s], leader broker[%s:%s]",
            this.topic,
            this.partitionId,
            this.leaderBroker.host(),
            this.leaderBroker.port()
        );
        this.consumer = new SimpleConsumer(
            this.leaderBroker.host(),
            this.leaderBroker.port(),
            SO_TIMEOUT_MILLIS,
            BUFFER_SIZE,
            this.clientId
        );
    }

    /**
     * Lazily drops messages whose offset is below {@code j} and converts the
     * rest into {@link BytesMessageWithOffset} instances.
     *
     * @param iterable messages returned by a fetch
     * @param j        minimum message offset to keep (inclusive)
     * @return decoded messages; each one carries the message's
     *         {@code nextOffset()} rather than its own offset, plus this
     *         consumer's partition id
     */
    private Iterable<BytesMessageWithOffset> filterAndDecode(Iterable<MessageAndOffset> iterable, final long j) {
        final Predicate<MessageAndOffset> atOrAfterRequested = new Predicate<MessageAndOffset>() {
            @Override
            public boolean apply(MessageAndOffset candidate) {
                return !(candidate.offset() < j);
            }
        };
        final Function<MessageAndOffset, BytesMessageWithOffset> decoder =
            new Function<MessageAndOffset, BytesMessageWithOffset>() {
                @Override
                public BytesMessageWithOffset apply(MessageAndOffset source) {
                    // Copy the remaining payload bytes out of the buffer.
                    final ByteBuffer buffer = source.message().payload();
                    final byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    return new BytesMessageWithOffset(bytes, source.nextOffset(), KafkaSimpleConsumer.this.partitionId);
                }
            };
        return FunctionalIterable.create(iterable).filter(atOrAfterRequested).transform(decoder);
    }

    /**
     * Asks the current consumer for the earliest or latest offset of this
     * topic partition.
     *
     * @param z {@code true} to request the earliest offset, {@code false} for the latest
     * @return the offset, or {@code -1} when the response reports an error or
     *         the request throws
     * @throws InterruptedException if the thread was interrupted while the
     *                              request failed (see {@code ensureNotInterrupted})
     */
    private long getOffset(boolean z) throws InterruptedException {
        final TopicAndPartition target = new TopicAndPartition(this.topic, this.partitionId);
        final long whichTime = z ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
        final HashMap<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        // Only a single offset is requested.
        requestInfo.put(target, new PartitionOffsetRequestInfo(whichTime, 1));
        try {
            final kafka.javaapi.OffsetRequest request =
                new kafka.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), this.clientId);
            final OffsetResponse response = this.consumer.getOffsetsBefore(request);
            if (!response.hasError()) {
                final long[] offsets = response.offsets(this.topic, this.partitionId);
                if (z) {
                    return offsets[0];
                }
                return offsets[offsets.length - 1];
            }
            log.error(
                "error fetching data Offset from the Broker [%s]. reason: [%s]",
                this.leaderBroker.host(),
                response.errorCode(this.topic, this.partitionId)
            );
            return -1L;
        } catch (Exception e) {
            // Any failure (including an empty offsets array) ends up here.
            ensureNotInterrupted(e);
            log.error(e, "caught exception in getOffsetsBefore [%s] - [%s]", this.topic, this.partitionId);
            return -1L;
        }
    }

    /**
     * Fetches messages starting at the given offset, retrying until a fetch
     * succeeds.
     *
     * <p>On failure the loop reacts to the error code: a request timeout is
     * simply retried; an out-of-range offset is replaced by the earliest or
     * latest offset (per the {@code earliest} setting) and retried; anything
     * else, including a failed offset lookup or an exception from the fetch,
     * closes the consumer and forces a new leader lookup on the next pass.
     *
     * @param j offset to start fetching from
     * @param i maximum wait passed to the fetch request
     * @return lazily decoded messages at or after the offset actually used
     * @throws InterruptedException if the thread is interrupted while fetching
     *                              or while waiting for a new leader
     */
    public Iterable<BytesMessageWithOffset> fetch(long j, int i) throws InterruptedException {
        long offset = j;
        Broker previousLeader = this.leaderBroker;
        for (;;) {
            ensureConsumer(previousLeader);

            final FetchRequest request = new FetchRequestBuilder()
                .clientId(this.clientId)
                .addFetch(this.topic, this.partitionId, offset, FETCH_SIZE)
                .maxWait(i)
                .minBytes(1)
                .build();
            log.debug("fetch offset %s", offset);

            FetchResponse response = null;
            try {
                response = this.consumer.fetch(request);
            } catch (Exception e) {
                ensureNotInterrupted(e);
                log.warn(e, "caught exception in fetch %s - %d", this.topic, this.partitionId);
            }

            if (response != null && !response.hasError()) {
                return filterAndDecode(response.messageSet(this.topic, this.partitionId), offset);
            }

            // A missing response (the fetch threw) is treated as an unknown error.
            final short errorCode;
            if (response == null) {
                errorCode = ErrorMapping.UnknownCode();
            } else {
                errorCode = response.errorCode(this.topic, this.partitionId);
            }
            log.warn(
                "fetch %s - %s with offset %s encounters error: [%s]",
                this.topic,
                this.partitionId,
                offset,
                errorCode
            );

            if (errorCode == ErrorMapping.RequestTimedOutCode()) {
                log.info("kafka request timed out, response[%s]", response);
                continue;
            }

            if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
                final long resetOffset = getOffset(this.earliest);
                log.info(
                    "got [%s] offset[%s] for [%s][%s]",
                    this.earliest ? "earliest" : "latest",
                    resetOffset,
                    this.topic,
                    this.partitionId
                );
                if (resetOffset >= 0) {
                    offset = resetOffset;
                    continue;
                }
                // A negative offset means the lookup failed; fall through to the leader change.
            }

            // Drop the connection and remember the old leader for the next lookup.
            stopConsumer();
            previousLeader = this.leaderBroker;
            this.leaderBroker = null;
        }
    }

    /**
     * Closes the current consumer, if any, and clears the reference.
     *
     * <p>A failure while closing is logged and otherwise ignored; the
     * {@code consumer} field is always reset to {@code null} so that
     * {@code ensureConsumer} creates a fresh connection next time.
     */
    private void stopConsumer() {
        if (this.consumer == null) {
            return;
        }
        try {
            this.consumer.close();
            log.info("stop consumer[%s][%s], leaderBroker[%s]", this.topic, this.partitionId, this.leaderBroker);
        } catch (Exception e) {
            log.warn(e, "stop consumer[%s][%s] failed", this.topic, this.partitionId);
        } finally {
            // Clear the reference only after the close attempt; clearing it first
            // would make the close unreachable and leak the connection.
            this.consumer = null;
        }
    }

    /**
     * Shuts this consumer down by delegating to {@code stopConsumer} and
     * logging that the consumer for this topic partition has stopped.
     */
    public void stop() {
        stopConsumer();
        log.info("KafkaSimpleConsumer[%s][%s] stopped", this.topic, this.partitionId);
    }

    /**
     * Queries each broker in {@code replicaBrokers} in turn for this topic's
     * metadata and returns the entry for this consumer's partition.
     *
     * <p>A broker that cannot be reached or answers with an error is logged
     * and skipped. The temporary lookup consumer is closed exactly once per
     * broker, whatever the outcome.
     *
     * @return metadata for this partition, or {@code null} if no broker
     *         returned it
     * @throws InterruptedException if the thread was interrupted while a
     *                              lookup failed
     */
    private PartitionMetadata findLeader() throws InterruptedException {
        for (HostAndPort candidate : this.replicaBrokers) {
            SimpleConsumer lookupConsumer = null;
            try {
                log.info("Finding new leader from Kafka brokers, try broker [%s]", candidate.toString());
                lookupConsumer = new SimpleConsumer(
                    candidate.getHostText(),
                    candidate.getPort(),
                    SO_TIMEOUT_MILLIS,
                    BUFFER_SIZE,
                    this.leaderLookupClientId
                );
                TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(this.topic));
                for (TopicMetadata topicMetadata : lookupConsumer.send(request).topicsMetadata()) {
                    if (!this.topic.equals(topicMetadata.topic())) {
                        continue;
                    }
                    for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                        if (partitionMetadata.partitionId() == this.partitionId) {
                            return partitionMetadata;
                        }
                    }
                }
            } catch (Exception e) {
                ensureNotInterrupted(e);
                log.warn(
                    e,
                    "error communicating with Kafka Broker [%s] to find leader for [%s] - [%s]",
                    candidate,
                    this.topic,
                    this.partitionId
                );
            } finally {
                // Single cleanup point: runs on return, on a handled exception and on rethrow.
                if (lookupConsumer != null) {
                    lookupConsumer.close();
                }
            }
        }
        return null;
    }

    /**
     * Blocks until a leader for this topic partition is found.
     *
     * <p>Each round asks {@code findLeader} for partition metadata. When
     * metadata is found, {@code replicaBrokers} is replaced with the
     * partition's replicas. The reported leader is accepted when there was no
     * previous leader, when it differs from the previous leader, or when at
     * least one retry sleep has already happened. Otherwise the method sleeps
     * for {@code RETRY_INTERVAL_MILLIS} and tries again. On the 3rd retry and
     * every 5th retry after that, the lookup list is reset to all configured
     * brokers.
     *
     * @param broker the previously known leader, or {@code null} if there was none
     * @return the leader to connect to; never {@code null}
     * @throws InterruptedException if interrupted while sleeping or during a lookup
     */
    private Broker findNewLeader(Broker broker) throws InterruptedException {
        long retryCount = 0;
        while (true) {
            PartitionMetadata metadata = findLeader();
            if (metadata != null) {
                this.replicaBrokers.clear();
                for (Broker replica : metadata.replicas()) {
                    this.replicaBrokers.add(HostAndPort.fromParts(replica.host(), replica.port()));
                }
                log.debug("Got new Kafka leader metadata : [%s], previous leader : [%s]", metadata, broker);
                Broker candidate = metadata.leader();
                // Compare against the previous leader passed in by the caller:
                // the leaderBroker field is already null while a lookup is running.
                if (candidate != null
                    && (broker == null || isValidNewLeader(broker, candidate) || retryCount != 0)) {
                    return candidate;
                }
            }
            Thread.sleep(RETRY_INTERVAL_MILLIS);
            retryCount++;
            if (retryCount >= 3 && (retryCount - 3) % 5 == 0) {
                log.warn(
                    "cannot find leader for [%s] - [%s] after [%s] retries",
                    this.topic,
                    this.partitionId,
                    retryCount
                );
                // The current replica list is not yielding a leader; fall back to every configured broker.
                this.replicaBrokers.clear();
                this.replicaBrokers.addAll(this.allBrokers);
            }
        }
    }

    /**
     * Tells whether {@code candidate} is a different broker than
     * {@code previousLeader}, comparing host (case-insensitively) and port.
     *
     * @param previousLeader the leader used before the lookup; must not be {@code null}
     * @param candidate      the leader reported by the latest metadata
     * @return {@code true} if host or port differ
     */
    private static boolean isValidNewLeader(Broker previousLeader, Broker candidate) {
        boolean sameHost = previousLeader.host().equalsIgnoreCase(candidate.host());
        boolean samePort = previousLeader.port() == candidate.port();
        return !(sameHost && samePort);
    }

    /**
     * Converts a pending thread interrupt into an {@link InterruptedException}.
     *
     * <p>Uses {@link Thread#interrupted()}, which clears the interrupt flag as
     * it is read. When the flag was set, the given exception is logged before
     * the new {@code InterruptedException} is thrown.
     *
     * @param exc the exception that was being handled when this check ran
     * @throws InterruptedException if the current thread's interrupt flag was set
     */
    private void ensureNotInterrupted(Exception exc) throws InterruptedException {
        final boolean wasInterrupted = Thread.interrupted();
        if (!wasInterrupted) {
            return;
        }
        log.error(exc, "Interrupted during fetching for %s - %s", this.topic, this.partitionId);
        throw new InterruptedException();
    }
}
