package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.ConfirmationStatus;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.RoutingStrategy;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/rabbitmq/stream/impl/SuperStreamProducer.class */
class SuperStreamProducer implements Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SuperStreamProducer.class);
    private final RoutingStrategy routingStrategy;
    private final Codec codec;
    private final String superStream;
    private final StreamProducerBuilder producerBuilder;
    private final StreamEnvironment environment;
    private final String name;
    private final RoutingStrategy.Metadata superStreamMetadata;
    private final MessageInterceptor messageInterceptor;
    private final Map<String, Producer> producers = new ConcurrentHashMap();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /* loaded from: input_file:com/rabbitmq/stream/impl/SuperStreamProducer$DefaultSuperStreamMetadata.class */
    private static final class DefaultSuperStreamMetadata implements RoutingStrategy.Metadata {
        private final String superStream;
        private final StreamEnvironment environment;
        private final List<String> partitions;
        private final Map<String, List<String>> routes;

        private DefaultSuperStreamMetadata(String str, StreamEnvironment streamEnvironment) {
            this.routes = new ConcurrentHashMap();
            this.superStream = str;
            this.environment = streamEnvironment;
            List list = (List) streamEnvironment.locatorOperation(Utils.namedFunction(client -> {
                return client.partitions(str);
            }, "Partition lookup for super stream '%s'", str));
            if (list.isEmpty()) {
                throw new IllegalArgumentException("Super stream '" + str + "' has no partition");
            }
            this.partitions = new CopyOnWriteArrayList(list);
        }

        @Override // com.rabbitmq.stream.RoutingStrategy.Metadata
        public List<String> partitions() {
            return this.partitions;
        }

        @Override // com.rabbitmq.stream.RoutingStrategy.Metadata
        public List<String> route(String str) {
            return this.routes.computeIfAbsent(str, str2 -> {
                return (List) this.environment.locatorOperation(Utils.namedFunction(client -> {
                    return client.route(str2, this.superStream);
                }, "Route lookup on super stream '%s' for key '%s'", this.superStream, str2));
            });
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:com/rabbitmq/stream/impl/SuperStreamProducer$MessageInterceptor.class */
    private interface MessageInterceptor {
        Message apply(int i, Message message);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SuperStreamProducer(StreamProducerBuilder streamProducerBuilder, String str, String str2, RoutingStrategy routingStrategy, StreamEnvironment streamEnvironment) {
        this.routingStrategy = routingStrategy;
        this.codec = streamEnvironment.codec();
        this.name = str;
        this.superStream = str2;
        this.environment = streamEnvironment;
        this.superStreamMetadata = new DefaultSuperStreamMetadata(this.superStream, this.environment);
        this.producerBuilder = streamProducerBuilder.duplicate();
        this.producerBuilder.stream((String) null);
        this.producerBuilder.resetRouting();
        this.messageInterceptor = this.environment.observationCollector().isNoop() ? (i, message) -> {
            return message;
        } : (i2, message2) -> {
            return i2 == 0 ? message2 : message2.copy();
        };
    }

    @Override // com.rabbitmq.stream.Producer
    public MessageBuilder messageBuilder() {
        return this.codec.messageBuilder();
    }

    @Override // com.rabbitmq.stream.Producer
    public long getLastPublishingId() {
        if (this.name == null || this.name.isEmpty()) {
            throw new IllegalStateException("The producer has no name");
        }
        long j = 0;
        boolean z = true;
        for (String str : (List) this.environment.locatorOperation(Utils.namedFunction(client -> {
            return client.partitions(this.superStream);
        }, "Partition lookup for super stream '%s'", this.superStream))) {
            long longValue = ((Long) this.environment.locatorOperation(Utils.namedFunction(client2 -> {
                return Long.valueOf(client2.queryPublisherSequence(this.name, str));
            }, "Publisher sequence query for on partition '%s' of super stream '%s', publisher name '%s'", str, this.superStream, this.name))).longValue();
            if (z) {
                j = longValue;
                z = false;
            } else if (Long.compareUnsigned(j, longValue) > 0) {
                j = longValue;
            }
        }
        return j;
    }

    @Override // com.rabbitmq.stream.Producer
    public void send(Message message, ConfirmationHandler confirmationHandler) {
        if (!canSend()) {
            confirmationHandler.handle(new ConfirmationStatus(message, false, (short) 10003));
            return;
        }
        List<String> route = this.routingStrategy.route(message, this.superStreamMetadata);
        if (route.isEmpty()) {
            confirmationHandler.handle(new ConfirmationStatus(message, false, (short) 10005));
            return;
        }
        if (route.size() == 1) {
            producer(route.get(0)).send(message, confirmationHandler);
            return;
        }
        for (int i = 0; i < route.size(); i++) {
            producer(route.get(i));
            producer(route.get(i)).send(this.messageInterceptor.apply(i, message), confirmationHandler);
        }
    }

    private Producer producer(String str) {
        return this.producers.computeIfAbsent(str, str2 -> {
            return this.producerBuilder.duplicate().superStream(null).stream(str2).build();
        });
    }

    private boolean canSend() {
        return !this.closed.get();
    }

    @Override // com.rabbitmq.stream.Producer, java.lang.AutoCloseable
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            for (Map.Entry<String, Producer> entry : this.producers.entrySet()) {
                try {
                    entry.getValue().close();
                } catch (Exception e) {
                    LOGGER.info("Error while closing producer for partition {} of super stream {}: {}", new Object[]{entry.getKey(), this.superStream, e.getMessage()});
                }
            }
        }
    }
}
