/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.integration.vertx;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ConnectorService;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.integration.vertx.ActiveMQVertxLogger;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.ReplyException;
import org.vertx.java.core.eventbus.ReplyFailure;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;

public class OutgoingVertxEventHandler
implements Consumer,
ConnectorService {
    private final String connectorName;
    private final String queueName;
    private final int port;
    private final String host;
    private final int quorumSize;
    private final String haGroup;
    private final String vertxAddress;
    private final boolean publish;
    private final PostOffice postOffice;
    private Queue queue = null;
    private Filter filter = null;
    private EventBus eventBus;
    private PlatformManager platformManager;
    private boolean isStarted = false;

    public OutgoingVertxEventHandler(String connectorName, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledThreadPool) {
        this.connectorName = connectorName;
        this.queueName = ConfigurationHelper.getStringProperty((String)"queue", null, configuration);
        this.postOffice = postOffice;
        this.port = ConfigurationHelper.getIntProperty((String)"port", (int)0, configuration);
        this.host = ConfigurationHelper.getStringProperty((String)"host", (String)"localhost", configuration);
        this.quorumSize = ConfigurationHelper.getIntProperty((String)"quorum-size", (int)-1, configuration);
        this.haGroup = ConfigurationHelper.getStringProperty((String)"ha-group", (String)"activemq", configuration);
        this.vertxAddress = ConfigurationHelper.getStringProperty((String)"vertx-address", (String)"org.apache.activemq", configuration);
        this.publish = ConfigurationHelper.getBooleanProperty((String)"publish", (boolean)false, configuration);
    }

    public void start() throws Exception {
        if (this.isStarted) {
            return;
        }
        System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
        this.platformManager = this.quorumSize != -1 ? PlatformLocator.factory.createPlatformManager(this.port, this.host, this.quorumSize, this.haGroup) : PlatformLocator.factory.createPlatformManager(this.port, this.host);
        this.eventBus = this.platformManager.vertx().eventBus();
        if (this.connectorName == null || this.connectorName.trim().equals("")) {
            throw new Exception("invalid connector name: " + this.connectorName);
        }
        if (this.queueName == null || this.queueName.trim().equals("")) {
            throw new Exception("invalid queue name: " + this.queueName);
        }
        SimpleString name = new SimpleString(this.queueName);
        Binding b = this.postOffice.getBinding(name);
        if (b == null) {
            throw new Exception(this.connectorName + ": queue " + this.queueName + " not found");
        }
        this.queue = (Queue)b.getBindable();
        this.queue.addConsumer((Consumer)this);
        this.queue.deliverAsync();
        this.isStarted = true;
        ActiveMQVertxLogger.LOGGER.debug(this.connectorName + ": started");
    }

    public void stop() throws Exception {
        if (!this.isStarted) {
            return;
        }
        ActiveMQVertxLogger.LOGGER.debug(this.connectorName + ": receive shutdown request");
        this.queue.removeConsumer((Consumer)this);
        this.platformManager.stop();
        System.clearProperty("vertx.clusterManagerFactory");
        this.isStarted = false;
        ActiveMQVertxLogger.LOGGER.debug(this.connectorName + ": stopped");
    }

    public boolean isStarted() {
        return this.isStarted;
    }

    public String getName() {
        return this.connectorName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public HandleStatus handle(MessageReference ref) throws Exception {
        if (this.filter != null && !this.filter.match(ref.getMessage())) {
            return HandleStatus.NO_MATCH;
        }
        OutgoingVertxEventHandler outgoingVertxEventHandler = this;
        synchronized (outgoingVertxEventHandler) {
            ref.handled();
            ServerMessage message = ref.getMessage();
            Object vertxMsgBody = null;
            Integer type = message.getIntProperty("VertxMessageType");
            if (type == null) {
                ActiveMQVertxLogger.LOGGER.nonVertxMessage(message);
                type = 200;
            }
            if ((vertxMsgBody = this.extractMessageBody(message, type)) == null) {
                return HandleStatus.NO_MATCH;
            }
            if (!this.publish) {
                this.eventBus.send(this.vertxAddress, vertxMsgBody);
            } else {
                this.eventBus.publish(this.vertxAddress, vertxMsgBody);
            }
            this.queue.acknowledge(ref);
            ActiveMQVertxLogger.LOGGER.debug(this.connectorName + ": forwarded to vertx: " + message.getMessageID());
            return HandleStatus.HANDLED;
        }
    }

    private Object extractMessageBody(ServerMessage message, Integer type) throws Exception {
        Object vertxMsgBody = null;
        ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
        switch (type) {
            case 0: 
            case 11: {
                bodyBuffer.resetReaderIndex();
                vertxMsgBody = bodyBuffer.readString();
                break;
            }
            case 1: {
                int len = bodyBuffer.readInt();
                byte[] bytes = new byte[len];
                bodyBuffer.readBytes(bytes);
                vertxMsgBody = new Buffer(bytes);
                break;
            }
            case 2: {
                vertxMsgBody = bodyBuffer.readBoolean();
                break;
            }
            case 3: {
                int length = bodyBuffer.readInt();
                byte[] byteArray = new byte[length];
                bodyBuffer.readBytes(byteArray);
                vertxMsgBody = byteArray;
                break;
            }
            case 4: {
                vertxMsgBody = bodyBuffer.readByte();
                break;
            }
            case 5: {
                vertxMsgBody = Character.valueOf(bodyBuffer.readChar());
                break;
            }
            case 6: {
                vertxMsgBody = bodyBuffer.readDouble();
                break;
            }
            case 7: {
                vertxMsgBody = Float.valueOf(bodyBuffer.readFloat());
                break;
            }
            case 8: {
                vertxMsgBody = bodyBuffer.readInt();
                break;
            }
            case 9: {
                vertxMsgBody = bodyBuffer.readLong();
                break;
            }
            case 10: {
                vertxMsgBody = bodyBuffer.readShort();
                break;
            }
            case 12: {
                vertxMsgBody = new JsonObject(bodyBuffer.readString());
                break;
            }
            case 13: {
                vertxMsgBody = new JsonArray(bodyBuffer.readString());
                break;
            }
            case 100: {
                int failureType = bodyBuffer.readInt();
                int failureCode = bodyBuffer.readInt();
                String errMsg = bodyBuffer.readString();
                vertxMsgBody = new ReplyException(ReplyFailure.fromInt((int)failureType), failureCode, errMsg);
                break;
            }
            case 200: {
                int size = bodyBuffer.readableBytes();
                byte[] rawBytes = new byte[size];
                bodyBuffer.readBytes(rawBytes);
                vertxMsgBody = rawBytes;
                break;
            }
            default: {
                ActiveMQVertxLogger.LOGGER.invalidVertxType(type);
            }
        }
        return vertxMsgBody;
    }

    public void proceedDeliver(MessageReference reference) throws Exception {
    }

    public Filter getFilter() {
        return this.filter;
    }

    public String debug() {
        return null;
    }

    public String toManagementString() {
        return null;
    }

    public List<MessageReference> getDeliveringMessages() {
        return null;
    }

    public void disconnect() {
    }
}

