/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolFactory;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.function.QuadFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class DeclarativeSlotPoolServiceTest {
    private static final JobID jobId = new JobID();
    private static final JobMasterId jobMasterId = JobMasterId.generate();
    private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
    private static final String address = "localhost";

    DeclarativeSlotPoolServiceTest() {
    }

    @Test
    void testUnknownTaskManagerRegistration() throws Exception {
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService();){
            ResourceID unknownTaskManager = ResourceID.generate();
            Assertions.assertThat((boolean)declarativeSlotPoolService.isTaskManagerRegistered(unknownTaskManager.getResourceID())).isFalse();
        }
    }

    @Test
    void testKnownTaskManagerRegistration() throws Exception {
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService();){
            ResourceID knownTaskManager = ResourceID.generate();
            declarativeSlotPoolService.registerTaskManager(knownTaskManager);
            Assertions.assertThat((boolean)declarativeSlotPoolService.isTaskManagerRegistered(knownTaskManager.getResourceID())).isTrue();
        }
    }

    @Test
    void testReleaseTaskManager() throws Exception {
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService();){
            ResourceID knownTaskManager = ResourceID.generate();
            declarativeSlotPoolService.registerTaskManager(knownTaskManager);
            declarativeSlotPoolService.releaseTaskManager(knownTaskManager, (Exception)new FlinkException("Test cause"));
            Assertions.assertThat((boolean)declarativeSlotPoolService.isTaskManagerRegistered(knownTaskManager.getResourceID())).isFalse();
        }
    }

    @Test
    void testSlotOfferingOfUnknownTaskManagerIsIgnored() throws Exception {
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService();){
            List<SlotOffer> slotOffers = Collections.singletonList(new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN));
            LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            Collection acceptedSlots = declarativeSlotPoolService.offerSlots((TaskManagerLocation)taskManagerLocation, (TaskManagerGateway)new RpcTaskManagerGateway((TaskExecutorGateway)new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(), jobMasterId), slotOffers);
            Assertions.assertThat((Collection)acceptedSlots).isEmpty();
        }
    }

    @Test
    void testSlotOfferingOfKnownTaskManager() throws Exception {
        AtomicReference receivedSlotOffers = new AtomicReference();
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService(new TestingDeclarativeSlotPoolFactory(new TestingDeclarativeSlotPoolBuilder().setOfferSlotsFunction((QuadFunction<Collection<? extends SlotOffer>, TaskManagerLocation, TaskManagerGateway, Long, Collection<SlotOffer>>)((QuadFunction)(slotOffers, taskManagerLocation, taskManagerGateway, aLong) -> {
            receivedSlotOffers.set(slotOffers);
            return new ArrayList(slotOffers);
        }))));){
            LocalTaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
            declarativeSlotPoolService.registerTaskManager(taskManagerLocation2.getResourceID());
            List<SlotOffer> slotOffers2 = Collections.singletonList(new SlotOffer(new AllocationID(), 0, ResourceProfile.UNKNOWN));
            declarativeSlotPoolService.offerSlots((TaskManagerLocation)taskManagerLocation2, (TaskManagerGateway)new RpcTaskManagerGateway((TaskExecutorGateway)new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(), jobMasterId), slotOffers2);
            Assertions.assertThat((Collection)((Collection)receivedSlotOffers.get())).isEqualTo(slotOffers2);
        }
    }

    @Test
    void testConnectToResourceManagerDeclaresRequiredResources() throws Exception {
        List<ResourceRequirement> requiredResources = Arrays.asList(ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)2), ResourceRequirement.create((ResourceProfile)ResourceProfile.ZERO, (int)4));
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService(new TestingDeclarativeSlotPoolFactory(new TestingDeclarativeSlotPoolBuilder().setGetResourceRequirementsSupplier(() -> requiredResources)));){
            TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture declaredResourceRequirements = new CompletableFuture();
            resourceManagerGateway.setDeclareRequiredResourcesFunction((jobMasterId, resourceRequirements) -> {
                declaredResourceRequirements.complete(resourceRequirements);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });
            declarativeSlotPoolService.connectToResourceManager((ResourceManagerGateway)resourceManagerGateway);
            ResourceRequirements resourceRequirements2 = (ResourceRequirements)declaredResourceRequirements.join();
            Assertions.assertThat((Collection)resourceRequirements2.getResourceRequirements()).isEqualTo(requiredResources);
            Assertions.assertThat((Comparable)resourceRequirements2.getJobId()).isEqualTo((Object)jobId);
            Assertions.assertThat((String)resourceRequirements2.getTargetAddress()).isEqualTo(address);
        }
    }

    @Test
    void testCreateAllocatedSlotReport() throws Exception {
        LocalTaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
        LocalTaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
        SimpleSlotContext simpleSlotContext2 = this.createSimpleSlotContext(taskManagerLocation2);
        List<SlotInfo> slotInfos = Arrays.asList(this.createSimpleSlotContext(taskManagerLocation1), simpleSlotContext2);
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService(new TestingDeclarativeSlotPoolFactory(new TestingDeclarativeSlotPoolBuilder().setGetAllSlotsInformationSupplier(() -> slotInfos)));){
            AllocatedSlotReport allocatedSlotReport = declarativeSlotPoolService.createAllocatedSlotReport(taskManagerLocation2.getResourceID());
            Assertions.assertThat((Collection)allocatedSlotReport.getAllocatedSlotInfos()).allMatch(context -> context.getAllocationId().equals((Object)simpleSlotContext2.getAllocationId()) && context.getSlotIndex() == simpleSlotContext2.getPhysicalSlotNumber());
        }
    }

    @Test
    void testFailAllocationReleasesSlot() throws Exception {
        CompletableFuture releasedSlot = new CompletableFuture();
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService(new TestingDeclarativeSlotPoolFactory(new TestingDeclarativeSlotPoolBuilder().setReleaseSlotFunction((allocationID, exception) -> {
            releasedSlot.complete(allocationID);
            return ResourceCounter.empty();
        })));){
            ResourceID taskManagerId = ResourceID.generate();
            AllocationID allocationId = new AllocationID();
            declarativeSlotPoolService.registerTaskManager(taskManagerId);
            declarativeSlotPoolService.failAllocation(taskManagerId, allocationId, (Exception)new FlinkException("Test cause"));
            Assertions.assertThat((Comparable)((Comparable)releasedSlot.join())).isEqualTo((Object)allocationId);
        }
    }

    @Test
    void testFailLastAllocationOfTaskManagerReturnsIt() throws Exception {
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService();){
            ResourceID taskManagerId = ResourceID.generate();
            declarativeSlotPoolService.registerTaskManager(taskManagerId);
            Optional emptyTaskManager = declarativeSlotPoolService.failAllocation(taskManagerId, new AllocationID(), (Exception)new FlinkException("Test cause"));
            Assertions.assertThat(emptyTaskManager.orElseThrow(() -> new Exception("Expected empty task manager"))).isEqualTo((Object)taskManagerId);
        }
    }

    @Test
    void testCloseReleasesAllSlotsForAllRegisteredTaskManagers() throws Exception {
        ArrayDeque releasedSlotsFor = new ArrayDeque(2);
        try (DeclarativeSlotPoolService declarativeSlotPoolService = this.createDeclarativeSlotPoolService(new TestingDeclarativeSlotPoolFactory(new TestingDeclarativeSlotPoolBuilder().setReleaseSlotsFunction((resourceID, e) -> {
            releasedSlotsFor.offer(resourceID);
            return ResourceCounter.empty();
        })));){
            List<ResourceID> taskManagerResourceIds = Arrays.asList(ResourceID.generate(), ResourceID.generate(), ResourceID.generate());
            for (ResourceID taskManagerResourceId : taskManagerResourceIds) {
                declarativeSlotPoolService.registerTaskManager(taskManagerResourceId);
            }
            declarativeSlotPoolService.close();
            Assertions.assertThat(releasedSlotsFor).containsExactlyInAnyOrderElementsOf(taskManagerResourceIds);
        }
    }

    @Test
    void testReleaseFreeSlotsOnTaskManager() throws Exception {
        try (DeclarativeSlotPoolService slotPoolService = this.createDeclarativeSlotPoolService();){
            LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            slotPoolService.registerTaskManager(taskManagerLocation.getResourceID());
            ResourceProfile resourceProfile = ResourceProfile.newBuilder().setCpuCores(1.0).build();
            SlotOffer slotOffer1 = new SlotOffer(new AllocationID(), 0, resourceProfile);
            SlotOffer slotOffer2 = new SlotOffer(new AllocationID(), 1, resourceProfile);
            DeclarativeSlotPool slotPool = slotPoolService.getDeclarativeSlotPool();
            slotPool.setResourceRequirements(ResourceCounter.withResource((ResourceProfile)resourceProfile, (int)2));
            DefaultDeclarativeSlotPoolTest.FreeSlotConsumer freeSlotConsumer = new DefaultDeclarativeSlotPoolTest.FreeSlotConsumer();
            List<SlotOffer> slotOffers = Arrays.asList(slotOffer1, slotOffer2);
            slotPoolService.offerSlots((TaskManagerLocation)taskManagerLocation, (TaskManagerGateway)new RpcTaskManagerGateway((TaskExecutorGateway)new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction(freeSlotConsumer).createTestingTaskExecutorGateway(), jobMasterId), slotOffers);
            slotPool.reserveFreeSlot(slotOffer1.getAllocationId(), resourceProfile);
            slotPoolService.releaseFreeSlotsOnTaskManager(taskManagerLocation.getResourceID(), (Exception)new FlinkException("Test cause"));
            Assertions.assertThat((Collection)slotPool.getFreeSlotsInformation()).isEmpty();
            Assertions.assertThat((Comparable)((SlotInfo)Iterables.getOnlyElement((Iterable)slotPool.getAllSlotsInformation())).getAllocationId()).isEqualTo((Object)slotOffer1.getAllocationId());
            Assertions.assertThat((Comparable)((Comparable)Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots()))).isEqualTo((Object)slotOffer2.getAllocationId());
        }
    }

    private DeclarativeSlotPoolService createDeclarativeSlotPoolService() throws Exception {
        return this.createDeclarativeSlotPoolService((DeclarativeSlotPoolFactory)new DefaultDeclarativeSlotPoolFactory());
    }

    private DeclarativeSlotPoolService createDeclarativeSlotPoolService(DeclarativeSlotPoolFactory declarativeSlotPoolFactory) throws Exception {
        DeclarativeSlotPoolService declarativeSlotPoolService = new DeclarativeSlotPoolService(jobId, declarativeSlotPoolFactory, (Clock)SystemClock.getInstance(), Time.seconds((long)20L), Time.seconds((long)20L));
        declarativeSlotPoolService.start(jobMasterId, address, this.mainThreadExecutor);
        return declarativeSlotPoolService;
    }

    @Nonnull
    private SimpleSlotContext createSimpleSlotContext(LocalTaskManagerLocation taskManagerLocation1) {
        return new SimpleSlotContext(new AllocationID(), taskManagerLocation1, 0, (TaskManagerGateway)new RpcTaskManagerGateway((TaskExecutorGateway)new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(), jobMasterId));
    }
}

