/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.taskexecutor.IdleTestTask;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoStats;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.Fail;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for the {@link ThreadInfoRequestCoordinator}.
 *
 * <p>Each test builds a map of execution attempt id sets to mocked {@link
 * TaskExecutorThreadInfoGateway} futures whose response behaves according to a {@link
 * CompletionType}, triggers a thread info request on the coordinator and checks how the returned
 * future completes.
 */
public class ThreadInfoRequestCoordinatorTest extends TestLogger {
    /** Timeout handed to the coordinator; also the delay after which a mocked TIMEOUT response fails. */
    private static final Duration REQUEST_TIMEOUT = Duration.ofMillis(100L);

    /** Message carried by the {@link TimeoutException} of a mocked TIMEOUT response. */
    private static final String REQUEST_TIMEOUT_MESSAGE = "Request timeout.";

    private static final int DEFAULT_NUMBER_OF_SAMPLES = 1;
    private static final int DEFAULT_MAX_STACK_TRACE_DEPTH = 100;
    private static final Duration DEFAULT_DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50L);

    /** Executor shared by all tests; created in {@link #setUp()} and shut down in {@link #tearDown()}. */
    private static ScheduledExecutorService executorService;

    /** Coordinator under test; recreated before every test. */
    private ThreadInfoRequestCoordinator coordinator;

    /** Creates the single-threaded scheduled executor shared by all tests. */
    @BeforeAll
    public static void setUp() throws Exception {
        executorService = new ScheduledThreadPoolExecutor(1);
    }

    /** Shuts down the shared executor if it was created. */
    @AfterAll
    public static void tearDown() throws Exception {
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    /** Creates a fresh coordinator backed by the shared executor. */
    @BeforeEach
    public void initCoordinator() throws Exception {
        coordinator = new ThreadInfoRequestCoordinator(executorService, REQUEST_TIMEOUT);
    }

    /** Verifies that no request is left pending after a test and shuts the coordinator down. */
    @AfterEach
    public void shutdownCoordinator() throws Exception {
        if (coordinator != null) {
            // Every test must leave the coordinator without pending requests.
            AssertionsForClassTypes.assertThat(coordinator.getNumberOfPendingRequests())
                    .isEqualTo(0);
            coordinator.shutDown();
        }
    }

    /** Tests that a request succeeds when all gateways respond successfully. */
    @Test
    public void testSuccessfulThreadInfoRequest() throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.SUCCESSFULLY);

        CompletableFuture<VertexThreadInfoStats> requestFuture =
                triggerDefaultRequest(executionWithGateways);
        VertexThreadInfoStats threadInfoStats = requestFuture.get();

        // The coordinator is new for each test, so this is expected to be its first request.
        AssertionsForClassTypes.assertThat(threadInfoStats.getRequestId()).isEqualTo(0);

        Map<ExecutionAttemptID, Collection<ThreadInfoSample>> samplesBySubtask =
                threadInfoStats.getSamplesBySubtask();
        for (Collection<ThreadInfoSample> samples : samplesBySubtask.values()) {
            StackTraceElement[] stackTrace = samples.iterator().next().getStackTrace();
            AssertionsForClassTypes.assertThat(stackTrace).isNotEmpty();
        }
    }

    /** Tests that a failed response from one of the gateways fails the request future. */
    @Test
    public void testThreadInfoRequestWithException() throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.EXCEPTIONALLY);

        CompletableFuture<VertexThreadInfoStats> requestFuture =
                triggerDefaultRequest(executionWithGateways);

        try {
            requestFuture.get();
            Fail.fail("Exception expected.");
        } catch (ExecutionException e) {
            AssertionsForClassTypes.assertThat(e.getCause()).isInstanceOf(RuntimeException.class);
        }
    }

    /**
     * Tests that the request future fails with an exception carrying {@link
     * #REQUEST_TIMEOUT_MESSAGE} when one of the gateway responses times out.
     */
    @Test
    public void testThreadInfoRequestTimeout() throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);

        CompletableFuture<VertexThreadInfoStats> requestFuture =
                triggerDefaultRequest(executionWithGateways);

        try {
            requestFuture.get();
            Fail.fail("Exception expected.");
        } catch (ExecutionException e) {
            AssertionsForClassTypes.assertThat(
                            ExceptionUtils.findThrowableWithMessage(e, REQUEST_TIMEOUT_MESSAGE)
                                    .isPresent())
                    .isTrue();
        } finally {
            // Explicit shutdown in addition to the one performed by shutdownCoordinator().
            coordinator.shutDown();
        }
    }

    /** Tests that shutdown fails all pending requests as well as requests triggered afterwards. */
    @Test
    public void testShutDown() throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.NEVER_COMPLETE);

        Collection<CompletableFuture<VertexThreadInfoStats>> requestFutures = new ArrayList<>();
        requestFutures.add(triggerDefaultRequest(executionWithGateways));
        requestFutures.add(triggerDefaultRequest(executionWithGateways));

        // One gateway never responds, so both requests must still be pending.
        for (CompletableFuture<VertexThreadInfoStats> pending : requestFutures) {
            AssertionsForClassTypes.assertThat(pending).isNotDone();
        }

        coordinator.shutDown();

        for (CompletableFuture<VertexThreadInfoStats> failed : requestFutures) {
            AssertionsForClassTypes.assertThat(failed).isCompletedExceptionally();
        }

        // Requests triggered after the shutdown must fail as well.
        CompletableFuture<VertexThreadInfoStats> lateRequest =
                triggerDefaultRequest(executionWithGateways);
        AssertionsForClassTypes.assertThat(lateRequest).isCompletedExceptionally();
    }

    /**
     * Triggers a thread info request on the coordinator under test using the default number of
     * samples, delay between samples and maximum stack trace depth.
     *
     * @param executionWithGateways execution attempt id sets mapped to their gateway futures
     * @return the future returned by the coordinator
     */
    private CompletableFuture<VertexThreadInfoStats> triggerDefaultRequest(
            Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                    executionWithGateways) {
        return coordinator.triggerThreadInfoRequest(
                executionWithGateways,
                DEFAULT_NUMBER_OF_SAMPLES,
                DEFAULT_DELAY_BETWEEN_SAMPLES,
                DEFAULT_MAX_STACK_TRACE_DEPTH);
    }

    /**
     * Creates an already completed future of a gateway whose response future behaves according
     * to the given completion type.
     *
     * @param completionType how the gateway's response future should complete
     * @return a completed future holding the mocked gateway
     * @throws Exception if creating the thread info sample for a successful response fails
     */
    private static CompletableFuture<TaskExecutorThreadInfoGateway> createMockTaskManagerGateway(
            CompletionType completionType) throws Exception {
        CompletableFuture<TaskThreadInfoResponse> responseFuture = new CompletableFuture<>();

        switch (completionType) {
            case SUCCESSFULLY: {
                HashSet<IdleTestTask> tasks = new HashSet<>();
                IdleTestTask.executeWithTerminationGuarantee(
                        () -> {
                            tasks.add(new IdleTestTask());
                            tasks.add(new IdleTestTask());

                            // Thread id of each idle task -> execution attempt id of that task.
                            Map<Long, ExecutionAttemptID> threads =
                                    tasks.stream()
                                            .collect(
                                                    Collectors.toMap(
                                                            task ->
                                                                    task.getExecutingThread()
                                                                            .getId(),
                                                            IdleTestTask::getExecutionId));

                            // Re-key the sampled thread infos by execution attempt id.
                            Map<ExecutionAttemptID, Collection<ThreadInfoSample>>
                                    threadInfoSample =
                                            JvmUtils.createThreadInfoSample(
                                                            threads.keySet(),
                                                            DEFAULT_MAX_STACK_TRACE_DEPTH)
                                                    .entrySet().stream()
                                                    .collect(
                                                            Collectors.toMap(
                                                                    entry ->
                                                                            threads.get(
                                                                                    entry.getKey()),
                                                                    entry ->
                                                                            Collections
                                                                                    .singletonList(
                                                                                            entry
                                                                                                    .getValue())));

                            responseFuture.complete(new TaskThreadInfoResponse(threadInfoSample));
                        },
                        tasks);
                break;
            }
            case EXCEPTIONALLY: {
                responseFuture.completeExceptionally(new RuntimeException("Request failed."));
                break;
            }
            case TIMEOUT: {
                // Fail the response only after the request timeout has elapsed.
                executorService.schedule(
                        () ->
                                responseFuture.completeExceptionally(
                                        new TimeoutException(REQUEST_TIMEOUT_MESSAGE)),
                        REQUEST_TIMEOUT.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            }
            case NEVER_COMPLETE: {
                // Intentionally leave the response future incomplete.
                break;
            }
            default: {
                throw new RuntimeException("Unknown completion type.");
            }
        }

        // The gateway ignores its arguments and always hands out the prepared response future.
        TaskExecutorThreadInfoGateway executorGateway =
                (taskExecutionAttemptId, requestParams, timeout) -> responseFuture;
        return CompletableFuture.completedFuture(executorGateway);
    }

    /**
     * Creates one map entry per completion type: a set of two fresh execution attempt ids mapped
     * to a mocked gateway future that completes its response as requested.
     *
     * @param completionTypes completion behavior of each mocked gateway
     * @return execution attempt id sets mapped to their mocked gateway futures
     * @throws Exception if creating one of the mocked gateways fails
     */
    private static Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
            createMockSubtaskWithGateways(CompletionType... completionTypes) throws Exception {
        Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>>
                result = new HashMap<>();
        for (CompletionType completionType : completionTypes) {
            ImmutableSet<ExecutionAttemptID> ids =
                    ImmutableSet.of(
                            ExecutionGraphTestUtils.createExecutionAttemptId(),
                            ExecutionGraphTestUtils.createExecutionAttemptId());
            result.put(ids, createMockTaskManagerGateway(completionType));
        }
        return result;
    }

    /** Completion behavior of a mocked gateway response. */
    private enum CompletionType {
        SUCCESSFULLY,
        EXCEPTIONALLY,
        TIMEOUT,
        NEVER_COMPLETE
    }
}

