package io.helidon.messaging;

import io.helidon.common.reactive.Multi;
import io.helidon.config.Config;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:io/helidon/messaging/Messaging.class */
/**
 * Entry point of the messaging API: channels are wired to publishers, subscribers,
 * processors, emitters and connectors through {@link Builder}, and the resulting
 * instance is controlled with {@link #start()} and {@link #stop()}.
 */
public interface Messaging {

    /**
     * Fluent builder that registers channels on a single {@link MessagingImpl} instance
     * and attaches a publisher and/or subscriber to each of them.
     *
     * <p>Every registration method returns this builder so calls can be chained.
     */
    public static final class Builder implements io.helidon.common.Builder<Builder, Messaging> {
        private static final Logger LOGGER = Logger.getLogger(Messaging.class.getName());
        private final MessagingImpl messaging = new MessagingImpl();

        private Builder() {
        }

        /**
         * Sets the configuration handed to the messaging instance.
         *
         * @param config configuration to use
         * @return this builder
         */
        public Builder config(Config config) {
            this.messaging.setConfig(config);
            return this;
        }

        /**
         * Registers a connector factory. The factory is added as an incoming connector,
         * an outgoing connector, or both, depending on which SPI interfaces it implements;
         * a factory implementing neither is silently ignored.
         *
         * @param connectorFactory connector to register
         * @return this builder
         */
        public Builder connector(ConnectorFactory connectorFactory) {
            if (connectorFactory instanceof IncomingConnectorFactory) {
                this.messaging.addIncomingConnector((IncomingConnectorFactory) connectorFactory);
            }
            if (connectorFactory instanceof OutgoingConnectorFactory) {
                this.messaging.addOutgoingConnector((OutgoingConnectorFactory) connectorFactory);
            }
            return this;
        }

        /**
         * Registers an emitter and makes it the publisher of every channel it reports
         * through {@code emitter.channels()}.
         *
         * @param emitter emitter to register
         * @param <PAYLOAD> payload type of the emitted messages
         * @return this builder
         */
        public <PAYLOAD> Builder emitter(Emitter<PAYLOAD> emitter) {
            this.messaging.addEmitter(emitter);
            for (Channel<PAYLOAD> channel : emitter.channels()) {
                this.messaging.registerChannel(channel);
                channel.setPublisher(emitter);
            }
            return this;
        }

