/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.utils.CompositeAddress;

/**
 * Manages the broker-side queues and consumers that back the MQTT subscriptions of a single
 * {@link MQTTSession}.
 *
 * <p>For every subscription a queue named {@code <clientId>.<coreAddress>} is located or created
 * (or, for ANYCAST addresses, an existing queue bound to the address is reused) and a
 * {@link ServerConsumer} is attached to it. The QoS requested for each consumer is tracked by
 * consumer ID so other components can look it up via {@link #getConsumerQoSLevels()}.
 *
 * <p>{@link #start()} and {@link #stop()} are synchronized on this instance, while
 * {@link #addSubscriptions(List)} and {@link #removeSubscriptions(List)} synchronize on the
 * session state object.
 */
public class MQTTSubscriptionManager {
    /** Session whose subscriptions are managed by this instance. */
    private final MQTTSession session;

    /** Requested QoS level per consumer, keyed by consumer ID. */
    private final ConcurrentMap<Long, Integer> consumerQoSLevels;

    /** Consumers created by this manager, keyed by the MQTT topic filter they serve. */
    private final ConcurrentMap<String, ServerConsumer> consumers;

    /** Filter applied to created queues; excludes the management and management-notification addresses. */
    private final SimpleString managementFilter;

    /**
     * Creates a manager for the given session and pre-computes the queue filter
     * {@code NOT ((<addr> = '<management>') OR (<addr> = '<managementNotification>'))}.
     *
     * @param session the MQTT session this manager belongs to
     */
    public MQTTSubscriptionManager(MQTTSession session) {
        this.session = session;
        this.consumers = new ConcurrentHashMap<>();
        this.consumerQoSLevels = new ConcurrentHashMap<>();

        StringBuilder filter = new StringBuilder()
            .append("NOT ((")
            .append(FilterConstants.ACTIVEMQ_ADDRESS)
            .append(" = '")
            .append(session.getServer().getConfiguration().getManagementAddress())
            .append("') OR (")
            .append(FilterConstants.ACTIVEMQ_ADDRESS)
            .append(" = '")
            .append(session.getServer().getConfiguration().getManagementNotificationAddress())
            .append("'))");
        this.managementFilter = new SimpleString(filter.toString());
    }

