/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTLogger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;

public class MQTTPublishManager {
    private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
    private SimpleString managementAddress;
    private ServerConsumer managementConsumer;
    private MQTTSession session;
    private MQTTLogger log = MQTTLogger.LOGGER;
    private final Object lock = new Object();
    private MQTTSessionState state;
    private MQTTSessionState.OutboundStore outboundStore;

    public MQTTPublishManager(MQTTSession session) {
        this.session = session;
    }

    synchronized void start() throws Exception {
        this.state = this.session.getSessionState();
        this.outboundStore = this.state.getOutboundStore();
        this.createManagementAddress();
        this.createManagementQueue();
        this.createManagementConsumer();
    }

    synchronized void stop() throws Exception {
        if (this.managementConsumer != null) {
            this.managementConsumer.removeItself();
            this.managementConsumer.setStarted(false);
            this.managementConsumer.close(false);
        }
    }

    void clean() throws Exception {
        this.createManagementAddress();
        Queue queue = this.session.getServer().locateQueue(this.managementAddress);
        if (queue != null) {
            queue.deleteQueue();
        }
    }

    private void createManagementConsumer() throws Exception {
        long consumerId = this.session.getServer().getStorageManager().generateID();
        this.managementConsumer = this.session.getServerSession().createConsumer(consumerId, this.managementAddress, null, false, false, Integer.valueOf(-1));
        this.managementConsumer.setStarted(true);
    }

    private void createManagementAddress() {
        this.managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + this.session.getSessionState().getClientId());
    }

    private void createManagementQueue() throws Exception {
        Queue q = this.session.getServer().locateQueue(this.managementAddress);
        if (q == null) {
            this.session.getServerSession().createQueue(this.managementAddress, this.managementAddress, null, false, true);
        }
    }

    boolean isManagementConsumer(ServerConsumer consumer) {
        return consumer == this.managementConsumer;
    }

    protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
        if (this.isManagementConsumer(consumer)) {
            this.sendPubRelMessage(message);
        } else {
            int qos = this.decideQoS(message, consumer);
            if (qos == 0) {
                this.sendServerMessage((int)message.getMessageID(), (ServerMessageImpl)message, deliveryCount, qos);
                this.session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
            } else {
                int mqttid = this.outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
                this.outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
                this.sendServerMessage(mqttid, (ServerMessageImpl)message, deliveryCount, qos);
            }
        }
    }

    void handleMessage(int messageId, String topic, int qos, ByteBuf payload, boolean retain) throws Exception {
        this.sendInternal(messageId, topic, qos, payload, retain, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void sendInternal(int messageId, String topic, int qos, ByteBuf payload, boolean retain, boolean internal) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(this.session, topic, retain, qos, payload);
            if (qos > 0) {
                serverMessage.setDurable(true);
            }
            if (qos < 2 || !this.state.getPubRec().contains(messageId)) {
                if (qos == 2 && !internal) {
                    this.state.getPubRec().add(messageId);
                }
                Transaction tx = this.session.getServerSession().newTransaction();
                try {
                    if (internal) {
                        this.session.getServer().getPostOffice().route(serverMessage, null, tx, true);
                    } else {
                        this.session.getServerSession().send(tx, serverMessage, true, false);
                    }
                    if (retain) {
                        boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
                        this.session.getRetainMessageManager().handleRetainedMessage(serverMessage, topic, reset, tx);
                    }
                    tx.commit();
                }
                catch (Throwable t) {
                    tx.rollback();
                    throw t;
                }
                this.createMessageAck(messageId, qos, internal);
            }
        }
    }

    void sendPubRelMessage(ServerMessage message) {
        int messageId = message.getIntProperty("mqtt.message.id");
        this.session.getProtocolHandler().sendPubRel(messageId);
    }

    void handlePubRec(int messageId) throws Exception {
        try {
            Pair<Long, Long> ref = this.outboundStore.publishReceived(messageId);
            if (ref != null) {
                ServerMessage m = MQTTUtil.createPubRelMessage(this.session, this.managementAddress, messageId);
                this.session.getServerSession().send(m, true);
                this.session.getServerSession().acknowledge(((Long)ref.getB()).longValue(), ((Long)ref.getA()).longValue());
            } else {
                this.session.getProtocolHandler().sendPubRel(messageId);
            }
        }
        catch (ActiveMQIllegalStateException e) {
            this.log.warn("MQTT Client(" + this.session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
        }
    }

    void handlePubComp(int messageId) throws Exception {
        Pair<Long, Long> ref = this.session.getState().getOutboundStore().publishComplete(messageId);
        if (ref != null) {
            this.session.getServerSession().acknowledge(((Long)ref.getB()).longValue(), ((Long)ref.getA()).longValue());
        }
    }

    private void createMessageAck(final int messageId, final int qos, final boolean internal) {
        this.session.getServer().getStorageManager().afterCompleteOperations(new IOCallback(){

            public void done() {
                if (!internal) {
                    if (qos == 1) {
                        MQTTPublishManager.this.session.getProtocolHandler().sendPubAck(messageId);
                    } else if (qos == 2) {
                        MQTTPublishManager.this.session.getProtocolHandler().sendPubRec(messageId);
                    }
                }
            }

            public void onError(int errorCode, String errorMessage) {
                MQTTPublishManager.this.log.error("Pub Sync Failed");
            }
        });
    }

    void handlePubRel(int messageId) {
        this.state.getPubRec().remove(messageId);
        this.session.getProtocolHandler().sendPubComp(messageId);
        this.state.removeMessageRef(messageId);
    }

    void handlePubAck(int messageId) throws Exception {
        try {
            Pair<Long, Long> ref = this.outboundStore.publishAckd(messageId);
            if (ref != null) {
                this.session.getServerSession().acknowledge(((Long)ref.getB()).longValue(), ((Long)ref.getA()).longValue());
            }
        }
        catch (ActiveMQIllegalStateException e) {
            this.log.warn("MQTT Client(" + this.session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
        }
    }

    private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) {
        ByteBuf payload;
        String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString());
        switch (message.getType()) {
            case 3: {
                try {
                    SimpleString text = message.getBodyBuffer().readNullableSimpleString();
                    byte[] stringPayload = text.toString().getBytes("UTF-8");
                    payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length);
                    payload.writeBytes(stringPayload);
                    break;
                }
                catch (UnsupportedEncodingException e) {
                    this.log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
                }
            }
            default: {
                ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate();
                payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf();
            }
        }
        this.session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
    }

    private int decideQoS(ServerMessage message, ServerConsumer consumer) {
        int subscriptionQoS = this.session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
        int qos = 2;
        if (message.containsProperty("mqtt.qos.level")) {
            qos = message.getIntProperty("mqtt.qos.level");
        }
        return subscriptionQoS < qos ? subscriptionQoS : qos;
    }
}

