package com.mulesoft.connectors.mqtt.internal;

import com.mulesoft.connectors.mqtt.api.MQTTAttributes;
import com.mulesoft.connectors.mqtt.api.Topic;
import java.util.List;
import java.util.function.Consumer;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.util.LazyValue;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@MediaType("text/plain")
@DisplayName("On New Message")
@Alias("listener")
/* loaded from: input_file:com/mulesoft/connectors/mqtt/internal/MQTTListener.class */
public class MQTTListener extends Source<byte[], MQTTAttributes> {

    @Connection
    private ConnectionProvider<MQTTSession> clientProvider;

    @Parameter
    private List<Topic> topics;
    private IMqttToken subscription;
    private MqttAsyncClient client;
    private MQTTSession session;
    private Consumer<Throwable> onDisconnectCallback;
    private final Logger LOGGER = LoggerFactory.getLogger(MQTTListener.class);
    private LazyValue<String[]> calculatedTopics = new LazyValue<>(() -> {
        return (String[]) this.topics.stream().map((v0) -> {
            return v0.getTopicFilter();
        }).toArray(i -> {
            return new String[i];
        });
    });

    public void onStart(SourceCallback<byte[], MQTTAttributes> sourceCallback) throws MuleException {
        if (this.LOGGER.isDebugEnabled()) {
            this.LOGGER.debug("Initializing Subscription to: " + this.topics);
        }
        this.session = (MQTTSession) this.clientProvider.connect();
        this.client = this.session.getClient();
        this.onDisconnectCallback = th -> {
            sourceCallback.onConnectionException(new ConnectionException(th, this.client));
        };
        try {
            this.subscription = this.client.subscribe(getTopics(), this.topics.stream().map((v0) -> {
                return v0.getQos();
            }).map((v0) -> {
                return v0.getValue();
            }).mapToInt((v0) -> {
                return v0.intValue();
            }).toArray(), (IMqttMessageListener[]) this.topics.stream().map(topic -> {
                return getMessageListener(sourceCallback);
            }).toArray(i -> {
                return new IMqttMessageListener[i];
            }));
            this.session.registerOnDisconnectCallback(this.onDisconnectCallback);
        } catch (MqttException e) {
            if (!ConnectionUtills.isConnectionException(e)) {
                sourceCallback.onConnectionException(new ConnectionException(e));
            } else {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                }
                sourceCallback.onConnectionException(new ConnectionException(e));
            }
        }
    }

    private String[] getTopics() {
        return (String[]) this.calculatedTopics.get();
    }

    private IMqttMessageListener getMessageListener(SourceCallback<byte[], MQTTAttributes> sourceCallback) {
        return (str, mqttMessage) -> {
            sourceCallback.handle(Result.builder().output(mqttMessage.getPayload()).attributes(new MQTTAttributes(str, mqttMessage)).build());
        };
    }

    public void onStop() {
        if (this.subscription != null) {
            try {
                this.subscription.getClient().unsubscribe(getTopics());
            } catch (MqttException e) {
                this.LOGGER.debug("Error when trying to unsubscribe", e);
            }
        }
        if (this.session != null) {
            this.session.removeCallback(this.onDisconnectCallback);
        }
    }
}
