package com.mulesoft.connectors.mqtt3.internal.source;

import com.mulesoft.connectors.mqtt3.api.MQTT3MessageAttributes;
import com.mulesoft.connectors.mqtt3.api.Topic;
import com.mulesoft.connectors.mqtt3.internal.connection.MQTT3Connection;
import com.mulesoft.connectors.mqtt3.internal.exceptions.MQTT3ConnectionExceptionResolver;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3ForwardingMessageHandler;
import com.mulesoft.connectors.mqtt3.internal.routing.MQTT3Message;
import java.util.List;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.source.ClusterSupport;
import org.mule.runtime.extension.api.annotation.source.SourceClusterSupport;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ClusterSupport(SourceClusterSupport.DEFAULT_PRIMARY_NODE_ONLY)
@MediaType("*/*")
@DisplayName("On New Message")
@Alias("listener")
/* loaded from: input_file:com/mulesoft/connectors/mqtt3/internal/source/MQTT3MessageListener.class */
public class MQTT3MessageListener extends Source<byte[], MQTT3MessageAttributes> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQTT3MessageListener.class);

    @Parameter
    private List<Topic> topics;

    @Connection
    private ConnectionProvider<MQTT3Connection> connectionProvider;
    private MQTT3Connection connection;
    private MQTT3ForwardingMessageHandler messageHandler;

    public void onStart(SourceCallback<byte[], MQTT3MessageAttributes> sourceCallback) {
        try {
            LOGGER.debug("Starting MQTT3 Source");
            this.connection = (MQTT3Connection) this.connectionProvider.connect();
            this.connection.setConnectionLostHandler(th -> {
                sourceCallback.onConnectionException(new ConnectionException(th, this.connection));
            });
            this.messageHandler = new MQTT3ForwardingMessageHandler(mQTT3Message -> {
                sourceCallback.handle(buildResult(mQTT3Message));
            });
            this.connection.subscribeListenerToTopics(this.topics, this.messageHandler);
        } catch (Throwable th2) {
            if (ExceptionUtils.extractConnectionException(th2).isPresent()) {
                sourceCallback.onConnectionException((ConnectionException) ExceptionUtils.extractConnectionException(th2).get());
            } else {
                if (!MQTT3ConnectionExceptionResolver.resolveMQTT3ConnectionException(th2, this.connection).isPresent()) {
                    throw new MuleRuntimeException(th2);
                }
                sourceCallback.onConnectionException(MQTT3ConnectionExceptionResolver.resolveMQTT3ConnectionException(th2, this.connection).get());
            }
        }
    }

    public void onStop() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.unsubscribeListenerFromTopics(this.topics, this.messageHandler);
            this.connectionProvider.disconnect(this.connection);
        } catch (Exception e) {
            LOGGER.error("Error occurred unsubscribing listeners from topics:" + this.topics.toString() + " " + e.getMessage(), e);
        }
    }

    private Result<byte[], MQTT3MessageAttributes> buildResult(MQTT3Message mQTT3Message) {
        return Result.builder().attributes(MQTT3MessageAttributes.Builder.newInstance().withTopic(mQTT3Message.getTopic()).withMessageId(mQTT3Message.getId()).withDuplicate(mQTT3Message.getIsDuplicate()).withRetained(mQTT3Message.getIsRetained()).withQoS(mQTT3Message.getQoS()).build()).output(mQTT3Message.getContent()).build();
    }
}
