/*
 * Decompiled with CFR 0.152.
 */
package org.mule.service.http.test.common.sse;

import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Stories;
import io.qameta.allure.Story;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.client.HttpClientConfiguration;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.request.HttpRequestBuilder;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.runtime.http.api.server.HttpServer;
import org.mule.runtime.http.api.server.HttpServerConfiguration;
import org.mule.runtime.http.api.server.ServerCreationException;
import org.mule.runtime.http.api.sse.client.SseListener;
import org.mule.runtime.http.api.sse.client.SseRetryConfig;
import org.mule.runtime.http.api.sse.client.SseSource;
import org.mule.runtime.http.api.sse.client.SseSourceConfig;
import org.mule.runtime.http.api.sse.server.SseClient;
import org.mule.service.http.test.common.AbstractHttpServiceTestCase;
import org.mule.service.http.test.common.util.HttpMessageHeaderMatcher;
import org.mule.service.http.test.netty.utils.sse.SSEEventsAggregator;
import org.mule.service.http.test.netty.utils.sse.ServerSentEventTypeSafeMatcher;
import org.mule.tck.junit5.DynamicPort;

@Feature(value="Server Sent Events (SSE)")
@Stories(value={@Story(value="SSE Source"), @Story(value="SSE Endpoint")})
public class SseTestCase
extends AbstractHttpServiceTestCase {
    private static final SseRetryConfig DONT_RETRY_ON_EOS = new SseRetryConfig(true, 2000L, false);
    @DynamicPort(systemProperty="serverPort")
    Integer serverPort;
    private String sseUrl;
    private HttpServer httpServer;
    private HttpClient httpClient;
    private SseSourceConfig sseConfig;

    public SseTestCase(String serviceToLoad) {
        super(serviceToLoad);
    }

    @BeforeEach
    void setUp() throws Exception {
        this.sseUrl = String.format("http://localhost:%d/sse", this.serverPort);
        this.sseConfig = SseSourceConfig.builder((String)this.sseUrl).withRetryConfig(DONT_RETRY_ON_EOS).build();
        this.httpServer = this.getHttpServer(this.serverPort);
        this.httpServer.start();
        this.httpServer.sse("/sse", ctx -> ctx.customizeResponse(res -> res.addResponseHeader("X-My-Custom-Header", "Value")), sseClient -> {
            try (SseClient sseClient2 = sseClient;){
                for (int i = 0; i < 10; ++i) {
                    sseClient.sendEvent("first", String.format("Event %d", i), "asd");
                    sseClient.sendEvent("second", String.format("Event %d", i));
                }
            }
            catch (Exception e) {
                Assert.fail((String)e.getMessage());
            }
        });
        this.httpClient = this.service.getClientFactory().create(new HttpClientConfiguration.Builder().setName("SSE Client").setStreaming(true).build());
        this.httpClient.start();
    }

    @AfterEach
    void tearDown() {
        this.httpClient.stop();
        this.httpServer.stop().dispose();
    }

    @Test
    void sseSourceNeedsTheClientStreamingEnabled() {
        HttpClient nonStreamingClient = this.service.getClientFactory().create(new HttpClientConfiguration.Builder().setName("No-Streaming Client").setStreaming(false).build());
        IllegalStateException exception = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> nonStreamingClient.sseSource(this.sseConfig));
        MatcherAssert.assertThat((Object)exception, (Matcher)ThrowableMessageMatcher.hasMessage((Matcher)Matchers.is((Object)"SSE source requires streaming enabled for client 'No-Streaming Client'")));
    }

    @Test
    void allEventsToFallbackListener() throws InterruptedException, ExecutionException {
        SSEEventsAggregator fallbackListener = new SSEEventsAggregator();
        SseSource sse = this.httpClient.sseSource(this.sseConfig);
        sse.register((SseListener)fallbackListener);
        sse.open();
        MatcherAssert.assertThat(fallbackListener.getList(), (Matcher)Matchers.contains((Matcher[])new Matcher[]{ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 0"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 0"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 1"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 1"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 2"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 2"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 3"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 3"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 4"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 4"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 5"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 5"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 6"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 6"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 7"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 7"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 8"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 8"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 9"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 9")}));
    }

    @Test
    void multiplexEventsByName() throws InterruptedException, ExecutionException {
        SSEEventsAggregator firstListener = new SSEEventsAggregator();
        SSEEventsAggregator fallbackListener = new SSEEventsAggregator();
        SseSource sse = this.httpClient.sseSource(this.sseConfig);
        sse.register("first", (SseListener)firstListener);
        sse.register((SseListener)fallbackListener);
        sse.open();
        MatcherAssert.assertThat(firstListener.getList(), (Matcher)Matchers.contains((Matcher[])new Matcher[]{ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 0"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 1"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 2"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 3"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 4"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 5"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 6"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 7"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 8"), ServerSentEventTypeSafeMatcher.aServerSentEvent("first", "Event 9")}));
        MatcherAssert.assertThat(fallbackListener.getList(), (Matcher)Matchers.contains((Matcher[])new Matcher[]{ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 0"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 1"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 2"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 3"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 4"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 5"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 6"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 7"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 8"), ServerSentEventTypeSafeMatcher.aServerSentEvent("second", "Event 9")}));
    }

    @Test
    @Issue(value="W-18599423")
    void serverSendsCustomHeader() throws ExecutionException, InterruptedException {
        HttpRequest request = ((HttpRequestBuilder)HttpRequest.builder().uri(this.sseUrl).addHeader("Accept", "text/event-stream")).build();
        CompletableFuture future = this.httpClient.sendAsync(request);
        HttpResponse response = (HttpResponse)future.get();
        MatcherAssert.assertThat((Object)response, HttpMessageHeaderMatcher.hasHeader("X-My-Custom-Header", "Value"));
    }

    private HttpServer getHttpServer(int port) throws ServerCreationException {
        return this.service.getServerFactory().create(new HttpServerConfiguration.Builder().setName("SSE Server").setHost("localhost").setPort(port).build());
    }
}

