/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStreamSubscription;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.smallrye.reactive.messaging.kafka.impl.RuntimeKafkaSourceConfiguration;
import io.vertx.core.Context;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class KafkaRecordStream<K, V>
extends AbstractMulti<ConsumerRecord<K, V>> {
    private final ReactiveKafkaConsumer<K, V> client;
    private final RuntimeKafkaSourceConfiguration config;
    private final Context context;
    private final Set<KafkaRecordStreamSubscription<K, V, ConsumerRecord<K, V>>> subscriptions = Collections.newSetFromMap(new ConcurrentHashMap());

    public KafkaRecordStream(ReactiveKafkaConsumer<K, V> client, RuntimeKafkaSourceConfiguration config, Context context) {
        this.config = config;
        this.client = client;
        this.context = context;
    }

    public void subscribe(MultiSubscriber<? super ConsumerRecord<K, V>> subscriber) {
        int maxPollRecords = this.config.getMaxPollRecords();
        KafkaRecordStreamSubscription<K, V, ? super ConsumerRecord<K, V>> subscription = new KafkaRecordStreamSubscription<K, V, ConsumerRecord<K, V>>(this.client, this.config, subscriber, this.context, maxPollRecords, (cr, q) -> q.addAll(cr));
        this.subscriptions.add(subscription);
        subscriber.onSubscribe(subscription);
    }

    void removeFromQueueRecordsFromTopicPartitions(Collection<TopicPartition> revokedPartitions) {
        if (revokedPartitions.isEmpty()) {
            return;
        }
        HashMap revoked = new HashMap();
        revokedPartitions.forEach(topicPartition -> revoked.computeIfAbsent(topicPartition.topic(), t -> new HashSet()).add(topicPartition.partition()));
        this.subscriptions.forEach(s -> this.removeFromQueue((KafkaRecordStreamSubscription<K, V, ConsumerRecord<K, V>>)s, revoked));
    }

    private void removeFromQueue(KafkaRecordStreamSubscription<K, V, ConsumerRecord<K, V>> subscription, Map<String, Set<Integer>> revoked) {
        subscription.rewriteQueue(cr -> {
            Set revokedPartitions = (Set)revoked.get(cr.topic());
            if (revokedPartitions != null && revokedPartitions.contains(cr.partition())) {
                return null;
            }
            return cr;
        });
    }
}

