/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.server.netty.websocket;

import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.convert.value.ConvertibleValues;
import io.micronaut.http.HttpAttributes;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.bind.RequestBinderRegistry;
import io.micronaut.http.codec.MediaTypeCodecRegistry;
import io.micronaut.http.context.ServerRequestContext;
import io.micronaut.http.netty.websocket.AbstractNettyWebSocketHandler;
import io.micronaut.http.netty.websocket.NettyRxWebSocketSession;
import io.micronaut.http.netty.websocket.WebSocketSessionRepository;
import io.micronaut.inject.MethodExecutionHandle;
import io.micronaut.web.router.UriRouteMatch;
import io.micronaut.websocket.CloseReason;
import io.micronaut.websocket.RxWebSocketSession;
import io.micronaut.websocket.context.WebSocketBean;
import io.micronaut.websocket.event.WebSocketMessageProcessedEvent;
import io.micronaut.websocket.event.WebSocketSessionClosedEvent;
import io.micronaut.websocket.event.WebSocketSessionOpenEvent;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.security.Principal;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Internal
public class NettyServerWebSocketHandler
extends AbstractNettyWebSocketHandler {
    public static final String ID = "websocket-handler";
    private final WebSocketServerHandshaker handshaker;
    private final ApplicationEventPublisher eventPublisher;

    NettyServerWebSocketHandler(WebSocketSessionRepository webSocketSessionRepository, WebSocketServerHandshaker handshaker, HttpRequest<?> request, UriRouteMatch<Object, Object> routeMatch, WebSocketBean<?> webSocketBean, RequestBinderRegistry binderRegistry, MediaTypeCodecRegistry mediaTypeCodecRegistry, ApplicationEventPublisher eventPublisher, ChannelHandlerContext ctx) {
        block2: {
            super(ctx, binderRegistry, mediaTypeCodecRegistry, webSocketBean, request, routeMatch.getVariableValues(), handshaker.version(), webSocketSessionRepository);
            request.setAttribute(HttpAttributes.ROUTE_MATCH, routeMatch);
            request.setAttribute(HttpAttributes.ROUTE, routeMatch.getRoute());
            this.eventPublisher = eventPublisher;
            this.handshaker = handshaker;
            try {
                eventPublisher.publishEvent(new WebSocketSessionOpenEvent(this.session));
            }
            catch (Exception e) {
                if (!this.LOG.isErrorEnabled()) break block2;
                this.LOG.error("Error publishing WebSocket opened event: " + e.getMessage(), e);
            }
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            this.writeCloseFrameAndTerminate(ctx, CloseReason.GOING_AWAY);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public boolean acceptInboundMessage(Object msg) {
        return msg instanceof WebSocketFrame;
    }

    @Override
    protected NettyRxWebSocketSession createWebSocketSession(final ChannelHandlerContext ctx) {
        String id = (String)this.originatingRequest.getHeaders().get(HttpHeaderNames.SEC_WEBSOCKET_KEY);
        Channel channel = ctx.channel();
        NettyRxWebSocketSession session = new NettyRxWebSocketSession(id, channel, this.originatingRequest, this.mediaTypeCodecRegistry, this.webSocketVersion.toHttpHeaderValue(), ctx.pipeline().get(SslHandler.class) != null){
            private final ConvertibleValues<Object> uriVars;
            {
                super(x0, x1, x2, x3, x4, x5);
                this.uriVars = ConvertibleValues.of(NettyServerWebSocketHandler.this.uriVariables);
            }

            @Override
            public Optional<String> getSubprotocol() {
                return Optional.ofNullable(NettyServerWebSocketHandler.this.handshaker.selectedSubprotocol());
            }

            @Override
            public Set<? extends RxWebSocketSession> getOpenSessions() {
                return NettyServerWebSocketHandler.this.webSocketSessionRepository.getChannelGroup().stream().flatMap(ch -> {
                    NettyRxWebSocketSession s = ch.attr(NettyRxWebSocketSession.WEB_SOCKET_SESSION_KEY).get();
                    if (s != null && s.isOpen()) {
                        return Stream.of(s);
                    }
                    return Stream.empty();
                }).collect(Collectors.toSet());
            }

            @Override
            public void close(CloseReason closeReason) {
                super.close(closeReason);
                NettyServerWebSocketHandler.this.webSocketSessionRepository.removeChannel(ctx.channel());
            }

            @Override
            public Optional<Principal> getUserPrincipal() {
                return NettyServerWebSocketHandler.this.originatingRequest.getAttribute("micronaut.AUTHENTICATION", Principal.class);
            }

            @Override
            public ConvertibleValues<Object> getUriVariables() {
                return this.uriVars;
            }
        };
        this.webSocketSessionRepository.addChannel(channel);
        return session;
    }

    @Override
    protected Flowable<?> instrumentPublisher(ChannelHandlerContext ctx, Object result) {
        Publisher actual = Publishers.convertPublisher(result, Publisher.class);
        Publisher traced = subscriber -> ServerRequestContext.with(this.originatingRequest, () -> actual.subscribe(new Subscriber<Object>(){

            @Override
            public void onSubscribe(Subscription s) {
                ServerRequestContext.with(NettyServerWebSocketHandler.this.originatingRequest, () -> subscriber.onSubscribe(s));
            }

            @Override
            public void onNext(Object object) {
                ServerRequestContext.with(NettyServerWebSocketHandler.this.originatingRequest, () -> subscriber.onNext(object));
            }

            @Override
            public void onError(Throwable t) {
                ServerRequestContext.with(NettyServerWebSocketHandler.this.originatingRequest, () -> subscriber.onError(t));
            }

            @Override
            public void onComplete() {
                ServerRequestContext.with(NettyServerWebSocketHandler.this.originatingRequest, subscriber::onComplete);
            }
        }));
        return Flowable.fromPublisher(traced).subscribeOn(Schedulers.from(ctx.channel().eventLoop()));
    }

    @Override
    protected Object invokeExecutable(BoundExecutable boundExecutable, MethodExecutionHandle<?, ?> messageHandler) {
        return ServerRequestContext.with(this.originatingRequest, () -> boundExecutable.invoke(messageHandler.getTarget()));
    }

    @Override
    protected void messageHandled(ChannelHandlerContext ctx, NettyRxWebSocketSession session, Object message) {
        ctx.executor().execute(() -> {
            block2: {
                try {
                    this.eventPublisher.publishEvent(new WebSocketMessageProcessedEvent<Object>(session, message));
                }
                catch (Exception e) {
                    if (!this.LOG.isErrorEnabled()) break block2;
                    this.LOG.error("Error publishing WebSocket message processed event: " + e.getMessage(), e);
                }
            }
        });
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        block3: {
            Channel channel = ctx.channel();
            channel.attr(NettyRxWebSocketSession.WEB_SOCKET_SESSION_KEY).set(null);
            if (this.LOG.isDebugEnabled()) {
                this.LOG.debug("Removing WebSocket Server session: " + this.session);
            }
            this.webSocketSessionRepository.removeChannel(channel);
            try {
                this.eventPublisher.publishEvent(new WebSocketSessionClosedEvent(this.session));
            }
            catch (Exception e) {
                if (!this.LOG.isErrorEnabled()) break block3;
                this.LOG.error("Error publishing WebSocket closed event: " + e.getMessage(), e);
            }
        }
        super.handlerRemoved(ctx);
    }
}