        /**
         * Registers the channel and publishes into it from the stream built by the given builder.
         *
         * @param channel channel to publish to
         * @param publisherBuilder builder of the message publisher
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, PublisherBuilder<? extends Message<? extends PAYLOAD>> publisherBuilder) {
            return publisher(channel, publisherBuilder.buildRs());
        }

        /**
         * Registers the channel and publishes into it the items of a raw payload publisher,
         * each wrapped into a {@link Message} by the supplied function.
         *
         * @param channel channel to publish to
         * @param publisher publisher of raw payloads
         * @param function wraps a payload into a message; when {@code null}, {@code Message::of} is used
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, Publisher<? extends PAYLOAD> publisher, Function<? super PAYLOAD, ? extends Message<? extends PAYLOAD>> function) {
            Function<? super PAYLOAD, ? extends Message<? extends PAYLOAD>> wrapper = function;
            if (wrapper == null) {
                wrapper = Message::of;
            }
            // Explicit type witnesses keep inference independent of the overload being selected.
            return publisher(channel, ReactiveStreams.<PAYLOAD>fromPublisher(publisher)
                    .<Message<? extends PAYLOAD>>map(wrapper)
                    .buildRs());
        }

        /**
         * {@link Flow.Publisher} variant of
         * {@link #publisher(Channel, Publisher, Function)}; the publisher is adapted with
         * {@link FlowAdapters#toPublisher} and the call is delegated.
         *
         * @param channel channel to publish to
         * @param publisher publisher of raw payloads
         * @param function wraps a payload into a message; when {@code null}, {@code Message::of} is used
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, Flow.Publisher<? extends PAYLOAD> publisher, Function<? super PAYLOAD, ? extends Message<? extends PAYLOAD>> function) {
            // The null default for the wrapping function is applied by the delegate.
            return publisher(channel, FlowAdapters.toPublisher(publisher), function);
        }

        /**
         * Registers the channel and publishes into it from a {@link Flow.Publisher} of messages.
         *
         * @param channel channel to publish to
         * @param publisher publisher of messages
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, Flow.Publisher<? extends Message<? extends PAYLOAD>> publisher) {
            return publisher(channel, FlowAdapters.toPublisher(publisher));
        }

        /**
         * Registers the channel and sets the given publisher of messages on it.
         * All other {@code publisher} overloads end up here.
         *
         * @param channel channel to publish to
         * @param publisher publisher of messages
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, Publisher<? extends Message<? extends PAYLOAD>> publisher) {
            this.messaging.registerChannel(channel);
            channel.setPublisher(publisher);
            return this;
        }

        /**
         * Registers the channel and subscribes a payload consumer to it. Each message is
         * acknowledged and unwrapped before its payload reaches the consumer; stream errors
         * are logged at {@link Level#SEVERE} together with the channel name.
         *
         * @param channel channel to listen on
         * @param consumer receives the payload of every message
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder listener(Channel<PAYLOAD> channel, Consumer<? super PAYLOAD> consumer) {
            this.messaging.registerChannel(channel);
            Subscriber<? extends Message<? extends PAYLOAD>> sink = Builder.<PAYLOAD>unwrapProcessorBuilder()
                    .peek(consumer)
                    .onError(failure -> LOGGER.log(Level.SEVERE, "Error detected in channel " + channel.name(), failure))
                    .ignore()
                    .build();
            channel.setSubscriber(sink);
            return this;
        }

        /**
         * Registers the channel and subscribes a {@link Flow.Subscriber} of messages to it.
         *
         * @param channel channel to subscribe to
         * @param subscriber subscriber of messages
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder subscriber(Channel<PAYLOAD> channel, Flow.Subscriber<? extends Message<? extends PAYLOAD>> subscriber) {
            return subscriber(channel, FlowAdapters.toSubscriber(subscriber));
        }

        /**
         * Registers the channel and exposes its messages to the caller as a {@link Multi}.
         * The consumer is invoked immediately, during this call, with a {@code Multi} backed
         * by an identity processor that is then subscribed to the channel.
         *
         * @param channel channel to subscribe to
         * @param consumer receives the {@code Multi} of messages to build a pipeline on
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder subscriber(Channel<PAYLOAD> channel, Consumer<Multi<? extends Message<? extends PAYLOAD>>> consumer) {
            // Identity processor: its subscriber side is attached to the channel,
            // its publisher side feeds the Multi handed to the caller.
            Processor<Message<? extends PAYLOAD>, Message<? extends PAYLOAD>> bridge =
                    ReactiveStreams.<Message<? extends PAYLOAD>>builder().buildRs();
            Flow.Publisher<Message<? extends PAYLOAD>> upstream = FlowAdapters.toFlowPublisher(bridge);
            consumer.accept(Multi.create(upstream));
            return subscriber(channel, bridge);
        }

        /**
         * Registers the channel and subscribes the subscriber produced by the given builder.
         *
         * @param channel channel to subscribe to
         * @param subscriberBuilder builder of the message subscriber
         * @param <PAYLOAD> payload type of the channel
         * @param <RESULT> result type of the subscriber builder
         * @return this builder
         */
        public <PAYLOAD, RESULT> Builder subscriber(Channel<PAYLOAD> channel, SubscriberBuilder<? extends Message<? extends PAYLOAD>, RESULT> subscriberBuilder) {
            return subscriber(channel, subscriberBuilder.build());
        }

        /**
         * Registers the channel and sets the given subscriber of messages on it.
         * All other {@code subscriber} overloads end up here.
         *
         * @param channel channel to subscribe to
         * @param subscriber subscriber of messages
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder subscriber(Channel<PAYLOAD> channel, Subscriber<? extends Message<? extends PAYLOAD>> subscriber) {
            this.messaging.registerChannel(channel);
            channel.setSubscriber(subscriber);
            return this;
        }

        /**
         * Registers both channels and places the processor between them: it becomes the
         * subscriber of the first channel and the publisher of the second one.
         *
         * @param channel channel the processor consumes from
         * @param channel2 channel the processor publishes to
         * @param processor processor of messages
         * @param <PAYLOAD> payload type of the consumed channel
         * @param <RESULT> payload type of the produced channel
         * @return this builder
         */
        public <PAYLOAD, RESULT> Builder processor(Channel<PAYLOAD> channel, Channel<RESULT> channel2, Processor<? extends Message<? extends PAYLOAD>, ? extends Message<? extends RESULT>> processor) {
            this.messaging.registerChannel(channel);
            this.messaging.registerChannel(channel2);
            channel.setSubscriber(processor);
            channel2.setPublisher(processor);
            return this;
        }

