package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.impl.StreamConsumerBuilder;
import com.rabbitmq.stream.impl.Utils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/rabbitmq/stream/impl/SuperStreamConsumer.class */
/**
 * {@link Consumer} facade over the partitions of a super stream.
 *
 * <p>On construction it looks up the partitions of the super stream, builds one
 * {@link StreamConsumer} per partition (keyed by partition stream name) and starts them all.
 * Closing this consumer closes every partition consumer. {@link #store(long)} and
 * {@link #storedOffset()} are not supported and always throw.
 */
class SuperStreamConsumer implements Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SuperStreamConsumer.class);

    /** Name of the super stream, kept for log messages. */
    private final String superStream;

    /** Partition stream name to the consumer created for that partition. */
    private final Map<String, Consumer> consumers = new ConcurrentHashMap<>();

    /**
     * Per-partition state used by manual offset tracking: the offset of the last message whose
     * handler call returned, and the consumer attached to the partition.
     */
    public static final class ConsumerState {
        /** Offset of the last handled message; {@code 0} doubles as "nothing handled yet". */
        private volatile long offset = 0;

        /** Assigned by the enclosing constructor once the partition consumer is built. */
        private volatile StreamConsumer consumer;

        private ConsumerState() {
        }
    }

    /**
     * Message handler installed on each partition consumer when manual tracking is configured.
     *
     * <p>It forwards every message to the application handler with a wrapped context. The wrapper
     * delegates all calls to the original context except {@code storeOffset()}, which is fanned
     * out to every partition of the super stream.
     */
    private static final class ManualOffsetTrackingMessageHandler implements MessageHandler {
        private final MessageHandler delegate;

        /** States of all partitions (shared array), including this handler's own. */
        private final ConsumerState[] consumerStates;

        /** State of the partition this handler is attached to. */
        private final ConsumerState consumerState;

        private ManualOffsetTrackingMessageHandler(MessageHandler messageHandler, ConsumerState[] consumerStateArr, ConsumerState consumerState) {
            this.delegate = messageHandler;
            this.consumerStates = consumerStateArr;
            this.consumerState = consumerState;
        }

        /**
         * Passes the message to the delegate with the wrapped context, then records the message
         * offset as the last handled offset of this partition.
         *
         * @param context context supplied by the partition consumer
         * @param message message to handle
         */
        @Override // com.rabbitmq.stream.MessageHandler
        public void handle(final MessageHandler.Context context, Message message) {
            MessageHandler.Context wrapped = new MessageHandler.Context() {
                @Override // com.rabbitmq.stream.MessageHandler.Context
                public long offset() {
                    return context.offset();
                }

                @Override // com.rabbitmq.stream.MessageHandler.Context
                public long timestamp() {
                    return context.timestamp();
                }

                @Override // com.rabbitmq.stream.MessageHandler.Context
                public long committedChunkId() {
                    return context.committedChunkId();
                }

                /**
                 * Stores the offset for the current partition through the original context, and
                 * for every other partition stores its last recorded offset through that
                 * partition's consumer.
                 */
                @Override // com.rabbitmq.stream.MessageHandler.Context
                public void storeOffset() {
                    for (ConsumerState state : consumerStates) {
                        if (state == consumerState) {
                            // The recorded offset of the current partition is only updated after
                            // the delegate returns, so rely on the original context here.
                            maybeStoreOffset(state, context::storeOffset);
                        } else if (state.offset != 0) {
                            // NOTE(review): 0 is used as "nothing handled yet", so a sibling whose
                            // last handled offset is really 0 is skipped as well - confirm intended.
                            maybeStoreOffset(state, () -> state.consumer.store(state.offset));
                        }
                    }
                }

                @Override // com.rabbitmq.stream.MessageHandler.Context
                public void processed() {
                    context.processed();
                }

                /**
                 * Runs the store action unless the partition consumer is a single active
                 * consumer that is not currently active.
                 */
                private void maybeStoreOffset(ConsumerState state, Runnable storeAction) {
                    StreamConsumer partitionConsumer = state.consumer;
                    if (partitionConsumer.isSac() && !partitionConsumer.sacActive()) {
                        return;
                    }
                    storeAction.run();
                }

                @Override // com.rabbitmq.stream.MessageHandler.Context
                public String stream() {
                    return context.stream();
                }

                @Override // com.rabbitmq.stream.MessageHandler.Context
                public Consumer consumer() {
                    return context.consumer();
                }
            };
            this.delegate.handle(wrapped, message);
            // Reached only when the delegate returned normally.
            this.consumerState.offset = context.offset();
        }
    }

    /**
     * Creates and starts one consumer per partition of the super stream.
     *
     * @param streamConsumerBuilder builder whose settings are duplicated for each partition
     * @param str name of the super stream
     * @param streamEnvironment environment used to look up the partitions
     * @param trackingConfiguration offset tracking settings (enabled, manual or auto, message
     *     count before storage)
     */
    public SuperStreamConsumer(StreamConsumerBuilder streamConsumerBuilder, String str, StreamEnvironment streamEnvironment, StreamConsumerBuilder.TrackingConfiguration trackingConfiguration) {
        this.superStream = str;
        // The lookup's declared return type is not visible from this class; the cast keeps the
        // element type explicit and the suppression is limited to this one declaration.
        @SuppressWarnings("unchecked")
        List<String> partitions = (List<String>) streamEnvironment.locatorOperation(Utils.namedFunction(client -> {
            return client.partitions(str);
        }, "Partition lookup for super stream '%s'", str));
        int partitionCount = partitions.size();

        // All states are created up front so that every handler can see every partition.
        ConsumerState[] allStates = new ConsumerState[partitionCount];
        Map<String, ConsumerState> stateByPartition = new HashMap<>(partitionCount);
        for (int i = 0; i < partitionCount; i++) {
            ConsumerState state = new ConsumerState();
            allStates[i] = state;
            stateByPartition.put(partitions.get(i), state);
        }

        boolean manualTracking = trackingConfiguration.enabled() && trackingConfiguration.manual();
        boolean autoTracking = trackingConfiguration.enabled() && trackingConfiguration.auto();

        for (String partition : partitions) {
            ConsumerState state = stateByPartition.get(partition);
            MessageHandler handler = manualTracking
                    ? new ManualOffsetTrackingMessageHandler(streamConsumerBuilder.messageHandler(), allStates, state)
                    : streamConsumerBuilder.messageHandler();
            StreamConsumerBuilder partitionBuilder = streamConsumerBuilder.duplicate();
            if (partitionBuilder.consumerUpdateListener() instanceof Utils.CompositeConsumerUpdateListener) {
                // Give each partition its own copy of the composite listener.
                partitionBuilder.consumerUpdateListener(((Utils.CompositeConsumerUpdateListener) partitionBuilder.consumerUpdateListener()).duplicate());
            }
            if (autoTracking) {
                // The configured count is split across the partitions. Integer division yields 0
                // when the count is smaller than the number of partitions, so clamp to 1.
                partitionBuilder = (StreamConsumerBuilder) partitionBuilder.autoTrackingStrategy().messageCountBeforeStorage(Math.max(1, trackingConfiguration.autoMessageCountBeforeStorage() / partitionCount)).builder();
            }
            StreamConsumer partitionConsumer = (StreamConsumer) partitionBuilder.lazyInit(true).superStream(null).messageHandler(handler).stream(partition).build();
            this.consumers.put(partition, partitionConsumer);
            state.consumer = partitionConsumer;
            LOGGER.debug("Created consumer on stream '{}' for super stream '{}'", partition, str);
        }
        // Consumers are built with lazyInit(true) and started only after all of them exist.
        this.consumers.values().forEach(consumer -> {
            ((StreamConsumer) consumer).start();
        });
    }

    /**
     * Not supported for super streams.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // com.rabbitmq.stream.Consumer
    public void store(long j) {
        throw new UnsupportedOperationException("Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead");
    }

    /**
     * Returns the consumer created for the given partition.
     *
     * @param str partition stream name
     * @return the partition consumer, or {@code null} if there is none for that name
     */
    Consumer consumer(String str) {
        return this.consumers.get(str);
    }

    /**
     * Not supported for super streams.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // com.rabbitmq.stream.Consumer
    public long storedOffset() {
        throw new UnsupportedOperationException("Consumer#storedOffset() does not work for super streams");
    }

    /**
     * Closes every partition consumer. A failure to close one consumer is logged and does not
     * prevent the remaining consumers from being closed.
     */
    @Override // com.rabbitmq.stream.Consumer, java.lang.AutoCloseable
    public void close() {
        for (Map.Entry<String, Consumer> entry : this.consumers.entrySet()) {
            LOGGER.debug("Closing consumer for partition '{}' of super stream {}", entry.getKey(), this.superStream);
            try {
                entry.getValue().close();
            } catch (Exception e) {
                // Best effort: keep closing the other partitions.
                LOGGER.info("Error while closing consumer for partition {} of super stream {}: {}", entry.getKey(), this.superStream, e.getMessage());
            }
        }
    }
}
