/*
 * Decompiled with CFR 0.152.
 */
package org.noear.socketd.broker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import org.noear.socketd.broker.BrokerListenerBase;
import org.noear.socketd.transport.core.Entity;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Message;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.utils.RunUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listener that routes each incoming message to other sessions, chosen by the
 * message's {@code atName()} value.
 *
 * <p>Three addressing forms are recognised by {@link #onMessage}:
 * <ul>
 *   <li>{@code "*"} - forward to every name returned by {@code getNameAll()};</li>
 *   <li>{@code "name*"} - forward to every session registered under {@code name};</li>
 *   <li>{@code "name"} - forward to the single session returned by
 *       {@code getPlayerAny(name, requester, message)}.</li>
 * </ul>
 *
 * <p>Session bookkeeping ({@code addPlayer}, {@code removePlayer},
 * {@code getNameAll}, {@code getPlayerAll}, {@code getPlayerAny}) is inherited
 * from {@link BrokerListenerBase}; its exact semantics are not visible in this file.
 */
public class BrokerListener extends BrokerListenerBase implements Listener {
    protected static final Logger log = LoggerFactory.getLogger(BrokerListener.class);

    /**
     * Registers the newly opened session under its own name.
     *
     * @param session the session that was opened
     * @throws IOException declared by the listener contract
     */
    @Override
    public void onOpen(Session session) throws IOException {
        this.addPlayer(session.name(), session);
    }

    /**
     * Unregisters the session from the name it was registered under.
     *
     * @param session the session that was closed (also called by
     *                {@link #forwardToName} for sessions found to be invalid)
     */
    @Override
    public void onClose(Session session) {
        this.removePlayer(session.name(), session);
    }

    /**
     * Routes a message according to its {@code atName()}.
     *
     * <p>When no target can be resolved, the requester receives an alarm instead:
     * a missing {@code atName}, a {@code "name*"} target with no registered
     * sessions, or a {@code "name"} target for which no session is returned.
     * The {@code "*"} broadcast form never raises an alarm.
     *
     * @param requester the session that sent the message
     * @param message   the message to route
     * @throws IOException if sending the alarm or forwarding the message fails
     */
    @Override
    public void onMessage(Session requester, Message message) throws IOException {
        String atName = message.atName();
        if (atName == null) {
            requester.sendAlarm(message, "Broker message require '@' meta");
            return;
        }

        if (atName.equals("*")) {
            // Broadcast: iterate over a copy so changes to the live name
            // collection during forwarding cannot break the loop.
            Collection<String> nameAll = this.getNameAll();
            if (nameAll != null && !nameAll.isEmpty()) {
                for (String name : new ArrayList<>(nameAll)) {
                    this.forwardToName(requester, message, name);
                }
            }
        } else if (atName.endsWith("*")) {
            // Group send: strip the trailing '*' to obtain the plain name.
            String groupName = atName.substring(0, atName.length() - 1);
            if (!this.forwardToName(requester, message, groupName)) {
                requester.sendAlarm(message, "Broker don't have '@" + groupName + "' player");
            }
        } else {
            // Single send: the base class decides which session receives it.
            Session responder = this.getPlayerAny(atName, requester, message);
            if (responder != null) {
                this.forwardToSession(requester, message, responder);
            } else {
                requester.sendAlarm(message, "Broker don't have '@" + atName + "' session");
            }
        }
    }

    /**
     * Forwards a message to every valid session registered under {@code name},
     * skipping the requester itself. Sessions that report themselves invalid are
     * passed to {@link #onClose} instead of receiving the message.
     *
     * @param requester the session that sent the message
     * @param message   the message to forward
     * @param name      the name whose sessions should receive the message
     * @return {@code true} if at least one session is registered under
     *         {@code name} (even if none actually received the message, e.g.
     *         only the requester is registered); {@code false} if there is none
     * @throws IOException if forwarding to a session fails; remaining sessions
     *                     are then not attempted
     */
    public boolean forwardToName(Session requester, Message message, String name) throws IOException {
        Collection<Session> playerAll = this.getPlayerAll(name);
        if (playerAll == null || playerAll.isEmpty()) {
            return false;
        }

        // Iterate over a copy: onClose() below removes entries while we loop.
        for (Session responder : new ArrayList<>(playerAll)) {
            if (responder == requester) {
                continue;
            }
            if (responder.isValid()) {
                this.forwardToSession(requester, message, responder);
            } else {
                this.onClose(responder);
            }
        }
        return true;
    }

    /**
     * Forwards a message to one responder and relays what comes back.
     *
     * <ul>
     *   <li>Request: the reply is relayed to the requester with {@code reply}.</li>
     *   <li>Subscribe: each reply is relayed with {@code reply}, except one that
     *       reports {@code isEnd()}, which is relayed with {@code replyEnd}.</li>
     *   <li>Anything else: sent to the responder with nothing relayed back.</li>
     * </ul>
     *
     * <p>Replies and errors are dropped if the requester is no longer valid.
     *
     * @param requester the session that sent the message
     * @param message   the message to forward
     * @param responder the session that should receive the message
     * @throws IOException if the initial send to the responder fails
     */
    public void forwardToSession(Session requester, Message message, Session responder) throws IOException {
        if (message.isRequest()) {
            // NOTE(review): -1L is passed as the timeout argument; presumably it
            // selects a default/unbounded timeout - confirm against the Session API.
            responder.sendAndRequest(message.event(), message, -1L).thenReply(reply -> {
                if (requester.isValid()) {
                    requester.reply(message, (Entity)reply);
                }
            }).thenError(err -> this.alarmRequesterOnError(requester, message, err));
        } else if (message.isSubscribe()) {
            responder.sendAndSubscribe(message.event(), message).thenReply(reply -> {
                if (!requester.isValid()) {
                    return;
                }
                if (reply.isEnd()) {
                    requester.replyEnd(message, (Entity)reply);
                } else {
                    requester.reply(message, (Entity)reply);
                }
            }).thenError(err -> this.alarmRequesterOnError(requester, message, err));
        } else {
            responder.send(message.event(), message);
        }
    }

    /**
     * Logs an error raised for a session at WARN level.
     *
     * @param session the session the error relates to (not used here)
     * @param error   the error to log
     */
    @Override
    public void onError(Session session, Throwable error) {
        if (log.isWarnEnabled()) {
            log.warn("Broker error", error);
        }
    }

    /**
     * Relays a forwarding error to the requester as an alarm, if the requester
     * is still valid. Failures while sending the alarm are handled by
     * {@code RunUtils.runAndTry} rather than propagated.
     */
    private void alarmRequesterOnError(Session requester, Message message, Throwable err) {
        if (!requester.isValid()) {
            return;
        }
        // Throwable.getMessage() may be null; fall back to the throwable's string
        // form so the requester always receives a non-null alarm text.
        String detail = (err == null) ? null : err.getMessage();
        String alarm = (detail != null) ? detail : String.valueOf(err);
        RunUtils.runAndTry(() -> requester.sendAlarm(message, alarm));
    }
}

