/*
 * Decompiled with CFR 0.152.
 */
package org.mule.http.client;

import io.qameta.allure.Issue;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.http.api.HttpConstants;
import org.mule.runtime.http.api.HttpService;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.client.HttpClientConfiguration;
import org.mule.runtime.http.api.domain.message.request.HttpRequest;
import org.mule.runtime.http.api.domain.message.response.HttpResponse;
import org.mule.tck.junit4.rule.DynamicPort;
import org.mule.test.AbstractIntegrationTestCase;

/**
 * Sends one asynchronous GET through the {@link HttpClient} and chains a continuation on the
 * response future whose executor hands the work to a CPU-light {@link Scheduler} only after
 * that scheduler has been saturated (i.e. has started rejecting tasks).
 *
 * <p>The test passes when the continuation still delivers the HTTP 200 response. As the test
 * name suggests, the continuation is presumably expected to run directly on the submitting
 * thread in that situation; that fallback lives outside this file.
 */
@Issue(value="W-19806707")
public class HttpClientSchedulerBusyOnFutureCompletionTestCase
extends AbstractIntegrationTestCase {
    /** Upper bound, in milliseconds, for waiting on the helper server thread during tear-down. */
    private static final int SMALL_TIMEOUT_MS = 5000;
    @Rule
    public DynamicPort httpPort = new DynamicPort("httpPort");
    private HttpClient client;
    private TestServerThread server;

    /** Returns the name of the configuration file used by this test. */
    protected String getConfigFile() {
        return "mule-config.xml";
    }

    /**
     * Creates and starts the HTTP client under test, then starts the one-shot server thread
     * listening on the dynamic port.
     */
    protected void doSetUp() throws Exception {
        this.client = ((HttpService)this.getService(HttpService.class)).getClientFactory().create(new HttpClientConfiguration.Builder().setName("Test Client").build());
        this.client.start();
        this.server = new TestServerThread(this.httpPort.getNumber());
        this.server.start();
    }

    /**
     * Stops the client and waits (bounded by {@link #SMALL_TIMEOUT_MS}) for the server thread to finish.
     *
     * @throws InterruptedException if interrupted while joining the server thread
     */
    @After
    public void stopClient() throws InterruptedException {
        try {
            if (this.client != null) {
                this.client.stop();
            }
        }
        finally {
            // Join the server even when stopping the client fails, so the thread is not left behind.
            if (this.server != null) {
                this.server.join(SMALL_TIMEOUT_MS);
            }
        }
    }

    /**
     * Registers a continuation whose executor blocks until the target scheduler is saturated,
     * then asserts that the continuation nevertheless yields the 200 response.
     *
     * @throws ExecutionException if the request or the continuation completed exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void whenSchedulerBusyOnFutureCompletionThenDirectRuns() throws ExecutionException, InterruptedException {
        Latch releaseSchedulerLatch = new Latch();
        Latch schedulerBusyLatch = new Latch();
        CompletableFuture<HttpResponse> resultFuture = new CompletableFuture<>();
        Scheduler otherScheduler = ((SchedulerService)this.getService(SchedulerService.class)).cpuLightScheduler();
        Scheduler saturatingScheduler = null;
        try {
            CompletableFuture<HttpResponse> response = this.client.sendAsync(HttpRequest.builder().method(HttpConstants.Method.GET).uri("http://localhost:%d/test".formatted(this.httpPort.getNumber())).build());
            response.whenCompleteAsync((httpResponse, failure) -> {
                if (failure != null) {
                    resultFuture.completeExceptionally(failure);
                } else {
                    resultFuture.complete(httpResponse);
                }
            }, command -> {
                // Hold the completing thread until the scheduler has been driven to rejection.
                try {
                    schedulerBusyLatch.await();
                }
                catch (InterruptedException e) {
                    // Restore the flag and fail the result so the test thread does not wait forever.
                    Thread.currentThread().interrupt();
                    resultFuture.completeExceptionally(e);
                    throw new RuntimeException(e);
                }
                try {
                    otherScheduler.submit(command);
                }
                catch (Throwable t) {
                    resultFuture.completeExceptionally(t);
                    throw t;
                }
            });
            response.get();
            saturatingScheduler = this.ensureSchedulerBusy(otherScheduler, releaseSchedulerLatch, schedulerBusyLatch);
            MatcherAssert.assertThat(resultFuture.get().getStatusCode(), Matchers.is(200));
        }
        finally {
            // Always unblock every waiter and shut the schedulers down, even when the test fails;
            // otherwise the parked tasks and the blocked executor thread would outlive the test.
            schedulerBusyLatch.release();
            releaseSchedulerLatch.release();
            otherScheduler.shutdown();
            if (saturatingScheduler != null) {
                saturatingScheduler.shutdown();
            }
        }
    }

    /**
     * Floods {@code scheduler} from a dedicated single-task custom scheduler with tasks that park
     * on {@code releaseSchedulerLatch} until a submission is rejected, and then releases
     * {@code schedulerBusyLatch}.
     *
     * @param scheduler the scheduler to saturate
     * @param releaseSchedulerLatch latch the parked tasks wait on; releasing it frees the scheduler
     * @param schedulerBusyLatch latch released once the scheduler rejects a submission
     * @return the custom scheduler running the flooding task, so the caller can shut it down
     */
    private Scheduler ensureSchedulerBusy(Scheduler scheduler, Latch releaseSchedulerLatch, Latch schedulerBusyLatch) {
        Scheduler saturatingScheduler = ((SchedulerService)this.getService(SchedulerService.class)).customScheduler(SchedulerConfig.config().withMaxConcurrentTasks(1));
        saturatingScheduler.submit(() -> {
            try {
                while (true) {
                    scheduler.submit(() -> {
                        releaseSchedulerLatch.await();
                        return 0;
                    });
                }
            }
            catch (RejectedExecutionException expected) {
                // Rejection is the signal that the scheduler cannot take more work.
                schedulerBusyLatch.release();
            }
        });
        return saturatingScheduler;
    }

    /**
     * Minimal one-shot server: accepts a single connection and immediately answers with an empty
     * {@code 200 OK} response, without reading the request first.
     */
    private static class TestServerThread
    extends Thread {
        private final int port;

        private TestServerThread(int port) {
            this.port = port;
        }

        /**
         * Opens a listening socket on {@code port}, accepts one connection and closes the
         * listening socket again.
         *
         * @param port the port to listen on
         * @return the accepted peer socket; the caller is responsible for closing it
         * @throws IOException if binding or accepting fails
         */
        private Socket acceptOneConnection(int port) throws IOException {
            try (ServerSocket passiveSocket = new ServerSocket(port)) {
                return passiveSocket.accept();
            }
        }

        @Override
        public void run() {
            try (Socket peerSocket = this.acceptOneConnection(this.port);
                 OutputStream outputStream = peerSocket.getOutputStream()) {
                // The response is pure ASCII; encode it explicitly instead of relying on the platform charset.
                outputStream.write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
                outputStream.flush();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

