package io.joynr.messaging.mqtt.hivemq.client;

import com.hivemq.client.mqtt.MqttClientState;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.exceptions.MqttClientStateException;
import com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConfig;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserPropertiesBuilder;
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscription;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
import io.joynr.exceptions.JoynrDelayMessageException;
import io.joynr.exceptions.JoynrMessageExpiredException;
import io.joynr.exceptions.JoynrMessageNotSentException;
import io.joynr.messaging.FailureAction;
import io.joynr.messaging.SuccessAction;
import io.joynr.messaging.mqtt.IMqttMessagingSkeleton;
import io.joynr.messaging.mqtt.JoynrMqttClient;
import io.joynr.statusmetrics.ConnectionStatusMetricsImpl;
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/joynr/messaging/mqtt/hivemq/client/HivemqMqttClient.class */
public class HivemqMqttClient implements JoynrMqttClient {
    private static final Logger logger;
    private static final long NOT_CONNECTED_RETRY_INTERVAL_MS = 5000;
    private final Mqtt5RxClient client;
    private final Mqtt5ClientConfig clientConfig;
    private final boolean cleanSession;
    private final int keepAliveTimeSeconds;
    private final int connectionTimeoutSec;
    private final int reconnectDelayMs;
    private final int receiveMaximum;
    private final boolean isReceiver;
    private final boolean isSender;
    private final String clientInformation;
    private IMqttMessagingSkeleton messagingSkeleton;
    private ConnectionStatusMetricsImpl connectionStatusMetrics;
    static final /* synthetic */ boolean $assertionsDisabled;
    private int maxMsgSizeBytes = 268435460;
    private AtomicBoolean shuttingDown = new AtomicBoolean(true);
    private boolean connected = false;
    private HashMap<String, Disposable> subscriptionDisposables = new HashMap<>();
    private AtomicLong obsoleteUnsubscribeDisposableCount = new AtomicLong(0);
    private List<Disposable> unsubscribeDisposables = new ArrayList();
    private Map<String, Mqtt5Subscription> subscriptions = new HashMap();
    private Disposable publishesDisposable = null;

    public HivemqMqttClient(Mqtt5RxClient mqtt5RxClient, int i, boolean z, int i2, int i3, int i4, boolean z2, boolean z3, String str, ConnectionStatusMetricsImpl connectionStatusMetricsImpl) {
        this.client = mqtt5RxClient;
        this.clientConfig = mqtt5RxClient.getConfig();
        this.keepAliveTimeSeconds = i;
        this.cleanSession = z;
        this.connectionTimeoutSec = i2;
        this.reconnectDelayMs = i3;
        this.receiveMaximum = i4;
        this.isReceiver = z2;
        this.isSender = z3;
        this.clientInformation = createClientInformationString(str);
        this.connectionStatusMetrics = connectionStatusMetricsImpl;
    }