        /**
         * Builds the processor from the given builder and connects it between the two channels.
         *
         * @param channel channel the processor consumes from
         * @param channel2 channel the processor publishes to
         * @param processorBuilder builder of the message processor
         * @param <PAYLOAD> payload type of the consumed channel
         * @param <RESULT> payload type of the produced channel
         * @return this builder
         */
        public <PAYLOAD, RESULT> Builder processor(Channel<PAYLOAD> channel, Channel<RESULT> channel2, ProcessorBuilder<? extends Message<? extends PAYLOAD>, ? extends Message<? extends RESULT>> processorBuilder) {
            return processor(channel, channel2, processorBuilder.buildRs());
        }

        /**
         * Connects the two channels through a payload-level mapping function. Incoming
         * messages are acknowledged and unwrapped, the function is applied to the payload,
         * and the result is wrapped with {@code Message::of} before being published.
         *
         * @param channel channel the function consumes from
         * @param channel2 channel the function result is published to
         * @param function maps an incoming payload to an outgoing payload
         * @param <PAYLOAD> payload type of the consumed channel
         * @param <RESULT> payload type of the produced channel
         * @return this builder
         */
        public <PAYLOAD, RESULT> Builder processor(Channel<PAYLOAD> channel, Channel<RESULT> channel2, Function<? super PAYLOAD, ? extends RESULT> function) {
            // PAYLOAD/RESULT cannot be inferred for a call used as a method receiver,
            // so the helper calls carry explicit type witnesses.
            Processor<? extends Message<? extends PAYLOAD>, ? extends Message<? extends RESULT>> mapping =
                    Builder.<PAYLOAD>unwrapProcessorBuilder()
                            .<RESULT>map(function)
                            .via(Builder.<RESULT>wrapProcessorBuilder())
                            .buildRs();
            return processor(channel, channel2, mapping);
        }

        /**
         * Finishes the set-up and returns the messaging instance. When no configuration
         * was supplied (the stored config is {@code null}), {@code Config.empty()} is used.
         *
         * <p>This is the implementation of {@code io.helidon.common.Builder#build()}.
         *
         * @return the configured messaging instance
         */
        public Messaging build() {
            if (this.messaging.getConfig() == null) {
                this.messaging.setConfig(Config.empty());
            }
            return this.messaging;
        }

        /**
         * Alias of {@link #build()} kept under the name the decompiler assigned to it
         * (renamed from {@code build} when the bridge method was merged), so that call
         * sites using that name keep compiling.
         *
         * @return the configured messaging instance
         */
        public Messaging m6build() {
            return build();
        }

        /**
         * Creates a processor stage that wraps every payload into a message using {@code Message::of}.
         *
         * @param <PAYLOAD> payload type
         * @return builder of the wrapping stage
         */
        private static <PAYLOAD> ProcessorBuilder<? super PAYLOAD, Message<? extends PAYLOAD>> wrapProcessorBuilder() {
            return ReactiveStreams.<PAYLOAD>builder()
                    .<Message<? extends PAYLOAD>>map(Message::of);
        }

        /**
         * Creates a processor stage that calls {@code ack()} on every message and then
         * emits its payload. The value returned by {@code ack()} is not inspected.
         *
         * @param <PAYLOAD> payload type
         * @return builder of the unwrapping stage
         */
        private static <PAYLOAD> ProcessorBuilder<? extends Message<? extends PAYLOAD>, ? extends PAYLOAD> unwrapProcessorBuilder() {
            return ReactiveStreams.<Message<? extends PAYLOAD>>builder()
                    .peek(Message::ack)
                    .<PAYLOAD>map(Message::getPayload);
        }
    }

    /**
     * Starts messaging.
     *
     * @return a messaging instance (presumably this one, to allow chaining; confirm
     *         against the implementation)
     */
    Messaging start();

    /**
     * Stops messaging.
     */
    void stop();

    /**
     * Creates a new, empty {@link Builder}.
     *
     * @return new builder
     */
    static Builder builder() {
        return new Builder();
    }
}
