/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceDeclaration;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceAllocator;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceAllocatorBuilder;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/** Tests for the {@link TaskExecutorManager}. */
class TaskExecutorManagerTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /**
     * Allocates a worker for a requested profile, then registers a task executor whose slot has
     * a different profile, and checks that the registered slot is counted while the pending
     * slot count stays at one.
     */
    @Test
    void testPendingSlotNotFulfilledIfProfilesAreNotExactMatch() {
        final int numWorkerCpuCores = 3;
        final WorkerResourceSpec workerResourceSpec =
                new WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build();
        final ResourceProfile requestedSlotProfile =
                ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores).build();
        // One core less than requested, so the offered slot is not an exact match.
        final ResourceProfile offeredSlotProfile =
                ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores - 1).build();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setDefaultWorkerResourceSpec(workerResourceSpec)
                        .setNumSlotsPerWorker(1)
                        .setMaxNumSlots(2)
                        .createTaskExecutorManager()) {
            // Create the pending slot.
            taskExecutorManager.allocateWorker(requestedSlotProfile);
            Assertions.assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots())
                    .isEqualTo(1);

            createAndRegisterTaskExecutor(taskExecutorManager, 1, offeredSlotProfile);

            // The offered slot is registered, but the pending slot is still outstanding.
            Assertions.assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
            Assertions.assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots())
                    .isEqualTo(1);
        }
    }

    /**
     * Allocates a worker for a requested profile, then registers a task executor whose only slot
     * matches that profile but is reported with a job ID and an allocation ID, and checks that
     * the pending slot count stays at one.
     */
    @Test
    void testPendingSlotNotFulfilledByAllocatedSlot() {
        final int numWorkerCpuCores = 3;
        final WorkerResourceSpec workerResourceSpec =
                new WorkerResourceSpec.Builder().setCpuCores(numWorkerCpuCores).build();
        final ResourceProfile requestedSlotProfile =
                ResourceProfile.newBuilder().setCpuCores(numWorkerCpuCores).build();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setDefaultWorkerResourceSpec(workerResourceSpec)
                        .setNumSlotsPerWorker(1)
                        .setMaxNumSlots(2)
                        .createTaskExecutorManager()) {
            // Create the pending slot.
            taskExecutorManager.allocateWorker(requestedSlotProfile);
            Assertions.assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots())
                    .isEqualTo(1);

            // Report a slot with the requested profile that already carries an allocation.
            final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
            final SlotReport slotReport =
                    new SlotReport(
                            new SlotStatus(
                                    new SlotID(taskExecutorConnection.getResourceID(), 0),
                                    requestedSlotProfile,
                                    JobID.generate(),
                                    new AllocationID()));
            taskExecutorManager.registerTaskManager(
                    taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);

            Assertions.assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
            Assertions.assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots())
                    .isEqualTo(1);
        }
    }

    /**
     * Registers a task executor with a short task manager timeout configured and waits until the
     * resource allocator is told that this executor is unwanted. At that point the slot must
     * still be registered; it only disappears after the executor is explicitly unregistered.
     */
    @Test
    void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
        final Time taskManagerTimeout = Time.milliseconds(10L);

        // Completed with the instance the manager declares as unwanted.
        final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
        final TestingResourceAllocator resourceAllocator =
                createResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                resourceDeclarations -> {
                                    Assertions.assertThat(resourceDeclarations).hasSize(1);
                                    final ResourceDeclaration resourceDeclaration =
                                            resourceDeclarations.iterator().next();
                                    Assertions.assertThat(resourceDeclaration.getNumNeeded())
                                            .isZero();
                                    Assertions.assertThat(resourceDeclaration.getUnwantedWorkers())
                                            .hasSize(1);
                                    releaseResourceFuture.complete(
                                            resourceDeclaration
                                                    .getUnwantedWorkers()
                                                    .iterator()
                                                    .next());
                                })
                        .build();

        final ExecutorService mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setTaskManagerTimeout(taskManagerTimeout)
                        .setResourceAllocator(resourceAllocator)
                        .setMainThreadExecutor(mainThreadExecutor)
                        .createTaskExecutorManager()) {
            CompletableFuture.supplyAsync(
                            () -> {
                                final InstanceID newTaskExecutorId =
                                        createAndRegisterTaskExecutor(
                                                taskExecutorManager, 1, ResourceProfile.ANY);
                                Assertions.assertThat(
                                                taskExecutorManager.getNumberRegisteredSlots())
                                        .isEqualTo(1);
                                return newTaskExecutorId;
                            },
                            mainThreadExecutor)
                    // Wait for the release request of the registered executor.
                    .thenCombine(
                            releaseResourceFuture,
                            (registeredInstance, releasedInstance) -> {
                                Assertions.assertThat(registeredInstance)
                                        .isEqualTo(releasedInstance);
                                // The slot is still registered after the release request.
                                Assertions.assertThat(
                                                taskExecutorManager.getNumberRegisteredSlots())
                                        .isEqualTo(1);
                                return registeredInstance;
                            })
                    .thenAccept(
                            taskExecutorId -> {
                                taskExecutorManager.unregisterTaskExecutor(taskExecutorId);
                                Assertions.assertThat(
                                                taskExecutorManager.getNumberRegisteredSlots())
                                        .isZero();
                            })
                    .get();
        }
    }

    /**
     * Allocates a worker, registers a matching task executor, occupies and frees its slot, and
     * then waits until the resource allocator receives a declaration that lists the registered
     * executor as unwanted.
     */
    @Test
    void testTimeoutForUnusedTaskManager() throws Exception {
        final WorkerResourceSpec workerResourceSpec =
                new WorkerResourceSpec.Builder().setCpuCores(1).build();
        final ResourceProfile resourceProfile =
                ResourceProfile.newBuilder().setCpuCores(1).build();
        final Time taskManagerTimeout = Time.milliseconds(50L);

        final AtomicInteger declareResourceCount = new AtomicInteger(0);
        final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
        final TestingResourceAllocator resourceAllocator =
                new TestingResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                resourceDeclarations -> {
                                    Assertions.assertThat(resourceDeclarations.size())
                                            .isEqualTo(1);
                                    final ResourceDeclaration resourceDeclaration =
                                            resourceDeclarations.iterator().next();
                                    if (declareResourceCount.getAndIncrement() == 0) {
                                        // First declaration: one worker needed, none unwanted.
                                        Assertions.assertThat(resourceDeclaration.getNumNeeded())
                                                .isEqualTo(1);
                                        Assertions.assertThat(
                                                        resourceDeclaration.getUnwantedWorkers())
                                                .isEmpty();
                                    } else {
                                        // Later declarations: nothing needed, one unwanted worker.
                                        Assertions.assertThat(resourceDeclaration.getNumNeeded())
                                                .isZero();
                                        Assertions.assertThat(
                                                        resourceDeclaration.getUnwantedWorkers())
                                                .hasSize(1);
                                        releaseResourceFuture.complete(
                                                resourceDeclaration
                                                        .getUnwantedWorkers()
                                                        .iterator()
                                                        .next());
                                    }
                                })
                        .build();

        final ExecutorService mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setTaskManagerTimeout(taskManagerTimeout)
                        .setDefaultWorkerResourceSpec(workerResourceSpec)
                        .setResourceAllocator(resourceAllocator)
                        .setMainThreadExecutor(mainThreadExecutor)
                        .createTaskExecutorManager()) {
            CompletableFuture.supplyAsync(
                            () -> {
                                taskExecutorManager.allocateWorker(resourceProfile);
                                final InstanceID taskExecutorId =
                                        createAndRegisterTaskExecutor(
                                                taskExecutorManager, 1, resourceProfile);
                                taskExecutorManager.occupySlot(taskExecutorId);
                                taskExecutorManager.freeSlot(taskExecutorId);
                                return taskExecutorId;
                            },
                            mainThreadExecutor)
                    // Wait for the release request and check it targets the same executor.
                    .thenAcceptBoth(
                            releaseResourceFuture,
                            (registeredInstance, releasedInstance) ->
                                    Assertions.assertThat(registeredInstance)
                                            .isEqualTo(releasedInstance))
                    .get();
        }
    }

    /**
     * Configures one redundant task manager and counts resource declarations while manually
     * triggering the scheduled tasks: none before a task executor is registered and occupied,
     * exactly one after the next trigger, and no additional one on a further trigger.
     */
    @Test
    void testRequestRedundantTaskManager() {
        final ResourceProfile resourceProfile =
                ResourceProfile.newBuilder().setCpuCores(1).build();
        final AtomicInteger declareResourceCount = new AtomicInteger(0);
        final TestingResourceAllocator resourceAllocator =
                new TestingResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                resourceDeclarations -> declareResourceCount.getAndIncrement())
                        .build();
        final ManuallyTriggeredScheduledExecutor taskRestartExecutor =
                new ManuallyTriggeredScheduledExecutor();

        try (TaskExecutorManager taskExecutorManager =
                new TaskExecutorManagerBuilder(taskRestartExecutor)
                        .setRedundantTaskManagerNum(1)
                        .setMaxNumSlots(10)
                        .setResourceAllocator(resourceAllocator)
                        .createTaskExecutorManager()) {
            // No task executor registered yet: triggering must not declare resources.
            taskRestartExecutor.triggerScheduledTasks();
            Assertions.assertThat(declareResourceCount).hasValue(0);

            final InstanceID taskExecutorId =
                    createAndRegisterTaskExecutor(taskExecutorManager, 1, resourceProfile);
            taskExecutorManager.occupySlot(taskExecutorId);
            Assertions.assertThat(declareResourceCount).hasValue(0);

            // The next trigger produces exactly one declaration.
            taskRestartExecutor.triggerScheduledTasks();
            Assertions.assertThat(declareResourceCount).hasValue(1);

            // A further trigger must not produce another one.
            taskRestartExecutor.triggerScheduledTasks();
            Assertions.assertThat(declareResourceCount).hasValue(1);
        }
    }

    /**
     * Requests a slot profile that needs more CPU cores than the default worker spec provides
     * and checks that no worker is allocated and no resource declaration is made.
     */
    @Test
    void testWorkerOnlyAllocatedIfRequestedSlotCouldBeFulfilled() {
        final int numCoresPerWorker = 1;
        final WorkerResourceSpec workerResourceSpec =
                new WorkerResourceSpec.Builder().setCpuCores(numCoresPerWorker).build();
        // One core more than a single worker offers.
        final ResourceProfile requestedProfile =
                ResourceProfile.newBuilder().setCpuCores(numCoresPerWorker + 1).build();

        final AtomicInteger declareResourceCount = new AtomicInteger(0);
        final TestingResourceAllocator resourceAllocator =
                createResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                resourceDeclarations -> declareResourceCount.incrementAndGet())
                        .build();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setDefaultWorkerResourceSpec(workerResourceSpec)
                        .setNumSlotsPerWorker(1)
                        .setMaxNumSlots(1)
                        .setResourceAllocator(resourceAllocator)
                        .createTaskExecutorManager()) {
            Assertions.assertThat(taskExecutorManager.allocateWorker(requestedProfile))
                    .isNotPresent();
            Assertions.assertThat(declareResourceCount).hasValue(0);
        }
    }

    /**
     * With a maximum of one slot, checks that the first worker allocation results in a
     * declaration needing one worker and that a second allocation adds no further declaration.
     */
    @Test
    void testMaxSlotLimitAllocateWorker() {
        final int numberSlots = 1;
        final int maxSlotNum = 1;

        // Records the number of needed workers from each received declaration.
        final List<Integer> resourceRequestNumber = new ArrayList<>();
        final TestingResourceAllocator resourceAllocator =
                createResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                resourceDeclarations -> {
                                    Assertions.assertThat(resourceDeclarations).hasSize(1);
                                    final ResourceDeclaration resourceDeclaration =
                                            resourceDeclarations.iterator().next();
                                    resourceRequestNumber.add(resourceDeclaration.getNumNeeded());
                                })
                        .build();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setNumSlotsPerWorker(numberSlots)
                        .setMaxNumSlots(maxSlotNum)
                        .setResourceAllocator(resourceAllocator)
                        .createTaskExecutorManager()) {
            Assertions.assertThat(resourceRequestNumber).isEmpty();

            taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
            Assertions.assertThat(resourceRequestNumber).containsExactly(1);

            // The second allocation must not trigger another declaration.
            taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
            Assertions.assertThat(resourceRequestNumber).containsExactly(1);
        }
    }

    /**
     * With a maximum of one slot, registers two single-slot task executors and checks that only
     * one slot ends up registered.
     */
    @Test
    void testMaxSlotLimitRegisterWorker() throws Exception {
        final int numberSlots = 1;
        final int maxSlotNum = 1;

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setNumSlotsPerWorker(numberSlots)
                        .setMaxNumSlots(maxSlotNum)
                        .createTaskExecutorManager()) {
            createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);
            createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);

            Assertions.assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
        }
    }

    /**
     * Registers two task executors with two slots each, calls {@code occupySlot} once per
     * executor, and checks the reported free and registered resources, both in total and per
     * executor.
     */
    @Test
    void testGetResourceOverview() {
        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1.0, 10);
        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2.0, 20);

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setMaxNumSlots(4)
                        .createTaskExecutorManager()) {
            final InstanceID instanceId1 =
                    createAndRegisterTaskExecutor(taskExecutorManager, 2, resourceProfile1);
            final InstanceID instanceId2 =
                    createAndRegisterTaskExecutor(taskExecutorManager, 2, resourceProfile2);
            taskExecutorManager.occupySlot(instanceId1);
            taskExecutorManager.occupySlot(instanceId2);

            // Free resources: one slot's worth per executor.
            Assertions.assertThat(taskExecutorManager.getTotalFreeResources())
                    .isEqualTo(resourceProfile1.merge(resourceProfile2));
            Assertions.assertThat(taskExecutorManager.getTotalFreeResourcesOf(instanceId1))
                    .isEqualTo(resourceProfile1);
            Assertions.assertThat(taskExecutorManager.getTotalFreeResourcesOf(instanceId2))
                    .isEqualTo(resourceProfile2);

            // Registered resources: two slots' worth per executor.
            Assertions.assertThat(taskExecutorManager.getTotalRegisteredResources())
                    .isEqualTo(resourceProfile1.merge(resourceProfile2).multiply(2));
            Assertions.assertThat(taskExecutorManager.getTotalRegisteredResourcesOf(instanceId1))
                    .isEqualTo(resourceProfile1.multiply(2));
            Assertions.assertThat(taskExecutorManager.getTotalRegisteredResourcesOf(instanceId2))
                    .isEqualTo(resourceProfile2.multiply(2));
        }
    }

    /**
     * Creates a manager builder that schedules on the shared test executor and uses a default
     * testing resource allocator.
     */
    private static TaskExecutorManagerBuilder createTaskExecutorManagerBuilder() {
        return new TaskExecutorManagerBuilder(
                        new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
                .setResourceAllocator(createResourceAllocatorBuilder().build());
    }

    /** Creates a fresh builder for a testing resource allocator. */
    private static TestingResourceAllocatorBuilder createResourceAllocatorBuilder() {
        return new TestingResourceAllocatorBuilder();
    }

    /**
     * Registers a new task executor that reports {@code numSlots} slots, each with the given
     * profile and no allocation.
     *
     * @param taskExecutorManager manager to register the task executor at
     * @param numSlots number of slots contained in the slot report
     * @param resourceProfile profile of every reported slot; also passed as the default slot
     *     profile, and multiplied by {@code numSlots} as the total profile
     * @return instance ID of the newly created task executor connection
     */
    private static InstanceID createAndRegisterTaskExecutor(
            TaskExecutorManager taskExecutorManager, int numSlots, ResourceProfile resourceProfile) {
        final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();

        final List<SlotStatus> slotStatuses =
                IntStream.range(0, numSlots)
                        .mapToObj(
                                slotNumber ->
                                        new SlotStatus(
                                                new SlotID(
                                                        taskExecutorConnection.getResourceID(),
                                                        slotNumber),
                                                resourceProfile))
                        .collect(Collectors.toList());
        final SlotReport slotReport = new SlotReport(slotStatuses);

        taskExecutorManager.registerTaskManager(
                taskExecutorConnection,
                slotReport,
                resourceProfile.multiply(numSlots),
                resourceProfile);

        return taskExecutorConnection.getInstanceID();
    }

    /** Creates a connection with a generated resource ID and a testing task executor gateway. */
    private static TaskExecutorConnection createTaskExecutorConnection() {
        return new TaskExecutorConnection(
                ResourceID.generate(),
                new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
    }
}

