/*
 * Decompiled with CFR 0.152.
 */
package org.noear.socketd.transport.core.impl;

import java.io.IOException;
import java.io.NotActiveException;
import org.noear.socketd.exception.SocketDAlarmException;
import org.noear.socketd.exception.SocketDConnectionException;
import org.noear.socketd.exception.SocketDException;
import org.noear.socketd.transport.core.ChannelAssistant;
import org.noear.socketd.transport.core.ChannelInternal;
import org.noear.socketd.transport.core.Frame;
import org.noear.socketd.transport.core.FrameIoHandler;
import org.noear.socketd.transport.core.Listener;
import org.noear.socketd.transport.core.Processor;
import org.noear.socketd.transport.core.entity.PressureEntity;
import org.noear.socketd.transport.core.impl.HandshakeDefault;
import org.noear.socketd.transport.core.impl.IoCompletionHandlerImpl;
import org.noear.socketd.transport.core.listener.SimpleListener;
import org.noear.socketd.transport.stream.StreamInternal;
import org.noear.socketd.utils.IoCompletionHandler;
import org.noear.socketd.utils.MemoryUtils;
import org.noear.socketd.utils.RunnableEx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessorDefault
implements Processor,
FrameIoHandler {
    private static Logger log = LoggerFactory.getLogger(ProcessorDefault.class);
    private Listener listener = new SimpleListener();

    @Override
    public void setListener(Listener listener) {
        if (listener != null) {
            this.listener = listener;
        }
    }

    @Override
    public <S> void sendFrame(ChannelInternal channel, Frame frame, ChannelAssistant<S> channelAssistant, S target) throws IOException {
        if (frame == null) {
            return;
        }
        if (!channel.isValid()) {
            throw new NotActiveException("Channel is invalid");
        }
        IoCompletionHandlerImpl completionHandler = new IoCompletionHandlerImpl();
        if (channel.getConfig().getTrafficLimiter() == null) {
            this.sendFrameHandle(channel, frame, channelAssistant, target, completionHandler);
        } else {
            channel.getConfig().getTrafficLimiter().sendFrame(this, channel, frame, channelAssistant, target, completionHandler);
        }
        if (completionHandler.getThrowable() != null) {
            if (completionHandler.getThrowable() instanceof IOException) {
                throw (IOException)completionHandler.getThrowable();
            }
            throw new SocketDException("Channel send failure", completionHandler.getThrowable());
        }
    }

    @Override
    public <S> void sendFrameHandle(ChannelInternal channel, Frame frame, ChannelAssistant<S> channelAssistant, S target, IoCompletionHandler completionHandler) {
        try {
            channelAssistant.write(target, frame, channel, completionHandler);
            if (frame.flag() >= 40) {
                this.listener.onSend(channel.getSession(), frame.message());
            }
        }
        catch (Throwable e) {
            completionHandler.completed(false, e);
        }
    }

    @Override
    public void reveFrame(ChannelInternal channel, Frame frame) {
        if (channel.getConfig().getTrafficLimiter() == null) {
            this.reveFrameHandle(channel, frame);
        } else {
            channel.getConfig().getTrafficLimiter().reveFrame(this, channel, frame);
        }
    }

    @Override
    public void reveFrameHandle(ChannelInternal channel, Frame frame) {
        if (log.isDebugEnabled()) {
            if (channel.getConfig().clientMode()) {
                log.debug("C-REV:{}", (Object)frame);
            } else {
                log.debug("S-REV:{}", (Object)frame);
            }
        }
        if (frame.flag() == 10) {
            HandshakeDefault handshake = new HandshakeDefault(frame.message());
            channel.setHandshake(handshake);
            channel.onOpenFuture((r, e) -> {
                if (r.booleanValue()) {
                    if (channel.isValid()) {
                        try {
                            channel.sendConnack();
                        }
                        catch (Throwable err) {
                            this.onError(channel, err);
                        }
                    }
                } else if (channel.isValid()) {
                    this.onCloseInternal(channel, 2001);
                }
            });
            this.onOpen(channel);
        } else if (frame.flag() == 11) {
            HandshakeDefault handshake = new HandshakeDefault(frame.message());
            channel.setHandshake(handshake);
            this.onOpen(channel);
        } else {
            if (channel.getHandshake() == null) {
                channel.close(1002);
                if (frame.flag() == 30) {
                    throw new SocketDConnectionException("Connection request was rejected");
                }
                if (log.isWarnEnabled()) {
                    log.warn("{} channel handshake is null, sessionId={}", (Object)channel.getConfig().getRoleName(), (Object)channel.getSession().sessionId());
                }
                return;
            }
            channel.setLiveTimeAsNow();
            try {
                switch (frame.flag()) {
                    case 20: {
                        this.callAsync(channel, () -> channel.sendPong());
                        break;
                    }
                    case 21: {
                        break;
                    }
                    case 30: {
                        int code = 0;
                        if (frame.message() != null) {
                            code = frame.message().metaAsInt("code");
                        }
                        if (code == 0) {
                            code = 1001;
                        }
                        this.onCloseInternal(channel, code);
                        break;
                    }
                    case 31: {
                        SocketDAlarmException exception = new SocketDAlarmException(frame.message());
                        channel.setAlarmCode(exception.getAlarmCode());
                        StreamInternal stream = channel.getStream(frame.message().sid());
                        if (stream == null) {
                            this.callAsync(channel, () -> this.onError(channel, exception));
                            break;
                        }
                        channel.getConfig().getStreamManger().removeStream(frame.message().sid());
                        this.callAsync(channel, () -> stream.onError(exception));
                        break;
                    }
                    case 32: {
                        int code = frame.message().metaAsInt("code");
                        channel.setAlarmCode(code);
                        break;
                    }
                    case 40: 
                    case 41: 
                    case 42: {
                        if (this.chkMemoryLimit(channel, frame)) {
                            this.onReceiveDo(channel, frame, false);
                        }
                        break;
                    }
                    case 48: 
                    case 49: {
                        this.onReceiveDo(channel, frame, true);
                        break;
                    }
                    default: {
                        this.onCloseInternal(channel, 1002);
                        break;
                    }
                }
            }
            catch (Throwable e2) {
                this.onError(channel, e2);
            }
        }
    }

    private boolean chkMemoryLimit(ChannelInternal channel, Frame frame) {
        float useMemoryRatio;
        if (channel.getConfig().useMaxMemoryLimit() && (useMemoryRatio = MemoryUtils.getUseMemoryRatio()) > channel.getConfig().getMaxMemoryRatio()) {
            if (frame.message().meta("X-Unlimited") == null) {
                try {
                    String alarm = String.format(" memory usage over limit: %.2f%%", Float.valueOf(useMemoryRatio * 100.0f));
                    if (log.isDebugEnabled()) {
                        log.debug("Local " + alarm + ", frame: " + frame);
                    }
                    PressureEntity pressure = new PressureEntity(channel.getConfig().getRoleName() + alarm);
                    channel.sendAlarm(frame.message(), pressure);
                }
                catch (Throwable e) {
                    this.onError(channel, e);
                }
                return false;
            }
            return true;
        }
        return true;
    }

    private void onReceiveDo(ChannelInternal channel, Frame frame, boolean isReply) throws IOException {
        String fragmentIdxStr;
        StreamInternal stream = null;
        int streamIndex = 0;
        int streamTotal = 1;
        if (isReply) {
            stream = channel.getStream(frame.message().sid());
        }
        if (channel.getConfig().getFragmentHandler().aggrEnable() && (fragmentIdxStr = frame.message().meta("Data-Fragment-Idx")) != null) {
            streamIndex = Integer.parseInt(fragmentIdxStr);
            Frame frameNew = channel.getConfig().getFragmentHandler().aggrFragment(channel, streamIndex, frame.message());
            if (stream != null) {
                streamTotal = Integer.parseInt(frame.message().metaOrDefault("Data-Fragment-Total", "0"));
            }
            if (frameNew == null) {
                if (stream != null) {
                    stream.onProgress(false, streamIndex, streamTotal);
                }
                return;
            }
            frame = frameNew;
        }
        if (isReply) {
            if (stream != null) {
                stream.onProgress(false, streamIndex, streamTotal);
            }
            this.onReply(channel, frame, stream);
        } else {
            this.onMessage(channel, frame);
        }
    }

    @Override
    public void onOpen(ChannelInternal channel) {
        this.callAsync(channel, () -> {
            try {
                this.listener.onOpen(channel.getSession());
                channel.doOpenFuture(true, null);
            }
            catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("{} channel listener onOpen error", (Object)channel.getConfig().getRoleName(), (Object)e);
                }
                channel.doOpenFuture(false, e);
            }
        });
    }

    @Override
    public void onMessage(ChannelInternal channel, Frame frame) {
        this.callAsync(channel, () -> {
            try {
                this.listener.onMessage(channel.getSession(), frame.message());
            }
            catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("{} channel listener onMessage error", (Object)channel.getConfig().getRoleName(), (Object)e);
                }
                this.onError(channel, e);
            }
        });
    }

    @Override
    public void onReply(ChannelInternal channel, Frame frame, StreamInternal stream) {
        if (stream != null) {
            if (stream.demands() < 2 || frame.flag() == 49) {
                channel.getConfig().getStreamManger().removeStream(frame.message().sid());
            }
            if (stream.demands() < 2) {
                stream.onReply(frame.message());
            }
            this.callAsync(channel, () -> {
                if (stream.demands() == 2) {
                    stream.onReply(frame.message());
                }
                this.listener.onReply(channel.getSession(), frame.message());
            });
        } else {
            this.callAsync(channel, () -> this.listener.onReply(channel.getSession(), frame.message()));
            if (log.isDebugEnabled()) {
                log.debug("{} stream not found, sid={}, sessionId={}", new Object[]{channel.getConfig().getRoleName(), frame.message().sid(), channel.getSession().sessionId()});
            }
        }
    }

    @Override
    public void onClose(ChannelInternal channel) {
        if (channel.closeCode() <= 1000) {
            this.onCloseInternal(channel, 2003);
        }
    }

    private void onCloseInternal(ChannelInternal channel, int code) {
        channel.close(code);
    }

    @Override
    public void onError(ChannelInternal channel, Throwable error) {
        block5: {
            if (channel == null || channel.getHandshake() == null) {
                if (log.isDebugEnabled()) {
                    log.debug("{} channel error", (Object)channel.getConfig().getRoleName(), (Object)error);
                }
            } else {
                try {
                    this.listener.onError(channel.getSession(), error);
                }
                catch (Throwable e) {
                    if (!log.isWarnEnabled()) break block5;
                    log.warn("{} channel listener onError error", (Object)channel.getConfig().getRoleName(), (Object)e);
                }
            }
        }
    }

    @Override
    public void doCloseNotice(ChannelInternal channel) {
        try {
            if (channel.getHandshake() != null) {
                this.listener.onClose(channel.getSession());
            }
        }
        catch (Throwable error) {
            this.onError(channel, error);
        }
    }

    private void callAsync(ChannelInternal channel, RunnableEx<Throwable> runnable) {
        try {
            channel.getConfig().getWorkExecutor().submit(() -> {
                try {
                    runnable.run();
                }
                catch (Throwable e) {
                    this.onError(channel, e);
                }
            });
        }
        catch (Throwable e) {
            this.onError(channel, e);
        }
    }
}

