/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.ClientResponse;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.FetchSessionHandler;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.CompletedFetch;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.FetchMetricsAggregator;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.Cluster;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.Node;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.Uuid;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.message.FetchResponseData;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.protocol.ApiKeys;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.protocol.Errors;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.record.RecordBatch;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.requests.FetchRequest;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.requests.FetchResponse;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.BufferSupplier;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.LogContext;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.Time;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.Timer;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.helpers.MessageFormatter;

public abstract class AbstractFetch<K, V>
implements Closeable {
    private final Logger log;
    protected final LogContext logContext;
    protected final ConsumerNetworkClient client;
    protected final ConsumerMetadata metadata;
    protected final SubscriptionState subscriptions;
    protected final FetchConfig<K, V> fetchConfig;
    protected final Time time;
    protected final FetchMetricsManager metricsManager;
    private final BufferSupplier decompressionBufferSupplier;
    private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
    private final Map<Integer, FetchSessionHandler> sessionHandlers;
    private final Set<Integer> nodesWithPendingFetchRequests;
    private CompletedFetch<K, V> nextInLineFetch;

    public AbstractFetch(LogContext logContext, ConsumerNetworkClient client, ConsumerMetadata metadata, SubscriptionState subscriptions, FetchConfig<K, V> fetchConfig, FetchMetricsManager metricsManager, Time time) {
        this.log = logContext.logger(AbstractFetch.class);
        this.logContext = logContext;
        this.client = client;
        this.metadata = metadata;
        this.subscriptions = subscriptions;
        this.fetchConfig = fetchConfig;
        this.decompressionBufferSupplier = BufferSupplier.create();
        this.completedFetches = new ConcurrentLinkedQueue();
        this.sessionHandlers = new HashMap<Integer, FetchSessionHandler>();
        this.nodesWithPendingFetchRequests = new HashSet<Integer>();
        this.metricsManager = metricsManager;
        this.time = time;
    }

    boolean hasCompletedFetches() {
        return !this.completedFetches.isEmpty();
    }

    public boolean hasAvailableFetches() {
        return this.completedFetches.stream().anyMatch(fetch -> this.subscriptions.isFetchable(fetch.partition));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleFetchResponse(Node fetchTarget, FetchSessionHandler.FetchRequestData data, ClientResponse resp) {
        try {
            FetchResponse response = (FetchResponse)resp.responseBody();
            FetchSessionHandler handler = this.sessionHandler(fetchTarget.id());
            if (handler == null) {
                this.log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", (Object)fetchTarget.id());
                return;
            }
            short requestVersion = resp.requestHeader().apiVersion();
            if (!handler.handleResponse(response, requestVersion)) {
                if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
                    this.metadata.requestUpdate();
                }
                return;
            }
            LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), requestVersion);
            HashSet<TopicPartition> partitions = new HashSet<TopicPartition>(responseData.keySet());
            FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(this.metricsManager, partitions);
            for (Map.Entry entry : responseData.entrySet()) {
                TopicPartition partition = (TopicPartition)entry.getKey();
                FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
                if (requestData == null) {
                    String message = data.metadata().isFull() ? MessageFormatter.arrayFormat((String)"Response for missing full request partition: partition={}; metadata={}", (Object[])new Object[]{partition, data.metadata()}).getMessage() : MessageFormatter.arrayFormat((String)"Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}", (Object[])new Object[]{partition, data.metadata(), data.toSend(), data.toForget(), data.toReplace()}).getMessage();
                    throw new IllegalStateException(message);
                }
                long fetchOffset = requestData.fetchOffset;
                FetchResponseData.PartitionData partitionData = (FetchResponseData.PartitionData)entry.getValue();
                this.log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", new Object[]{this.fetchConfig.isolationLevel, fetchOffset, partition, partitionData});
                CompletedFetch<K, V> completedFetch = new CompletedFetch<K, V>(this.logContext, this.subscriptions, this.fetchConfig, this.decompressionBufferSupplier, partition, partitionData, metricAggregator, fetchOffset, requestVersion);
                this.completedFetches.add(completedFetch);
            }
            this.metricsManager.recordLatency(resp.requestLatencyMs());
        }
        finally {
            this.log.debug("Removing pending request for node {}", (Object)fetchTarget);
            this.nodesWithPendingFetchRequests.remove(fetchTarget.id());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleFetchResponse(Node fetchTarget, RuntimeException e) {
        try {
            FetchSessionHandler handler = this.sessionHandler(fetchTarget.id());
            if (handler != null) {
                handler.handleError(e);
                handler.sessionTopicPartitions().forEach(this.subscriptions::clearPreferredReadReplica);
            }
        }
        finally {
            this.log.debug("Removing pending request for node {}", (Object)fetchTarget);
            this.nodesWithPendingFetchRequests.remove(fetchTarget.id());
        }
    }

    protected FetchRequest.Builder createFetchRequest(Node fetchTarget, FetchSessionHandler.FetchRequestData requestData) {
        short maxVersion = requestData.canUseTopicIds() ? (short)ApiKeys.FETCH.latestVersion() : (short)12;
        FetchRequest.Builder request = FetchRequest.Builder.forConsumer(maxVersion, this.fetchConfig.maxWaitMs, this.fetchConfig.minBytes, requestData.toSend()).isolationLevel(this.fetchConfig.isolationLevel).setMaxBytes(this.fetchConfig.maxBytes).metadata(requestData.metadata()).removed(requestData.toForget()).replaced(requestData.toReplace()).rackId(this.fetchConfig.clientRackId);
        this.log.debug("Sending {} {} to broker {}", new Object[]{this.fetchConfig.isolationLevel, requestData, fetchTarget});
        this.log.debug("Adding pending request for node {}", (Object)fetchTarget);
        this.nodesWithPendingFetchRequests.add(fetchTarget.id());
        return request;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Fetch<K, V> collectFetch() {
        Fetch<K, V> fetch = Fetch.empty();
        ArrayDeque<CompletedFetch<K, V>> pausedCompletedFetches = new ArrayDeque<CompletedFetch<K, V>>();
        int recordsRemaining = this.fetchConfig.maxPollRecords;
        try {
            while (recordsRemaining > 0) {
                if (this.nextInLineFetch == null || this.nextInLineFetch.isConsumed) {
                    CompletedFetch<K, V> records = this.completedFetches.peek();
                    if (records == null) {
                        break;
                    }
                    if (!records.initialized) {
                        try {
                            this.nextInLineFetch = this.initializeCompletedFetch(records);
                        }
                        catch (Exception e) {
                            if (fetch.isEmpty() && FetchResponse.recordsOrFail(records.partitionData).sizeInBytes() == 0) {
                                this.completedFetches.poll();
                            }
                            throw e;
                        }
                    } else {
                        this.nextInLineFetch = records;
                    }
                    this.completedFetches.poll();
                    continue;
                }
                if (this.subscriptions.isPaused(this.nextInLineFetch.partition)) {
                    this.log.debug("Skipping fetching records for assigned partition {} because it is paused", (Object)this.nextInLineFetch.partition);
                    pausedCompletedFetches.add(this.nextInLineFetch);
                    this.nextInLineFetch = null;
                    continue;
                }
                Fetch<K, V> nextFetch = this.fetchRecords(recordsRemaining);
                recordsRemaining -= nextFetch.numRecords();
                fetch.add(nextFetch);
            }
        }
        catch (KafkaException e) {
            if (fetch.isEmpty()) {
                throw e;
            }
        }
        finally {
            this.completedFetches.addAll(pausedCompletedFetches);
        }
        return fetch;
    }

    private Fetch<K, V> fetchRecords(int maxRecords) {
        if (!this.subscriptions.isAssigned(this.nextInLineFetch.partition)) {
            this.log.debug("Not returning fetched records for partition {} since it is no longer assigned", (Object)this.nextInLineFetch.partition);
        } else if (!this.subscriptions.isFetchable(this.nextInLineFetch.partition)) {
            this.log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", (Object)this.nextInLineFetch.partition);
        } else {
            SubscriptionState.FetchPosition position = this.subscriptions.position(this.nextInLineFetch.partition);
            if (position == null) {
                throw new IllegalStateException("Missing position for fetchable partition " + this.nextInLineFetch.partition);
            }
            if (this.nextInLineFetch.nextFetchOffset == position.offset) {
                Long lead;
                Long partitionLag;
                List<ConsumerRecord<K, V>> partRecords = this.nextInLineFetch.fetchRecords(maxRecords);
                this.log.trace("Returning {} fetched records at offset {} for assigned partition {}", new Object[]{partRecords.size(), position, this.nextInLineFetch.partition});
                boolean positionAdvanced = false;
                if (this.nextInLineFetch.nextFetchOffset > position.offset) {
                    SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(this.nextInLineFetch.nextFetchOffset, this.nextInLineFetch.lastEpoch, position.currentLeader);
                    this.log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`", new Object[]{position, nextPosition, this.nextInLineFetch.partition, partRecords.size()});
                    this.subscriptions.position(this.nextInLineFetch.partition, nextPosition);
                    positionAdvanced = true;
                }
                if ((partitionLag = this.subscriptions.partitionLag(this.nextInLineFetch.partition, this.fetchConfig.isolationLevel)) != null) {
                    this.metricsManager.recordPartitionLag(this.nextInLineFetch.partition, partitionLag);
                }
                if ((lead = this.subscriptions.partitionLead(this.nextInLineFetch.partition)) != null) {
                    this.metricsManager.recordPartitionLead(this.nextInLineFetch.partition, lead);
                }
                return Fetch.forPartition(this.nextInLineFetch.partition, partRecords, positionAdvanced);
            }
            this.log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", new Object[]{this.nextInLineFetch.partition, this.nextInLineFetch.nextFetchOffset, position});
        }
        this.log.trace("Draining fetched records for partition {}", (Object)this.nextInLineFetch.partition);
        this.nextInLineFetch.drain();
        return Fetch.empty();
    }

    private List<TopicPartition> fetchablePartitions() {
        HashSet<TopicPartition> exclude = new HashSet<TopicPartition>();
        if (this.nextInLineFetch != null && !this.nextInLineFetch.isConsumed) {
            exclude.add(this.nextInLineFetch.partition);
        }
        for (CompletedFetch<K, V> completedFetch : this.completedFetches) {
            exclude.add(completedFetch.partition);
        }
        return this.subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
    }

    Node selectReadReplica(TopicPartition partition, Node leaderReplica, long currentTimeMs) {
        Optional<Integer> nodeId = this.subscriptions.preferredReadReplica(partition, currentTimeMs);
        if (nodeId.isPresent()) {
            Optional node = nodeId.flatMap(id -> this.metadata.fetch().nodeIfOnline(partition, (int)id));
            if (node.isPresent()) {
                return (Node)node.get();
            }
            this.log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata, using the leader instead.", nodeId, (Object)partition);
            this.requestMetadataUpdate(partition);
            return leaderReplica;
        }
        return leaderReplica;
    }

    protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
        this.metricsManager.maybeUpdateAssignment(this.subscriptions);
        LinkedHashMap<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<Node, FetchSessionHandler.Builder>();
        long currentTimeMs = this.time.milliseconds();
        Map<String, Uuid> topicIds = this.metadata.topicIds();
        for (TopicPartition partition : this.fetchablePartitions()) {
            SubscriptionState.FetchPosition position = this.subscriptions.position(partition);
            if (position == null) {
                throw new IllegalStateException("Missing position for fetchable partition " + partition);
            }
            Optional<Node> leaderOpt = position.currentLeader.leader;
            if (!leaderOpt.isPresent()) {
                this.log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", (Object)partition, (Object)position);
                this.metadata.requestUpdate();
                continue;
            }
            Node node = this.selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
            if (this.client.isUnavailable(node)) {
                this.client.maybeThrowAuthFailure(node);
                this.log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", (Object)partition, (Object)node);
                continue;
            }
            if (this.nodesWithPendingFetchRequests.contains(node.id())) {
                this.log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", (Object)partition, (Object)node);
                continue;
            }
            FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
                FetchSessionHandler fetchSessionHandler = this.sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(this.logContext, (int)n));
                return fetchSessionHandler.newBuilder();
            });
            Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
            FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId, position.offset, -1L, this.fetchConfig.fetchSize, position.currentLeader.epoch, Optional.empty());
            builder.add(partition, partitionData);
            this.log.debug("Added {} fetch request for partition {} at position {} to node {}", new Object[]{this.fetchConfig.isolationLevel, partition, position, node});
        }
        LinkedHashMap<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<Node, FetchSessionHandler.FetchRequestData>();
        for (Map.Entry entry : fetchable.entrySet()) {
            reqs.put((Node)entry.getKey(), ((FetchSessionHandler.Builder)entry.getValue()).build());
        }
        return reqs;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletedFetch<K, V> initializeCompletedFetch(CompletedFetch<K, V> completedFetch) {
        TopicPartition tp = completedFetch.partition;
        Errors error = Errors.forCode(completedFetch.partitionData.errorCode());
        boolean recordMetrics = true;
        try {
            if (!this.subscriptions.hasValidPosition(tp)) {
                this.log.debug("Ignoring fetched records for partition {} since it no longer has valid position", (Object)tp);
                CompletedFetch<K, V> completedFetch2 = null;
                return completedFetch2;
            }
            if (error == Errors.NONE) {
                CompletedFetch<K, V> ret = this.handleInitializeCompletedFetchSuccess(completedFetch);
                recordMetrics = ret == null;
                CompletedFetch<K, V> completedFetch3 = ret;
                return completedFetch3;
            }
            this.handleInitializeCompletedFetchErrors(completedFetch, error);
            CompletedFetch<K, V> completedFetch4 = null;
            return completedFetch4;
        }
        finally {
            if (recordMetrics) {
                completedFetch.recordAggregatedMetrics(0, 0);
            }
            if (error != Errors.NONE) {
                this.subscriptions.movePartitionToEnd(tp);
            }
        }
    }

    private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(CompletedFetch<K, V> completedFetch) {
        TopicPartition tp = completedFetch.partition;
        long fetchOffset = completedFetch.nextFetchOffset;
        SubscriptionState.FetchPosition position = this.subscriptions.position(tp);
        if (position == null || position.offset != fetchOffset) {
            this.log.debug("Discarding stale fetch response for partition {} since its offset {} does not match the expected offset {}", new Object[]{tp, fetchOffset, position});
            return null;
        }
        FetchResponseData.PartitionData partition = completedFetch.partitionData;
        this.log.trace("Preparing to read {} bytes of data for partition {} with offset {}", new Object[]{FetchResponse.recordsSize(partition), tp, position});
        Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator();
        if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) {
            if (completedFetch.requestVersion < 3) {
                Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
                throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchConfig.fetchSize + " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using " + "max.partition.fetch.bytes" + ")", recordTooLargePartitions);
            }
            throw new KafkaException("Failed to make progress reading messages at " + tp + "=" + fetchOffset + ". Received a non-empty fetch response from the server, but no complete records were found.");
        }
        if (partition.highWatermark() >= 0L) {
            this.log.trace("Updating high watermark for partition {} to {}", (Object)tp, (Object)partition.highWatermark());
            this.subscriptions.updateHighWatermark(tp, partition.highWatermark());
        }
        if (partition.logStartOffset() >= 0L) {
            this.log.trace("Updating log start offset for partition {} to {}", (Object)tp, (Object)partition.logStartOffset());
            this.subscriptions.updateLogStartOffset(tp, partition.logStartOffset());
        }
        if (partition.lastStableOffset() >= 0L) {
            this.log.trace("Updating last stable offset for partition {} to {}", (Object)tp, (Object)partition.lastStableOffset());
            this.subscriptions.updateLastStableOffset(tp, partition.lastStableOffset());
        }
        if (FetchResponse.isPreferredReplica(partition)) {
            this.subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica(), () -> {
                long expireTimeMs = this.time.milliseconds() + this.metadata.metadataExpireMs();
                this.log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}", new Object[]{tp, partition.preferredReadReplica(), expireTimeMs});
                return expireTimeMs;
            });
        }
        completedFetch.initialized = true;
        return completedFetch;
    }

    private void handleInitializeCompletedFetchErrors(CompletedFetch<K, V> completedFetch, Errors error) {
        TopicPartition tp = completedFetch.partition;
        long fetchOffset = completedFetch.nextFetchOffset;
        if (error == Errors.NOT_LEADER_OR_FOLLOWER || error == Errors.REPLICA_NOT_AVAILABLE || error == Errors.KAFKA_STORAGE_ERROR || error == Errors.FENCED_LEADER_EPOCH || error == Errors.OFFSET_NOT_AVAILABLE) {
            this.log.debug("Error in fetch for partition {}: {}", (Object)tp, (Object)error.exceptionName());
            this.requestMetadataUpdate(tp);
        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
            this.log.warn("Received unknown topic or partition error in fetch for partition {}", (Object)tp);
            this.requestMetadataUpdate(tp);
        } else if (error == Errors.UNKNOWN_TOPIC_ID) {
            this.log.warn("Received unknown topic ID error in fetch for partition {}", (Object)tp);
            this.requestMetadataUpdate(tp);
        } else if (error == Errors.INCONSISTENT_TOPIC_ID) {
            this.log.warn("Received inconsistent topic ID error in fetch for partition {}", (Object)tp);
            this.requestMetadataUpdate(tp);
        } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
            Optional<Integer> clearedReplicaId = this.subscriptions.clearPreferredReadReplica(tp);
            if (!clearedReplicaId.isPresent()) {
                SubscriptionState.FetchPosition position = this.subscriptions.position(tp);
                if (position == null || fetchOffset != position.offset) {
                    this.log.debug("Discarding stale fetch response for partition {} since the fetched offset {} does not match the current offset {}", new Object[]{tp, fetchOffset, position});
                } else {
                    this.handleOffsetOutOfRange(position, tp);
                }
            } else {
                this.log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", new Object[]{clearedReplicaId.get(), tp, error, fetchOffset});
            }
        } else {
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                this.log.warn("Not authorized to read from partition {}.", (Object)tp);
                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
            }
            if (error == Errors.UNKNOWN_LEADER_EPOCH) {
                this.log.debug("Received unknown leader epoch error in fetch for partition {}", (Object)tp);
            } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
                this.log.warn("Unknown server error while fetching offset {} for topic-partition {}", (Object)fetchOffset, (Object)tp);
            } else {
                if (error == Errors.CORRUPT_MESSAGE) {
                    throw new KafkaException("Encountered corrupt message when fetching offset " + fetchOffset + " for topic-partition " + tp);
                }
                throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching at offset " + fetchOffset + " from topic-partition " + tp);
            }
        }
    }

    private void handleOffsetOutOfRange(SubscriptionState.FetchPosition fetchPosition, TopicPartition topicPartition) {
        String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition;
        if (!this.subscriptions.hasDefaultOffsetResetPolicy()) {
            this.log.info("{}, raising error to the application since no reset policy is configured", (Object)errorMessage);
            throw new OffsetOutOfRangeException(errorMessage, Collections.singletonMap(topicPartition, fetchPosition.offset));
        }
        this.log.info("{}, resetting offset", (Object)errorMessage);
        this.subscriptions.requestOffsetReset(topicPartition);
    }

    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
        Iterator<CompletedFetch<K, V>> completedFetchesItr = this.completedFetches.iterator();
        while (completedFetchesItr.hasNext()) {
            CompletedFetch<K, V> completedFetch = completedFetchesItr.next();
            TopicPartition tp = completedFetch.partition;
            if (assignedPartitions.contains(tp)) continue;
            this.log.debug("Removing {} from buffered data as it is no longer an assigned partition", (Object)tp);
            completedFetch.drain();
            completedFetchesItr.remove();
        }
        if (this.nextInLineFetch != null && !assignedPartitions.contains(this.nextInLineFetch.partition)) {
            this.nextInLineFetch.drain();
            this.nextInLineFetch = null;
        }
    }

    public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
        HashSet<TopicPartition> currentTopicPartitions = new HashSet<TopicPartition>();
        for (TopicPartition tp : this.subscriptions.assignedPartitions()) {
            if (!assignedTopics.contains(tp.topic())) continue;
            currentTopicPartitions.add(tp);
        }
        this.clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
    }

    protected FetchSessionHandler sessionHandler(int node) {
        return this.sessionHandlers.get(node);
    }

    void maybeCloseFetchSessions(Timer timer) {
        Cluster cluster = this.metadata.fetch();
        ArrayList requestFutures = new ArrayList();
        this.sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
            sessionHandler.notifyClose();
            final int sessionId = sessionHandler.sessionId();
            final Node fetchTarget = cluster.nodeById((int)fetchTargetNodeId);
            if (fetchTarget == null || this.client.isUnavailable(fetchTarget)) {
                this.log.debug("Skip sending close session request to broker {} since it is not reachable", (Object)fetchTarget);
                return;
            }
            FetchRequest.Builder request = this.createFetchRequest(fetchTarget, sessionHandler.newBuilder().build());
            RequestFuture<ClientResponse> responseFuture = this.client.send(fetchTarget, request);
            responseFuture.addListener(new RequestFutureListener<ClientResponse>(){

                @Override
                public void onSuccess(ClientResponse value) {
                    AbstractFetch.this.log.debug("Successfully sent a close message for fetch session: {} to node: {}", (Object)sessionId, (Object)fetchTarget);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    AbstractFetch.this.log.debug("Unable to a close message for fetch session: {} to node: {}. This may result in unnecessary fetch sessions at the broker.", new Object[]{sessionId, fetchTarget, e});
                }
            });
            requestFutures.add(responseFuture);
        });
        while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) {
            this.client.poll(timer, null, true);
        }
        if (!requestFutures.stream().allMatch(RequestFuture::isDone)) {
            this.log.debug("All requests couldn't be sent in the specific timeout period {}ms. This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for KafkaConsumer.close(Duration timeout)", (Object)timer.timeoutMs());
        }
    }

    public void close(Timer timer) {
        this.client.disableWakeups();
        if (this.nextInLineFetch != null) {
            this.nextInLineFetch.drain();
            this.nextInLineFetch = null;
        }
        this.maybeCloseFetchSessions(timer);
        Utils.closeQuietly(this.decompressionBufferSupplier, "decompressionBufferSupplier");
        this.sessionHandlers.clear();
    }

    @Override
    public void close() {
        this.close(this.time.timer(0L));
    }

    private void requestMetadataUpdate(TopicPartition topicPartition) {
        this.metadata.requestUpdate();
        this.subscriptions.clearPreferredReadReplica(topicPartition);
    }
}