    protected void registerPublishCallback() {
        if (this.isReceiver && this.publishesDisposable == null) {
            this.publishesDisposable = this.client.publishes(MqttGlobalPublishFilter.ALL, true).subscribe(this::handleIncomingMessage, th -> {
                if (this.cleanSession || !(th instanceof MqttSessionExpiredException)) {
                    logger.error("{}: Error encountered in publish callback, trying to resubscribe.", this.clientInformation, th);
                } else {
                    logger.warn("{}: MqttSessionExpiredException encountered in publish callback, trying to resubscribe.", this.clientInformation, th);
                }
                synchronized (this) {
                    if (this.publishesDisposable != null) {
                        this.publishesDisposable.dispose();
                        this.publishesDisposable = null;
                    }
                    registerPublishCallback();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getClientInformationString() {
        return this.clientInformation;
    }

    private String createClientInformationString(String str) {
        StringBuilder sb = new StringBuilder();
        sb.append("(clientHash=");
        sb.append(Integer.toHexString(this.client.hashCode()));
        sb.append(", GBID=");
        sb.append(str);
        sb.append(", ");
        if (this.isReceiver && this.isSender) {
            sb.append("bidirectional");
        } else if (this.isReceiver) {
            sb.append("receiver");
        } else {
            sb.append("sender");
        }
        sb.append(")");
        return sb.toString();
    }

    public synchronized void connect() {
        if (this.shuttingDown.get()) {
            logger.error("{}: Client not started.", this.clientInformation);
            return;
        }
        if (this.client.getConfig().getState().isConnected()) {
            logger.info("{}: MQTT client already connected - skipping.", this.clientInformation);
            return;
        }
        while (!this.client.getConfig().getState().isConnected()) {
            logger.info("{}: Attempting to connect client, clean session={} ...", this.clientInformation, Boolean.valueOf(this.cleanSession));
            Mqtt5Connect build = ((Mqtt5ConnectBuilder) Mqtt5Connect.builder().restrictions().receiveMaximum(this.receiveMaximum).applyRestrictions()).cleanStart(this.cleanSession).keepAlive(this.keepAliveTimeSeconds).noSessionExpiry().build();
            try {
                this.connectionStatusMetrics.increaseConnectionAttempts();
                this.client.connect(build).timeout(this.connectionTimeoutSec, TimeUnit.SECONDS).doOnSuccess(mqtt5ConnAck -> {
                    this.maxMsgSizeBytes = mqtt5ConnAck.getRestrictions().getMaximumPacketSize();
                    logger.info("{}: MQTT client connected: {}, maxMsgSizeBytes = {}.", new Object[]{this.clientInformation, mqtt5ConnAck, Integer.valueOf(this.maxMsgSizeBytes)});
                }).blockingGet();
                this.connected = true;
            } catch (Exception e) {
                logger.error("{}: Exception encountered while connecting MQTT client.", this.clientInformation, e);
                while (true) {
                    try {
                        logger.debug("{}: Waiting to reconnect, state: {}.", this.clientInformation, this.client.getConfig().getState());
                        wait(this.reconnectDelayMs);
                        if (this.client.getConfig().getState() != MqttClientState.CONNECTING && this.client.getConfig().getState() != MqttClientState.CONNECTING_RECONNECT && this.client.getConfig().getState() != MqttClientState.DISCONNECTED_RECONNECT) {
                            break;
                        }
                    } catch (InterruptedException e2) {
                        logger.error("{}: Exception while waiting to reconnect.", this.clientInformation, e2);
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                logger.debug("{}: Leaving reconnect loop, state: {}.", this.clientInformation, this.client.getConfig().getState());
            }
        }
    }

    public synchronized void disconnect() {
        if (this.connected) {
            this.connected = false;
            try {
                logger.info("{}: Attempting to disconnect.", this.clientInformation);
                ((Completable) this.client.disconnectWith().noSessionExpiry().applyDisconnect()).doOnComplete(() -> {
                    logger.info("{}: Disconnected.", this.clientInformation);
                }).onErrorComplete(th -> {
                    logger.error("{}: Error encountered from disconnect.", this.clientInformation, th);
                    return true;
                }).blockingAwait(NOT_CONNECTED_RETRY_INTERVAL_MS, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                logger.error("{}: Exception thrown on disconnect.", this.clientInformation, e);
            }
        }
    }

    public synchronized void start() {
        if (!this.shuttingDown.getAndSet(false)) {
            logger.warn("{}: Client already started.", this.clientInformation);
            return;
        }
        logger.info("{}: Initializing HiveMQ MQTT client for address {}.", this.clientInformation, this.client.getConfig().getServerAddress());
        if (!$assertionsDisabled && this.isReceiver && this.messagingSkeleton == null) {
            throw new AssertionError();
        }
        registerPublishCallback();
    }

    public void setMessageListener(IMqttMessagingSkeleton iMqttMessagingSkeleton) {
        if (this.isReceiver && this.messagingSkeleton == null) {
            this.messagingSkeleton = iMqttMessagingSkeleton;
        }
    }

    public synchronized void shutdown() {
        if (this.shuttingDown.getAndSet(true)) {
            logger.warn("{}: Client already shutdown.", this.clientInformation);
            return;
        }
        disconnect();
        logger.debug("{}: Shutdown.", this.clientInformation);
        if (this.publishesDisposable != null) {
            this.publishesDisposable.dispose();
            this.publishesDisposable = null;
        }
        synchronized (this.subscriptions) {
            disposeSubscriptions();
            Iterator<Disposable> it = this.unsubscribeDisposables.iterator();
            while (it.hasNext()) {
                it.next().dispose();
            }
            this.obsoleteUnsubscribeDisposableCount.set(0L);
        }
    }

    private void disposeSubscriptions() {
        Iterator<Disposable> it = this.subscriptionDisposables.values().iterator();
        while (it.hasNext()) {
            it.next().dispose();
        }
        this.subscriptionDisposables.clear();
    }

    public void publishMessage(String str, byte[] bArr, Map<String, String> map, int i, long j, SuccessAction successAction, FailureAction failureAction) {
        if (!$assertionsDisabled && !this.isSender) {
            throw new AssertionError();
        }
        if (map == null) {
            throw new JoynrMessageNotSentException("prefixedCustomHeaders must not be null");
        }
        if (this.maxMsgSizeBytes != 0 && bArr.length > this.maxMsgSizeBytes) {
            throw new JoynrMessageNotSentException("Publish failed: maximum allowed message size of " + this.maxMsgSizeBytes + " bytes exceeded, actual size is " + bArr.length + " bytes");
        }
        if (!this.clientConfig.getState().isConnected()) {
            failureAction.execute(new JoynrDelayMessageException(NOT_CONNECTED_RETRY_INTERVAL_MS, "Publish failed: Mqtt client not connected."));
            return;
        }
        Mqtt5UserPropertiesBuilder builder = Mqtt5UserProperties.builder();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (entry.getKey().isEmpty() || entry.getValue().isEmpty()) {
                logger.trace("{}: Did not add MQTT empty user property {} / {}", new Object[]{this.clientInformation, entry.getKey(), entry.getValue()});
            } else {
                builder.add(entry.getKey(), entry.getValue());
            }
        }
        Mqtt5Publish build = Mqtt5Publish.builder().topic(str).qos(safeParseQos(i)).payload(bArr).messageExpiryInterval(j).userProperties(builder.build()).build();
        logger.debug("{}: Publishing to topic: {}, size: {}, qos: {}", new Object[]{this.clientInformation, str, Integer.valueOf(bArr.length), Integer.valueOf(i)});
        this.client.toAsync().publish(build).whenComplete((mqtt5PublishResult, th) -> {
            if (th != null) {
                logger.error("{}: Publishing to topic: {}, size: {}, qos: {} failed with exception.", new Object[]{this.clientInformation, str, Integer.valueOf(bArr.length), Integer.valueOf(i), th});
                if (th instanceof MqttClientStateException) {
                    failureAction.execute(new JoynrDelayMessageException(NOT_CONNECTED_RETRY_INTERVAL_MS, "Publish failed: " + th.toString()));
                    return;
                } else {
                    failureAction.execute(new JoynrDelayMessageException("Publish failed: " + th.toString()));
                    return;
                }
            }
            if (mqtt5PublishResult.getError().isPresent()) {
                logger.error("{}: Publishing to topic: {}, size: {}, qos: {} failed with error result: {}", new Object[]{this.clientInformation, str, Integer.valueOf(bArr.length), Integer.valueOf(i), mqtt5PublishResult, mqtt5PublishResult.getError().get()});
                failureAction.execute(new JoynrDelayMessageException("Publish failed: " + ((Throwable) mqtt5PublishResult.getError().get()).toString()));
                return;
            }
            this.connectionStatusMetrics.increaseSentMessages();
            if (logger.isTraceEnabled()) {
                logger.trace("{}: Publishing to topic: {}, size: {}, qos: {} succeeded: {}", new Object[]{this.clientInformation, str, Integer.valueOf(bArr.length), Integer.valueOf(i), mqtt5PublishResult});
            } else {
                logger.debug("{}: Publishing to topic: {}, size: {}, qos: {} succeeded.", new Object[]{this.clientInformation, str, Integer.valueOf(bArr.length), Integer.valueOf(i)});
            }
            successAction.execute();
        });
    }

    private MqttQos safeParseQos(int i) {
        MqttQos fromCode = MqttQos.fromCode(i);
        if (fromCode == null) {
            fromCode = MqttQos.AT_LEAST_ONCE;
            logger.error("{}: Got invalid QoS level {} for publish, using default: {}", new Object[]{this.clientInformation, Integer.valueOf(i), Integer.valueOf(fromCode.getCode())});
        }
        return fromCode;
    }

    public void subscribe(String str) {
        if (!$assertionsDisabled && !this.isReceiver) {
            throw new AssertionError();
        }
        synchronized (this.subscriptions) {
            logger.info("{}: Subscribing to topic: {}", this.clientInformation, str);
            Mqtt5Subscription computeIfAbsent = this.subscriptions.computeIfAbsent(str, str2 -> {
                return Mqtt5Subscription.builder().topicFilter(str2).qos(MqttQos.AT_LEAST_ONCE).build();
            });
            if (this.shuttingDown.get()) {
                return;
            }
            doSubscribe(computeIfAbsent, str);
        }
    }

    private void doSubscribe(Mqtt5Subscription mqtt5Subscription, String str) {
        this.subscriptionDisposables.put(str, this.client.subscribeStream(Mqtt5Subscribe.builder().addSubscription(mqtt5Subscription).build()).doOnSingle(mqtt5SubAck -> {
            logger.debug("{}: Subscribed to topic: {}, result: {}", new Object[]{this.clientInformation, mqtt5Subscription, mqtt5SubAck});
        }).subscribe(mqtt5Publish -> {
        }, th -> {
            logger.error("{}: Error encountered for subscription {}.", new Object[]{this.clientInformation, mqtt5Subscription, th});
        }));
    }

    public void resubscribe() {
        logger.debug("{}: Resubscribe triggered.", this.clientInformation);
        synchronized (this.subscriptions) {
            disposeSubscriptions();
            this.subscriptions.forEach((str, mqtt5Subscription) -> {
                logger.info("{}: Resubscribing to topic: {}", this.clientInformation, str);
                doSubscribe(mqtt5Subscription, str);
            });
        }
    }

    public void unsubscribe(String str) {
        if (!$assertionsDisabled && !this.isReceiver) {
            throw new AssertionError();
        }
        Mqtt5Unsubscribe build = Mqtt5Unsubscribe.builder().addTopicFilter(str).build();
        synchronized (this.subscriptions) {
            logger.info("{}: Unsubscribing from topic: {}", this.clientInformation, str);
            this.subscriptions.remove(str);
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            this.unsubscribeDisposables.add(this.client.unsubscribe(build).subscribe(mqtt5UnsubAck -> {
                logger.debug("{}: Unsubscribed from topic: {}", this.clientInformation, str);
                if (atomicBoolean.compareAndSet(false, true)) {
                    this.obsoleteUnsubscribeDisposableCount.getAndIncrement();
                }
            }, th -> {
                logger.error("{}: Unable to unsubscribe from topic: {}", new Object[]{this.clientInformation, str, th});
                if (atomicBoolean.compareAndSet(false, true)) {
                    this.obsoleteUnsubscribeDisposableCount.getAndIncrement();
                }
            }));
            Disposable remove = this.subscriptionDisposables.remove(str);
            if (remove != null) {
                remove.dispose();
            }
            while (this.obsoleteUnsubscribeDisposableCount.get() > 0) {
                Disposable remove2 = this.unsubscribeDisposables.remove(0);
                if (remove2 != null) {
                    remove2.dispose();
                }
                this.obsoleteUnsubscribeDisposableCount.decrementAndGet();
            }
        }
    }

    public synchronized boolean isShutdown() {
        return this.shuttingDown.get();
    }

    private void handleIncomingMessage(Mqtt5Publish mqtt5Publish) {
        if (logger.isDebugEnabled()) {
            ByteBuffer byteBuffer = (ByteBuffer) mqtt5Publish.getPayload().orElse(null);
            Logger logger2 = logger;
            Object[] objArr = new Object[6];
            objArr[0] = this.clientInformation;
            objArr[1] = mqtt5Publish.getTopic();
            objArr[2] = Integer.valueOf(byteBuffer == null ? 0 : byteBuffer.remaining());
            objArr[3] = mqtt5Publish.getQos();
            objArr[4] = Boolean.valueOf(mqtt5Publish.isRetain());
            objArr[5] = Long.valueOf(mqtt5Publish.getMessageExpiryInterval().orElse(0L));
            logger2.debug("{}: Received publication: topic: {}, size: {}, qos: {}, retain: {}, expiryInterval: {}.", objArr);
        }
        this.connectionStatusMetrics.increaseReceivedMessages();
        List<Mqtt5UserProperty> asList = mqtt5Publish.getUserProperties().asList();
        HashMap hashMap = new HashMap();
        for (Mqtt5UserProperty mqtt5UserProperty : asList) {
            if (mqtt5UserProperty.getName().toString().startsWith("c-")) {
                hashMap.put(mqtt5UserProperty.getName().toString(), mqtt5UserProperty.getValue().toString());
            }
        }
        this.messagingSkeleton.transmit(mqtt5Publish, hashMap, th -> {
            if (th instanceof JoynrMessageExpiredException) {
                logger.warn("{}: Unable to handle incoming {}", new Object[]{this.clientInformation, mqtt5Publish, th});
            } else {
                logger.error("{}: Unable to handle incoming {}", new Object[]{this.clientInformation, mqtt5Publish, th});
            }
        });
    }

    Mqtt5RxClient getClient() {
        return this.client;
    }

    static {
        $assertionsDisabled = !HivemqMqttClient.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(HivemqMqttClient.class);
    }
}