    /**
     * Re-establishes a queue and a started consumer for every subscription currently held in the
     * session state.
     *
     * @throws Exception if a queue or consumer cannot be created
     */
    synchronized void start() throws Exception {
        for (MqttTopicSubscription subscription : this.session.getSessionState().getSubscriptions()) {
            int qos = subscription.qualityOfService().value();
            String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), this.session.getWildcardConfiguration());
            Queue queue = this.createQueueForSubscription(coreAddress, qos);
            this.createConsumerForSubscriptionQueue(queue, subscription.topicName(), qos);
        }
    }

    /**
     * Stops, disconnects and closes every consumer registered with this manager and detaches it
     * from its queue. The consumer registry itself is left untouched.
     *
     * @throws Exception if a consumer fails to shut down
     */
    synchronized void stop() throws Exception {
        for (ServerConsumer consumer : this.consumers.values()) {
            consumer.setStarted(false);
            consumer.disconnect();
            consumer.getQueue().removeConsumer(consumer);
            consumer.close(false);
        }
    }

    /**
     * Returns the queue backing a subscription on {@code address}, creating the queue (and, if
     * permitted, the address) when it does not exist yet.
     *
     * @param address core address the subscription maps to
     * @param qos     requested QoS level, used to decide queue durability
     * @return the existing or newly created queue
     * @throws Exception if auto-creation of the queue or address is not allowed, or creation fails
     */
    private Queue createQueueForSubscription(String address, int qos) throws Exception {
        SimpleString queueName = this.getQueueNameForTopic(address);
        Queue existing = this.session.getServer().locateQueue(queueName);
        if (existing != null) {
            return existing;
        }

        SimpleString sAddress = SimpleString.toSimpleString(address);
        BindingQueryResult bindingQueryResult = this.session.getServerSession().executeBindingQuery(sAddress);
        if (!bindingQueryResult.isAutoCreateQueues()) {
            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(sAddress);
        }

        AddressInfo addressInfo = this.session.getServerSession().getAddress(sAddress);
        if (addressInfo == null) {
            if (!bindingQueryResult.isAutoCreateAddresses()) {
                throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(sAddress);
            }
            addressInfo = this.session.getServerSession().createAddress(sAddress, RoutingType.MULTICAST, true);
        }
        return this.findOrCreateQueue(bindingQueryResult, addressInfo, queueName, qos);
    }

    /**
     * Resolves the queue to consume from according to the routing types of the address.
     *
     * <ul>
     *   <li>MULTICAST: a dedicated queue named {@code queue} is created on the address.</li>
     *   <li>ANYCAST: an already bound queue is reused, preferring one named like the address and
     *       otherwise taking the first one reported; if none is bound, a queue named like the
     *       address is created.</li>
     * </ul>
     *
     * @param bindingQueryResult result of the binding query for the address
     * @param addressInfo        the address the subscription targets
     * @param queue              name to use for a newly created multicast queue
     * @param qos                requested QoS level; the queue is durable when {@code qos >= 0}
     * @return the located or created queue
     * @throws Exception if the address supports neither MULTICAST nor ANYCAST, or creation fails
     */
    private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue, int qos) throws Exception {
        boolean durable = qos >= 0;

        if (addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) {
            QueueConfiguration multicastQueue = new QueueConfiguration(queue)
                .setAddress(addressInfo.getName())
                .setFilterString(this.managementFilter)
                .setDurable(durable);
            return this.session.getServerSession().createQueue(multicastQueue);
        }

        if (addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
            if (!bindingQueryResult.getQueueNames().isEmpty()) {
                // Default to the first bound queue, but prefer one whose name matches the address.
                SimpleString chosen = null;
                for (SimpleString candidate : bindingQueryResult.getQueueNames()) {
                    if (chosen == null || candidate.equals(addressInfo.getName())) {
                        chosen = candidate;
                    }
                }
                return this.session.getServer().locateQueue(chosen);
            }
            try {
                QueueConfiguration anycastQueue = new QueueConfiguration(addressInfo.getName())
                    .setRoutingType(RoutingType.ANYCAST)
                    .setFilterString(this.managementFilter)
                    .setDurable(durable);
                return this.session.getServerSession().createQueue(anycastQueue);
            } catch (ActiveMQQueueExistsException e) {
                // The queue appeared between the binding query and the create call; use it as-is.
                return this.session.getServer().locateQueue(addressInfo.getName());
            }
        }

        Set<RoutingType> supportedRoutingTypes = new HashSet<>();
        supportedRoutingTypes.add(RoutingType.MULTICAST);
        supportedRoutingTypes.add(RoutingType.ANYCAST);
        throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(addressInfo.getRoutingType(), addressInfo.getName().toString(), supportedRoutingTypes);
    }

    /**
     * Creates and starts a consumer on {@code queue} and records it, together with its QoS level,
     * under {@code topic}.
     *
     * @param queue queue to consume from
     * @param topic MQTT topic filter used as the registry key
     * @param qos   requested QoS level for the consumer
     * @throws Exception if the consumer cannot be created
     */
    private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception {
        long consumerId = this.session.getServer().getStorageManager().generateID();
        ServerConsumer consumer = this.session.getServerSession().createConsumer(consumerId, queue.getName(), null, false, false, -1);
        consumer.setStarted(true);
        this.consumers.put(topic, consumer);
        this.consumerQoSLevels.put(consumerId, qos);
    }

    /**
     * Registers a single subscription: records it in the session state, ensures its queue exists,
     * attaches a consumer (or updates the QoS of the existing one) and finally pushes retained
     * messages for the topic onto the queue.
     *
     * @param subscription the subscription requested by the client
     * @throws Exception if the queue or consumer cannot be created
     */
    private void addSubscription(MqttTopicSubscription subscription) throws Exception {
        String topicName = CompositeAddress.extractAddressName(subscription.topicName());
        // Looked up before the state is updated so we can tell a new subscription from a re-subscribe.
        MqttTopicSubscription previous = this.session.getSessionState().getSubscription(topicName);
        int qos = subscription.qualityOfService().value();
        String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topicName, this.session.getWildcardConfiguration());

        this.session.getSessionState().addSubscription(subscription, this.session.getWildcardConfiguration());
        Queue queue = this.createQueueForSubscription(coreAddress, qos);

        // A known subscription may still lack a registered consumer under this key; create one in
        // that case instead of failing with a NullPointerException.
        ServerConsumer existingConsumer = previous == null ? null : this.consumers.get(topicName);
        if (existingConsumer == null) {
            this.createConsumerForSubscriptionQueue(queue, topicName, qos);
        } else {
            this.consumerQoSLevels.put(existingConsumer.getID(), qos);
        }
        this.session.getRetainMessageManager().addRetainedMessagesToQueue(queue, topicName);
    }

    /**
     * Removes the given subscriptions while holding the session state monitor.
     *
     * @param topics MQTT topic filters to unsubscribe from
     * @throws Exception if a subscription cannot be removed
     */
    void removeSubscriptions(List<String> topics) throws Exception {
        MQTTSessionState sessionState = this.session.getSessionState();
        synchronized (sessionState) {
            for (String topic : topics) {
                this.removeSubscription(topic);
            }
        }
    }

    /**
     * Removes one subscription: drops it from the session state, closes the associated
     * consumer(s) and disposes of the subscription queue.
     *
     * <p>For ANYCAST addresses only the consumer registered by this manager is closed; otherwise
     * every {@link ServerConsumer} attached to the subscription queue is closed. A queue that is
     * configuration-managed is emptied rather than deleted.
     *
     * @param address MQTT topic filter of the subscription to remove
     * @throws Exception if closing a consumer or removing the queue fails
     */
    private void removeSubscription(String address) throws Exception {
        String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address, this.session.getWildcardConfiguration());
        SimpleString internalQueueName = this.getQueueNameForTopic(internalAddress);
        this.session.getSessionState().removeSubscription(address);

        Queue queue = this.session.getServer().locateQueue(internalQueueName);
        SimpleString sAddress = SimpleString.toSimpleString(internalAddress);
        AddressInfo addressInfo = this.session.getServerSession().getAddress(sAddress);

        // remove() hands back the previous mapping, so no separate get() is needed.
        ServerConsumer registered = this.consumers.remove(address);
        boolean anycast = addressInfo != null && addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST);
        if (anycast) {
            if (registered != null) {
                registered.close(false);
                this.consumerQoSLevels.remove(registered.getID());
            }
        } else if (queue != null) {
            Set<Consumer> queueConsumers = queue.getConsumers();
            if (queueConsumers != null) {
                for (Consumer queueConsumer : queueConsumers) {
                    if (!(queueConsumer instanceof ServerConsumer)) {
                        continue;
                    }
                    ServerConsumer serverConsumer = (ServerConsumer) queueConsumer;
                    serverConsumer.close(false);
                    this.consumerQoSLevels.remove(serverConsumer.getID());
                }
            }
        }

        if (queue != null) {
            assert this.session.getServerSession().executeQueueQuery(internalQueueName).isExists();
            if (queue.isConfigurationManaged()) {
                queue.deleteAllReferences();
            } else {
                this.session.getServerSession().deleteQueue(internalQueueName);
            }
        }
    }

    /**
     * Builds the name of the subscription queue for a topic: {@code <clientId>.<topic>}.
     *
     * @param topic core address of the subscription
     * @return the queue name
     */
    private SimpleString getQueueNameForTopic(String topic) {
        return new SimpleString(this.session.getSessionState().getClientId() + "." + topic);
    }

    /**
     * Adds the given subscriptions while holding the session state monitor.
     *
     * @param subscriptions subscriptions requested by the client
     * @return the requested QoS value of each subscription, in the same order as the input
     * @throws Exception if a subscription cannot be established
     */
    int[] addSubscriptions(List<MqttTopicSubscription> subscriptions) throws Exception {
        MQTTSessionState sessionState = this.session.getSessionState();
        synchronized (sessionState) {
            int[] grantedQos = new int[subscriptions.size()];
            for (int i = 0; i < grantedQos.length; ++i) {
                MqttTopicSubscription subscription = subscriptions.get(i);
                this.addSubscription(subscription);
                grantedQos[i] = subscription.qualityOfService().value();
            }
            return grantedQos;
        }
    }

    /**
     * Returns the live map of requested QoS levels keyed by consumer ID.
     *
     * @return the internal (mutable) QoS registry
     */
    Map<Long, Integer> getConsumerQoSLevels() {
        return this.consumerQoSLevels;
    }

    /**
     * Removes every subscription currently held in the session state.
     *
     * @throws Exception if a subscription cannot be removed
     */
    void clean() throws Exception {
        // NOTE(review): this iterates the state's subscription collection while removeSubscription()
        // mutates the state, and does not take the session state monitor - confirm the collection
        // tolerates concurrent modification.
        for (MqttTopicSubscription subscription : this.session.getSessionState().getSubscriptions()) {
            this.removeSubscription(subscription.topicName());
        }
    }
}

