/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.netty.impl.server.sse;

import io.qameta.allure.Feature;
import io.qameta.allure.Stories;
import io.qameta.allure.Story;
import java.net.InetSocketAddress;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.server.HttpServer;
import org.mule.runtime.http.api.sse.client.SseListener;
import org.mule.runtime.http.api.sse.client.SseRetryConfig;
import org.mule.runtime.http.api.sse.client.SseSource;
import org.mule.runtime.http.api.sse.client.SseSourceConfig;
import org.mule.runtime.http.api.sse.server.SseClient;
import org.mule.service.http.netty.impl.client.NettyHttpClient;
import org.mule.service.http.netty.impl.server.AcceptedConnectionChannelInitializer;
import org.mule.service.http.netty.impl.server.NettyHttpServer;
import org.mule.service.http.netty.impl.server.util.HttpListenerRegistry;
import org.mule.service.http.netty.utils.sse.SSEEventsAggregator;
import org.mule.service.http.netty.utils.sse.ServerSentEventTypeSafeMatcher;
import org.mule.tck.junit4.AbstractMuleTestCase;
import org.mule.tck.junit4.rule.DynamicPort;

@Feature(value="Server Sent Events (SSE)")
@Stories(value={@Story(value="SSE Source"), @Story(value="SSE Endpoint")})
public class SseTestCase
extends AbstractMuleTestCase {
    private static final SseRetryConfig DONT_RETRY_ON_EOS = new SseRetryConfig(true, 2000L, false);
    @Rule
    public DynamicPort serverPort = new DynamicPort("serverPort");
    private HttpServer httpServer;
    private HttpClient httpClient;
    private SseSourceConfig sseConfig;

    @Before
    public void setUp() throws Exception {
        String sseUrl = String.format("http://localhost:%d/sse", this.serverPort.getNumber());
        this.sseConfig = SseSourceConfig.builder((String)sseUrl).withRetryConfig(DONT_RETRY_ON_EOS).build();
        this.httpServer = SseTestCase.getHttpServer(this.serverPort.getNumber());
        this.httpServer.start();
        this.httpServer.sse("/sse", sseClient -> {
            try (SseClient sseClient2 = sseClient;){
                for (int i = 0; i < 10; ++i) {
                    sseClient.sendEvent("first", String.format("Event %d", i), "asd");
                    sseClient.sendEvent("second", String.format("Event %d", i));
                }
            }
            catch (Exception e) {
                Assert.fail((String)e.getMessage());
            }
        });
        this.httpClient = NettyHttpClient.builder().withResponseStreamingEnabled(true).build();
        this.httpClient.start();
    }

    @After
    public void tearDown() {
        this.httpClient.stop();
        this.httpServer.stop().dispose();
    }

    @Test
    public void sseSourceNeedsTheClientStreamingEnabled() {
        NettyHttpClient nonStreamingClient = NettyHttpClient.builder().withName("No-Streaming Client").withResponseStreamingEnabled(false).build();
        IllegalStateException exception = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> this.lambda$sseSourceNeedsTheClientStreamingEnabled$1((HttpClient)nonStreamingClient));
        MatcherAssert.assertThat((Object)exception, (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)Matchers.is((Object)"SSE source requires streaming enabled for client 'No-Streaming Client'")));
    }

    @Test
    public void allEventsToFallbackListener() throws InterruptedException {
        SSEEventsAggregator fallbackListener = new SSEEventsAggregator();
        SseSource sse = this.httpClient.sseSource(this.sseConfig);
        sse.register((SseListener)fallbackListener);
        sse.open();
        MatcherAssert.assertThat(fallbackListener.getList(), (Matcher)Matchers.contains((Matcher[])new Matcher[]{ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 0"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 0"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 1"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 1"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 2"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 2"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 3"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 3"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 4"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 4"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 5"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 5"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 6"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 6"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 7"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 7"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 8"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 8"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 9"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 9")}));
    }

    @Test
    public void multiplexEventsByName() throws InterruptedException {
        SSEEventsAggregator firstListener = new SSEEventsAggregator();
        SSEEventsAggregator fallbackListener = new SSEEventsAggregator();
        SseSource sse = this.httpClient.sseSource(this.sseConfig);
        sse.register("first", (SseListener)firstListener);
        sse.register((SseListener)fallbackListener);
        sse.open();
        MatcherAssert.assertThat(firstListener.getList(), (Matcher)Matchers.contains((Matcher[])new Matcher[]{ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 0"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 1"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 2"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 3"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 4"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 5"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 6"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 7"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 8"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 9")}));
        MatcherAssert.assertThat(fallbackListener.getList(), (Matcher)Matchers.contains((Matcher[])new Matcher[]{ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 0"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 1"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 2"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 3"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 4"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 5"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 6"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 7"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 8"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 9")}));
    }

    private static HttpServer getHttpServer(int port) {
        HttpListenerRegistry listenerRegistry = new HttpListenerRegistry();
        return NettyHttpServer.builder().withServerAddress(new InetSocketAddress(port)).withHttpListenerRegistry(listenerRegistry).withShutdownTimeout(() -> 5000L).withClientChannelHandler(new AcceptedConnectionChannelInitializer(listenerRegistry, true, 30000, 10000L, null)).build();
    }

    private /* synthetic */ void lambda$sseSourceNeedsTheClientStreamingEnabled$1(HttpClient nonStreamingClient) throws Throwable {
        nonStreamingClient.sseSource(this.sseConfig);
    }
}

