/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.cloud.extend.mqtt.service;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.CloudProps;
import org.noear.solon.cloud.extend.mqtt.event.MqttDeliveryCompleteEvent;
import org.noear.solon.cloud.extend.mqtt.service.MqttClientManager;
import org.noear.solon.cloud.extend.mqtt.service.MqttMessageHandler;
import org.noear.solon.cloud.extend.mqtt.service.MqttMessageListenerImpl;
import org.noear.solon.cloud.model.EventObserver;
import org.noear.solon.cloud.service.CloudEventObserverManger;
import org.noear.solon.core.event.EventBus;
import org.noear.solon.core.util.RunUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttClientManagerImpl
implements MqttClientManager,
MqttCallbackExtended {
    private static final Logger log = LoggerFactory.getLogger(MqttClientManagerImpl.class);
    private static final String PROP_EVENT_clientId = "event.clientId";
    private final String server;
    private final String username;
    private final String password;
    private final CloudEventObserverManger observerManger;
    private final String eventChannelName;
    private final MqttConnectOptions options;
    private String clientId;
    private boolean async = true;
    private final Set<MqttClientManager.ConnectCallback> connectCallbacks = Collections.synchronizedSet(new HashSet());
    private IMqttAsyncClient client;
    private final ReentrantLock SYNC_LOCK = new ReentrantLock();

    public MqttClientManagerImpl(CloudEventObserverManger observerManger, CloudProps cloudProps) {
        this.observerManger = observerManger;
        this.eventChannelName = cloudProps.getEventChannel();
        this.server = cloudProps.getEventServer();
        this.username = cloudProps.getUsername();
        this.password = cloudProps.getPassword();
        this.clientId = cloudProps.getValue(PROP_EVENT_clientId);
        if (Utils.isEmpty((String)this.clientId)) {
            this.clientId = Solon.cfg().appName() + "-" + Utils.guid();
        }
        this.options = new MqttConnectOptions();
        if (Utils.isNotEmpty((String)this.username)) {
            this.options.setUserName(this.username);
        } else {
            this.options.setUserName(Solon.cfg().appName());
        }
        if (Utils.isNotEmpty((String)this.password)) {
            this.options.setPassword(this.password.toCharArray());
        }
        this.options.setWill("client.close", this.clientId.getBytes(StandardCharsets.UTF_8), 1, false);
        this.options.setConnectionTimeout(30);
        this.options.setKeepAliveInterval(20);
        this.options.setServerURIs(new String[]{this.server});
        this.options.setCleanSession(false);
        this.options.setAutomaticReconnect(true);
        this.options.setMaxInflight(128);
        Properties props = cloudProps.getEventClientProps();
        if (props.size() > 0) {
            Utils.injectProperties((Object)this.options, (Properties)props);
        }
        EventBus.publish((Object)this.options);
    }

    public void connectionLost(Throwable cause) {
        log.warn("MQTT connection lost, clientId={}", (Object)this.clientId, (Object)cause);
        if (!this.options.isAutomaticReconnect()) {
            this.client = null;
        }
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        EventObserver eventHandler;
        if (log.isTraceEnabled()) {
            log.trace("MQTT message arrived, clientId={}, messageId={}", (Object)this.clientId, (Object)message.getId());
        }
        if ((eventHandler = this.observerManger.getByTopic(topic)) != null) {
            MqttMessageHandler handler = new MqttMessageHandler(this, this.eventChannelName, (CloudEventHandler)eventHandler, topic, message);
            if (this.getAsync()) {
                RunUtil.parallel((Runnable)handler);
            } else {
                handler.run();
            }
        }
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        if (token.getMessageId() > 0) {
            if (log.isDebugEnabled()) {
                log.debug("MQTT message delivery completed, clientId={}, messageId={}", (Object)this.clientId, (Object)token.getMessageId());
            }
            EventBus.publish((Object)new MqttDeliveryCompleteEvent(this.clientId, token.getMessageId(), (IMqttToken)token));
        }
    }

    public void connectComplete(boolean reconnect, String serverURI) {
        this.connectCallbacks.forEach(callback -> callback.connectComplete(reconnect));
    }

    @Override
    public IMqttAsyncClient getClient() {
        if (this.client != null) {
            return this.client;
        }
        this.SYNC_LOCK.lock();
        try {
            if (this.client == null) {
                this.client = this.createClient();
            }
            IMqttAsyncClient iMqttAsyncClient = this.client;
            return iMqttAsyncClient;
        }
        finally {
            this.SYNC_LOCK.unlock();
        }
    }

    @Override
    public String getClientId() {
        return this.clientId;
    }

    @Override
    public void setAsync(boolean async) {
        this.async = async;
    }

    @Override
    public boolean getAsync() {
        return this.async;
    }

    @Override
    public void addCallback(MqttClientManager.ConnectCallback connectCallback) {
        this.connectCallbacks.add(connectCallback);
    }

    @Override
    public boolean removeCallback(MqttClientManager.ConnectCallback connectCallback) {
        return this.connectCallbacks.remove(connectCallback);
    }

    private IMqttAsyncClient createClient() {
        try {
            this.client = new MqttAsyncClient(this.server, this.clientId, (MqttClientPersistence)new MemoryPersistence());
            this.client.setManualAcks(true);
            this.client.setCallback((MqttCallback)this);
            long waitConnectionTimeout = this.options.getConnectionTimeout() * 1000;
            this.client.connect(this.options).waitForCompletion(waitConnectionTimeout);
            this.subscribe();
            return this.client;
        }
        catch (MqttException e) {
            throw new IllegalStateException(e);
        }
    }

    private void subscribe() throws MqttException {
        if (this.observerManger.topicSize() < 1) {
            return;
        }
        String[] topicAry = this.observerManger.topicAll().toArray(new String[0]);
        int[] topicQos = new int[topicAry.length];
        IMqttMessageListener[] topicListener = new IMqttMessageListener[topicAry.length];
        int len = topicQos.length;
        for (int i = 0; i < len; ++i) {
            EventObserver eventObserver = this.observerManger.getByTopic(topicAry[i]);
            topicQos[i] = eventObserver.getQos();
            topicListener[i] = new MqttMessageListenerImpl(this, this.eventChannelName, (CloudEventHandler)eventObserver);
        }
        IMqttToken token = this.getClient().subscribe(topicAry, topicQos, topicListener);
        long waitConnectionTimeout = this.options.getConnectionTimeout() * 1000;
        token.waitForCompletion(waitConnectionTimeout);
    }
}

