/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
import org.apache.flink.runtime.jobmaster.slotpool.TestingPhysicalSlotPayload;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.QuadConsumer;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;

class DefaultDeclarativeSlotPoolTest {
    /** Resource profile built with 1.7 CPU cores; shared by the tests as the first requested profile. */
    private static final ResourceProfile RESOURCE_PROFILE_1 = ResourceProfile.newBuilder().setCpuCores(1.7).build();
    /** Resource profile built with 100 MB of managed memory; shared by the tests as the second requested profile. */
    private static final ResourceProfile RESOURCE_PROFILE_2 = ResourceProfile.newBuilder().setManagedMemoryMB(100).build();

    /** Package-private no-arg constructor; the class declares no instance fields to initialise. */
    DefaultDeclarativeSlotPoolTest() {}

    /**
     * Raises the requirements twice and checks that the listener receives one notification per
     * increase, the second one carrying the accumulated total, and nothing afterwards.
     */
    @Test
    void testIncreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException {
        NewResourceRequirementsService listener = new NewResourceRequirementsService();
        DefaultDeclarativeSlotPool pool = createDefaultDeclarativeSlotPool(listener);
        ResourceCounter firstIncrease = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
        ResourceCounter secondIncrease = createResourceRequirements();

        pool.increaseResourceRequirementsBy(firstIncrease);
        pool.increaseResourceRequirementsBy(secondIncrease);

        // First notification: only the first increase.
        Collection<ResourceRequirement> afterFirstIncrease = listener.takeResourceRequirements();
        Assertions.assertThat(afterFirstIncrease).isEqualTo(toResourceRequirements(firstIncrease));

        // Second notification: both increases summed up.
        ResourceCounter expectedTotal = firstIncrease.add(secondIncrease);
        Collection<ResourceRequirement> afterSecondIncrease = listener.takeResourceRequirements();
        Assertions.assertThat(afterSecondIncrease).isEqualTo(toResourceRequirements(expectedTotal));

        Assertions.assertThat(listener.hasNextResourceRequirements()).isFalse();
    }

    /**
     * Increases the requirements by three, decreases them by two and checks that the listener is
     * notified once more with the remaining requirement.
     */
    @Test
    void testDecreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException {
        NewResourceRequirementsService listener = new NewResourceRequirementsService();
        DefaultDeclarativeSlotPool pool = createDefaultDeclarativeSlotPool(listener);

        ResourceCounter initial = ResourceCounter.withResource(RESOURCE_PROFILE_1, 3);
        pool.increaseResourceRequirementsBy(initial);
        // Discard the notification triggered by the increase.
        listener.takeResourceRequirements();

        ResourceCounter reduction = ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);
        pool.decreaseResourceRequirementsBy(reduction);

        ResourceCounter expectedRemaining = initial.subtract(reduction);
        Collection<ResourceRequirement> notified = listener.takeResourceRequirements();
        Assertions.assertThat(notified).isEqualTo(toResourceRequirements(expectedRemaining));
        Assertions.assertThat(listener.hasNextResourceRequirements()).isFalse();
    }

    /**
     * Checks that a freshly built pool reports no requirements and that increased requirements
     * are reported back by {@code getResourceRequirements()}.
     */
    @Test
    void testGetResourceRequirements() {
        DefaultDeclarativeSlotPool pool = DefaultDeclarativeSlotPoolBuilder.builder().build();
        Assertions.assertThat(pool.getResourceRequirements()).isEmpty();

        ResourceCounter requirements = createResourceRequirements();
        pool.increaseResourceRequirementsBy(requirements);

        Collection<ResourceRequirement> expected = toResourceRequirements(requirements);
        Assertions.assertThat(pool.getResourceRequirements()).isEqualTo(expected);
    }

    /**
     * Offers exactly the required slots and verifies that all offers are accepted, that every
     * offer was announced to the new-slots listener with matching attributes, and that
     * {@code getAllSlotsInformation()} reports the same slots.
     */
    @Test
    void testOfferSlots() throws InterruptedException {
        NewSlotsService notifyNewSlots = new NewSlotsService();
        DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPoolWithNewSlotsListener(notifyNewSlots);
        ResourceCounter resourceRequirements = createResourceRequirements();
        slotPool.increaseResourceRequirementsBy(resourceRequirements);

        Collection<SlotOffer> slotOffers = SlotPoolTestUtils.createSlotOffersForResourceRequirements(resourceRequirements);
        Collection<SlotOffer> acceptedSlots = SlotPoolTestUtils.offerSlots(slotPool, slotOffers);
        Assertions.assertThat(acceptedSlots).containsExactlyInAnyOrderElementsOf(slotOffers);

        // Typed map instead of a raw one: lookups below need no unchecked casts and the
        // assertion lambdas keep their element types.
        Map<AllocationID, PhysicalSlot> newSlotsById = drainNewSlotService(notifyNewSlots).stream()
                .collect(Collectors.toMap(SlotInfo::getAllocationId, Function.identity()));

        // Every offer must have a corresponding announced slot with the same attributes.
        Assertions.assertThat(slotOffers).hasSize(newSlotsById.size()).allSatisfy(slotOffer -> {
            PhysicalSlot slot = newSlotsById.get(slotOffer.getAllocationId());
            Assertions.assertThat(slot).isNotNull();
            Assertions.assertThat(slotOffer.getAllocationId()).isEqualTo(slot.getAllocationId());
            Assertions.assertThat(slotOffer.getSlotIndex()).isEqualTo(slot.getPhysicalSlotNumber());
            Assertions.assertThat(slotOffer.getResourceProfile()).isEqualTo(slot.getResourceProfile());
        });

        // The pool's own view of its slots must agree with what was announced.
        Assertions.assertThat(slotPool.getAllSlotsInformation()).hasSize(newSlotsById.size()).allSatisfy(slotInfo -> {
            PhysicalSlot slot = newSlotsById.get(slotInfo.getAllocationId());
            Assertions.assertThat(slot).isNotNull();
            Assertions.assertThat(slotInfo.getAllocationId()).isEqualTo(slot.getAllocationId());
            Assertions.assertThat(slotInfo.getPhysicalSlotNumber()).isEqualTo(slot.getPhysicalSlotNumber());
            Assertions.assertThat(slotInfo.getResourceProfile()).isEqualTo(slot.getResourceProfile());
            Assertions.assertThat(slotInfo.getTaskManagerLocation()).isEqualTo(slot.getTaskManagerLocation());
        });
    }

    /**
     * Offers the same slots twice and checks that the second offering is still accepted but does
     * not trigger another new-slots notification.
     */
    @Test
    void testDuplicateSlotOfferings() throws InterruptedException {
        NewSlotsService newSlotsListener = new NewSlotsService();
        DefaultDeclarativeSlotPool pool = createDefaultDeclarativeSlotPoolWithNewSlotsListener(newSlotsListener);

        ResourceCounter requirements = createResourceRequirements();
        pool.increaseResourceRequirementsBy(requirements);
        Collection<SlotOffer> offers = SlotPoolTestUtils.createSlotOffersForResourceRequirements(requirements);

        // First offering; drop whatever notifications it produced.
        SlotPoolTestUtils.offerSlots(pool, offers);
        drainNewSlotService(newSlotsListener);

        // Second offering of the very same slots.
        Collection<SlotOffer> acceptedAgain = SlotPoolTestUtils.offerSlots(pool, offers);

        Assertions.assertThat(acceptedAgain).containsExactlyInAnyOrderElementsOf(offers);
        Assertions.assertThat(newSlotsListener.hasNextNewSlots()).isFalse();
    }

    /**
     * Offers two more {@code RESOURCE_PROFILE_1} slots than required and verifies that, per
     * resource profile, the number of accepted offers equals the required count.
     */
    @Test
    void testOfferingTooManySlotsWillRejectSuperfluousSlots() {
        NewSlotsService notifyNewSlots = new NewSlotsService();
        DefaultDeclarativeSlotPool slotPool = createDefaultDeclarativeSlotPoolWithNewSlotsListener(notifyNewSlots);
        ResourceCounter resourceRequirements = createResourceRequirements();
        slotPool.increaseResourceRequirementsBy(resourceRequirements);

        // Offer more slots than were declared as required.
        ResourceCounter increasedRequirements = resourceRequirements.add(RESOURCE_PROFILE_1, 2);
        Collection<SlotOffer> slotOffers = SlotPoolTestUtils.createSlotOffersForResourceRequirements(increasedRequirements);
        Collection<SlotOffer> acceptedSlots = SlotPoolTestUtils.offerSlots(slotPool, slotOffers);

        // Typed map (not raw) so the per-profile lookup below is checked by the compiler.
        Map<ResourceProfile, Long> acceptedPerProfile = acceptedSlots.stream()
                .map(SlotOffer::getResourceProfile)
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        Assertions.assertThat(resourceRequirements.getResourcesWithCount()).allSatisfy(resourceCount -> {
            long required = resourceCount.getValue().longValue();
            Assertions.assertThat(acceptedPerProfile.getOrDefault(resourceCount.getKey(), 0L)).isEqualTo(required);
        });
    }

    /**
     * Releases all slots of a task manager and checks that the pool no longer reports any slot
     * information.
     */
    @Test
    void testReleaseSlotsRemovesSlots() throws InterruptedException {
        NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService();
        DefaultDeclarativeSlotPool pool = createDefaultDeclarativeSlotPool(requirementsListener);
        LocalTaskManagerLocation location = new LocalTaskManagerLocation();

        increaseRequirementsAndOfferSlotsToSlotPool(pool, createResourceRequirements(), location);
        // Consume the notification caused by increasing the requirements.
        requirementsListener.takeResourceRequirements();

        FlinkException cause = new FlinkException("Test failure");
        pool.releaseSlots(location.getResourceID(), cause);

        Assertions.assertThat(pool.getAllSlotsInformation()).isEmpty();
    }

    /**
     * Releases all slots of a task manager and checks that the free-slot function of its gateway
     * was invoked for every accepted slot offer.
     */
    @Test
    void testReleaseSlotsReturnsSlot() {
        DefaultDeclarativeSlotPool pool = DefaultDeclarativeSlotPoolBuilder.builder().build();
        ResourceCounter requirements = createResourceRequirements();
        LocalTaskManagerLocation location = new LocalTaskManagerLocation();

        // Gateway whose free-slot calls are recorded by the consumer.
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
                .setFreeSlotFunction(freeSlotConsumer)
                .createTestingTaskExecutorGateway();

        Collection<SlotOffer> acceptedOffers = increaseRequirementsAndOfferSlotsToSlotPool(pool, requirements, location, gateway);

        pool.releaseSlots(location.getResourceID(), new FlinkException("Test failure"));

        Collection<AllocationID> freed = freeSlotConsumer.drainFreedSlots();
        Collection<AllocationID> expected = acceptedOffers.stream()
                .map(SlotOffer::getAllocationId)
                .collect(Collectors.toList());
        Assertions.assertThat(freed).containsExactlyInAnyOrderElementsOf(expected);
    }

    /**
     * Reserves one of two slots, releases all slots of the task manager and verifies that the
     * returned counter contains the reserved slot's profile once and the free slot's profile not
     * at all.
     */
    @Test
    void testReleaseSlotsOnlyReturnsFulfilledRequirementsOfReservedSlots() {
        // The lambda is passed directly; casting it to the raw QuadConsumer type would erase
        // the parameter types that the body relies on.
        withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles((slotPool, freeSlot, slotToReserve, taskManagerLocation) -> {
            slotPool.reserveFreeSlot(slotToReserve.getAllocationId(), slotToReserve.getResourceProfile())
                    .tryAssignPayload(new TestingPhysicalSlotPayload());

            ResourceCounter fulfilledRequirements = slotPool.releaseSlots(taskManagerLocation.getResourceID(), new FlinkException("Test failure"));

            Assertions.assertThat(fulfilledRequirements.getResourceCount(freeSlot.getResourceProfile())).isEqualTo(0);
            Assertions.assertThat(fulfilledRequirements.getResourceCount(slotToReserve.getResourceProfile())).isEqualTo(1);
        });
    }

    /**
     * Reserves one of two slots, releases both slots individually and verifies that releasing the
     * free slot returns an empty counter while releasing the reserved slot returns its profile
     * once.
     */
    @Test
    void testReleaseSlotOnlyReturnsFulfilledRequirementsOfReservedSlots() {
        // The lambda is passed directly; casting it to the raw QuadConsumer type would erase
        // the parameter types that the body relies on.
        withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles((slotPool, freeSlot, slotToReserve, ignored) -> {
            slotPool.reserveFreeSlot(slotToReserve.getAllocationId(), slotToReserve.getResourceProfile())
                    .tryAssignPayload(new TestingPhysicalSlotPayload());

            ResourceCounter fulfilledRequirementsOfFreeSlot = slotPool.releaseSlot(freeSlot.getAllocationId(), new FlinkException("Test failure"));
            ResourceCounter fulfilledRequirementsOfReservedSlot = slotPool.releaseSlot(slotToReserve.getAllocationId(), new FlinkException("Test failure"));

            Assertions.assertThat(fulfilledRequirementsOfFreeSlot.getResources()).isEmpty();
            Assertions.assertThat(fulfilledRequirementsOfReservedSlot.getResourceCount(slotToReserve.getResourceProfile())).isEqualTo(1);
        });
    }

    /**
     * Builds a pool, requires one slot of each of the two test profiles, offers matching slots
     * from a single task manager and hands the pool, the first two accepted offers and the task
     * manager location to the given test body.
     *
     * @param test test body receiving (pool, first accepted offer, second accepted offer, location)
     */
    private static void withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles(QuadConsumer<DefaultDeclarativeSlotPool, SlotOffer, SlotOffer, TaskManagerLocation> test) {
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build();
        ResourceCounter resourceRequirements = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1).add(RESOURCE_PROFILE_2, 1);
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setFreeSlotFunction(freeSlotConsumer)
                .createTestingTaskExecutorGateway();

        Iterator<SlotOffer> slotOffers = increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, taskManagerLocation, testingTaskExecutorGateway).iterator();
        SlotOffer slot1 = slotOffers.next();
        SlotOffer slot2 = slotOffers.next();

        // Arguments are passed with their declared types; upcasting them to Object would not
        // match the consumer's type parameters.
        test.accept(slotPool, slot1, slot2, taskManagerLocation);
    }

    /**
     * Releases one of the offered slots and checks that the fulfilled requirements shrink by one
     * slot of that slot's resource profile.
     */
    @Test
    void testReleaseSlotDecreasesFulfilledResourceRequirements() throws InterruptedException {
        NewSlotsService newSlotsListener = new NewSlotsService();
        DefaultDeclarativeSlotPool pool = createDefaultDeclarativeSlotPoolWithNewSlotsListener(newSlotsListener);
        ResourceCounter requirements = createResourceRequirements();
        increaseRequirementsAndOfferSlotsToSlotPool(pool, requirements, null);

        // Pick any slot out of the first new-slots notification.
        Collection<? extends PhysicalSlot> announcedSlots = newSlotsListener.takeNewSlots();
        PhysicalSlot releasedSlot = announcedSlots.iterator().next();

        pool.releaseSlot(releasedSlot.getAllocationId(), new FlinkException("Test failure"));

        ResourceCounter expectedFulfilled = requirements.subtract(releasedSlot.getResourceProfile(), 1);
        Assertions.assertThat(pool.getFulfilledResourceRequirements()).isEqualTo(expectedFulfilled);
    }

    /**
     * Releases a single slot and checks that exactly that slot's allocation id was passed to the
     * gateway's free-slot function.
     */
    @Test
    void testReleaseSlotReturnsSlot() throws InterruptedException {
        NewSlotsService newSlotsListener = new NewSlotsService();
        DefaultDeclarativeSlotPool pool = createDefaultDeclarativeSlotPoolWithNewSlotsListener(newSlotsListener);
        ResourceCounter requirements = createResourceRequirements();

        // Gateway whose free-slot calls are recorded by the consumer.
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        TestingTaskExecutorGateway gateway = new TestingTaskExecutorGatewayBuilder()
                .setFreeSlotFunction(freeSlotConsumer)
                .createTestingTaskExecutorGateway();
        increaseRequirementsAndOfferSlotsToSlotPool(pool, requirements, new LocalTaskManagerLocation(), gateway);

        Collection<? extends PhysicalSlot> announcedSlots = newSlotsListener.takeNewSlots();
        PhysicalSlot releasedSlot = announcedSlots.iterator().next();
        pool.releaseSlot(releasedSlot.getAllocationId(), new FlinkException("Test failure"));

        AllocationID freedSlot = Iterables.getOnlyElement(freeSlotConsumer.drainFreedSlots());
        Assertions.assertThat(freedSlot).isEqualTo(releasedSlot.getAllocationId());
    }

    /**
     * Drops all requirements after the slots were accepted, triggers the idle-slot release at
     * {@code offerTime + idleSlotTimeout} and verifies that every accepted slot was freed and the
     * pool holds neither slots nor requirements.
     */
    @Test
    void testReturnIdleSlotsAfterTimeout() {
        Time idleSlotTimeout = Time.seconds(10L);
        // Same timestamp (0L) that increaseRequirementsAndOfferSlotsToSlotPool passes to offerSlots.
        long offerTime = 0L;
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().setIdleSlotTimeout(idleSlotTimeout).build();
        ResourceCounter resourceRequirements = createResourceRequirements();
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setFreeSlotFunction(freeSlotConsumer)
                .createTestingTaskExecutorGateway();
        Collection<SlotOffer> acceptedSlots = increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, new LocalTaskManagerLocation(), testingTaskExecutorGateway);

        // With no requirements left, every slot is excess.
        slotPool.decreaseResourceRequirementsBy(resourceRequirements);
        slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds());

        Collection<AllocationID> freedSlots = freeSlotConsumer.drainFreedSlots();
        Assertions.assertThat(freedSlots).containsExactlyInAnyOrderElementsOf(
                acceptedSlots.stream().map(SlotOffer::getAllocationId).collect(Collectors.toList()));
        this.assertNoAvailableAndRequiredResources(slotPool);
    }

    /**
     * Asserts that the pool has no fulfilled requirements, no declared requirements and no slots.
     *
     * @param slotPool pool under inspection
     */
    private void assertNoAvailableAndRequiredResources(DefaultDeclarativeSlotPool slotPool) {
        boolean nothingFulfilled = slotPool.getFulfilledResourceRequirements().isEmpty();
        Assertions.assertThat(nothingFulfilled).isTrue();

        Assertions.assertThat(slotPool.getResourceRequirements()).isEmpty();
        Assertions.assertThat(slotPool.getAllSlotsInformation()).isEmpty();
    }

    /**
     * Reduces the requirements to a single {@code RESOURCE_PROFILE_1} slot, triggers the
     * idle-slot release at {@code offerTime + idleSlotTimeout} and verifies that the pool still
     * fulfils exactly the remaining requirement.
     */
    @Test
    void testOnlyReturnExcessIdleSlots() {
        Time idleSlotTimeout = Time.seconds(10L);
        // NOTE(review): presumably the timestamp SlotPoolTestUtils.offerSlots uses for the offer — confirm.
        long offerTime = 0L;
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().setIdleSlotTimeout(idleSlotTimeout).build();
        ResourceCounter resourceRequirements = createResourceRequirements();
        Collection<SlotOffer> slotOffers = SlotPoolTestUtils.createSlotOffersForResourceRequirements(resourceRequirements);
        slotPool.increaseResourceRequirementsBy(resourceRequirements);
        Collection<SlotOffer> acceptedSlots = SlotPoolTestUtils.offerSlots(slotPool, slotOffers);

        // Keep one RESOURCE_PROFILE_1 slot required; everything else becomes excess.
        ResourceCounter requiredResources = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
        ResourceCounter excessRequirements = resourceRequirements.subtract(requiredResources);
        slotPool.decreaseResourceRequirementsBy(excessRequirements);

        slotPool.releaseIdleSlots(offerTime + idleSlotTimeout.toMilliseconds());

        Assertions.assertThat(acceptedSlots).isNotEmpty();
        Assertions.assertThat(slotPool.getFulfilledResourceRequirements()).isEqualTo(requiredResources);
    }

    /**
     * Reserves and frees the only slot, checks that it is announced again to the new-slots
     * listener, and checks that a further offer for the same requirement is not accepted and no
     * requirement is left unfulfilled.
     */
    @Test
    void testFreedSlotWillBeUsedToFulfillOutstandingResourceRequirementsOfSameProfile() throws InterruptedException {
        NewSlotsService newSlotsListener = new NewSlotsService();
        DefaultDeclarativeSlotPool pool = createDefaultDeclarativeSlotPoolWithNewSlotsListener(newSlotsListener);
        ResourceCounter requirements = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
        increaseRequirementsAndOfferSlotsToSlotPool(pool, requirements, null);

        Collection<PhysicalSlot> announced = drainNewSlotService(newSlotsListener);
        PhysicalSlot onlySlot = Iterables.getOnlyElement(announced);

        // Reserve the slot and give it back right away.
        pool.reserveFreeSlot(onlySlot.getAllocationId(), RESOURCE_PROFILE_1);
        pool.freeReservedSlot(onlySlot.getAllocationId(), null, 0L);

        Collection<PhysicalSlot> announcedAfterFree = drainNewSlotService(newSlotsListener);
        Assertions.assertThat(Iterables.getOnlyElement(announcedAfterFree)).isEqualTo(onlySlot);

        // A fresh offer for the same requirement.
        Collection<SlotOffer> additionalOffers = SlotPoolTestUtils.createSlotOffersForResourceRequirements(requirements);
        Collection<SlotOffer> acceptedAdditional = pool.offerSlots(
                additionalOffers,
                new LocalTaskManagerLocation(),
                SlotPoolTestUtils.createTaskManagerGateway(null),
                0L);

        Assertions.assertThat(acceptedAdditional).isEmpty();
        Assertions.assertThat(pool.calculateUnfulfilledResources().isEmpty()).isTrue();
    }

    /**
     * Registers {@code numberSlots} slots of {@code RESOURCE_PROFILE_1} without declaring any
     * requirements and verifies that the pool reports all of them with that profile.
     */
    @Test
    void testRegisterSlotsAcceptsAllSlots() {
        DefaultDeclarativeSlotPool declarativeSlotPool = createDefaultDeclarativeSlotPool();
        // Single source of truth for both the number of offers and the expected slot count.
        int numberSlots = 10;
        Collection<SlotOffer> slots = SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource(RESOURCE_PROFILE_1, numberSlots));

        declarativeSlotPool.registerSlots(slots, new LocalTaskManagerLocation(), SlotPoolTestUtils.createTaskManagerGateway(null), 0L);

        // Typed (not raw) so the lambda below sees SlotInfo elements.
        Collection<? extends SlotInfo> allSlotsInformation = declarativeSlotPool.getAllSlotsInformation();
        Assertions.assertThat(allSlotsInformation).hasSize(numberSlots);
        Assertions.assertThat(allSlotsInformation).allSatisfy(slotInfo -> {
            Assertions.assertThat(slotInfo.getResourceProfile()).isEqualTo(RESOURCE_PROFILE_1);
        });
    }

    /**
     * Reserves a slot for a large profile, then swaps the requirement to a small profile and frees
     * the slot; checks that the fulfilled requirements still count the slot under the large
     * profile and not under the small one.
     */
    @Test
    void testFreedSlotWillRemainAssignedToMatchedResourceProfile() {
        DefaultDeclarativeSlotPool pool = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceProfile largeProfile = ResourceProfile.newBuilder().setManagedMemoryMB(1024).build();
        ResourceProfile smallProfile = ResourceProfile.newBuilder().setManagedMemoryMB(512).build();

        pool.increaseResourceRequirementsBy(ResourceCounter.withResource(largeProfile, 1));

        // Offer a single slot created for ResourceProfile.ANY.
        Collection<SlotOffer> anyProfileOffers = SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.ANY, 1));
        SlotPoolTestUtils.offerSlots(pool, anyProfileOffers);

        SlotInfo freeSlot = (SlotInfo) pool.getFreeSlotInfoTracker().getFreeSlotsInformation().iterator().next();
        pool.reserveFreeSlot(freeSlot.getAllocationId(), largeProfile);
        Assertions.assertThat(pool.getFulfilledResourceRequirements().getResourceCount(largeProfile)).isEqualTo(1);

        // Replace the large requirement by a small one while the slot is still reserved.
        pool.increaseResourceRequirementsBy(ResourceCounter.withResource(smallProfile, 1));
        pool.decreaseResourceRequirementsBy(ResourceCounter.withResource(largeProfile, 1));

        pool.freeReservedSlot(freeSlot.getAllocationId(), null, 1L);

        Assertions.assertThat(pool.getFulfilledResourceRequirements().getResourceCount(largeProfile)).isEqualTo(1);
        Assertions.assertThat(pool.getFulfilledResourceRequirements().getResourceCount(smallProfile)).isEqualTo(0);
    }

    /**
     * Reserves a slot that was offered for a large profile on behalf of
     * {@code ResourceProfile.UNKNOWN} and checks that the fulfilled requirements count it under
     * the requested profile and that the reported requirements become two large slots.
     */
    @Test
    void testReserveFreeSlotForResourceUpdatesAvailableResourcesAndRequirements() {
        DefaultDeclarativeSlotPool pool = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceProfile largeProfile = ResourceProfile.newBuilder().setManagedMemoryMB(1024).build();
        ResourceProfile smallProfile = ResourceProfile.UNKNOWN;

        pool.increaseResourceRequirementsBy(ResourceCounter.withResource(largeProfile, 1));
        Collection<SlotOffer> largeOffers = SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource(largeProfile, 1));
        SlotPoolTestUtils.offerSlots(pool, largeOffers);
        pool.increaseResourceRequirementsBy(ResourceCounter.withResource(smallProfile, 1));

        // Locate the free slot that carries the large profile.
        SlotInfo largeSlot = pool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream()
                .filter(candidate -> candidate.getResourceProfile().equals(largeProfile))
                .findFirst()
                .get();

        // Reserve it for the other requested profile.
        pool.reserveFreeSlot(largeSlot.getAllocationId(), smallProfile);

        ResourceCounter fulfilled = pool.getFulfilledResourceRequirements();
        Assertions.assertThat(fulfilled.getResourceCount(smallProfile)).isEqualTo(1);
        Assertions.assertThat(fulfilled.getResourceCount(largeProfile)).isEqualTo(0);

        Collection<ResourceRequirement> reportedRequirements = pool.getResourceRequirements();
        Assertions.assertThat(reportedRequirements).containsExactly(ResourceRequirement.create(largeProfile, 2));
    }

    /** Checks that requirements set on a fresh pool are reported back unchanged. */
    @Test
    void testSetResourceRequirementsForInitialResourceRequirements() {
        DefaultDeclarativeSlotPool pool = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceCounter requirements = ResourceCounter.withResource(RESOURCE_PROFILE_1, 2);

        pool.setResourceRequirements(requirements);

        Collection<ResourceRequirement> expected = toResourceRequirements(requirements);
        Assertions.assertThat(pool.getResourceRequirements()).isEqualTo(expected);
    }

    /** Checks that a second {@code setResourceRequirements} call replaces the earlier requirements. */
    @Test
    void testSetResourceRequirementsOverwritesPreviousValue() {
        DefaultDeclarativeSlotPool pool = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceCounter previous = ResourceCounter.withResource(RESOURCE_PROFILE_1, 1);
        pool.setResourceRequirements(previous);

        ResourceCounter replacement = ResourceCounter.withResource(RESOURCE_PROFILE_2, 1);
        pool.setResourceRequirements(replacement);

        Collection<ResourceRequirement> expected = toResourceRequirements(replacement);
        Assertions.assertThat(pool.getResourceRequirements()).isEqualTo(expected);
    }

    /**
     * Registers a slot without declaring requirements, runs it through a
     * reserve/free cycle for a temporarily declared requirement and checks that the pool reports
     * no requirements both before and after.
     */
    @Test
    void testRegisterSlotsDoesNotAffectRequirements() {
        DefaultDeclarativeSlotPool pool = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceProfile offeredProfile = RESOURCE_PROFILE_1;
        ResourceProfile requestedProfile = ResourceProfile.UNKNOWN;

        Collection<SlotOffer> offers = SlotPoolTestUtils.createSlotOffersForResourceRequirements(ResourceCounter.withResource(offeredProfile, 1));
        pool.registerSlots(offers, new LocalTaskManagerLocation(), SlotPoolTestUtils.createTaskManagerGateway(null), 0L);
        AllocationID registeredSlot = (AllocationID) pool.getFreeSlotInfoTracker().getAvailableSlots().iterator().next();

        Assertions.assertThat(pool.getResourceRequirements()).isEmpty();

        // Declare a requirement, use the registered slot for it, then undo everything.
        pool.increaseResourceRequirementsBy(ResourceCounter.withResource(requestedProfile, 1));
        pool.reserveFreeSlot(registeredSlot, requestedProfile);
        pool.freeReservedSlot(registeredSlot, null, 1L);
        pool.decreaseResourceRequirementsBy(ResourceCounter.withResource(requestedProfile, 1));

        Assertions.assertThat(pool.getResourceRequirements()).isEmpty();
    }

    /**
     * Builds the default requirements used by the tests: two slots of {@code RESOURCE_PROFILE_1}
     * and one slot of {@code RESOURCE_PROFILE_2}.
     *
     * @return counter holding those requirements
     */
    @Nonnull
    static ResourceCounter createResourceRequirements() {
        Map<ResourceProfile, Integer> countsByProfile = new HashMap<>();
        countsByProfile.put(RESOURCE_PROFILE_1, 2);
        countsByProfile.put(RESOURCE_PROFILE_2, 1);
        return ResourceCounter.withResources(countsByProfile);
    }

    /**
     * Converts every (profile, count) entry of the counter into a {@code ResourceRequirement}.
     *
     * @param resourceCounter counter to convert
     * @return list with one requirement per entry, in the counter's iteration order
     */
    @Nonnull
    private static Collection<ResourceRequirement> toResourceRequirements(ResourceCounter resourceCounter) {
        Collection<ResourceRequirement> requirements = new ArrayList<>();
        for (Map.Entry<ResourceProfile, Integer> entry : resourceCounter.getResourcesWithCount()) {
            requirements.add(ResourceRequirement.create(entry.getKey(), entry.getValue()));
        }
        return requirements;
    }

    /**
     * Builds a pool that forwards new resource requirements to the given listener.
     *
     * @param requirementsListener listener installed via {@code setNotifyNewResourceRequirements}
     * @return the built pool
     */
    @Nonnull
    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool(NewResourceRequirementsService requirementsListener) {
        DefaultDeclarativeSlotPool pool = DefaultDeclarativeSlotPoolBuilder.builder()
                .setNotifyNewResourceRequirements(requirementsListener)
                .build();
        return pool;
    }

    /**
     * Builds a default pool and registers the given new-slots listener on it.
     *
     * @param newSlotsListener listener to register
     * @return the pool with the listener registered
     */
    @Nonnull
    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener) {
        DefaultDeclarativeSlotPool pool = createDefaultDeclarativeSlotPool();
        pool.registerNewSlotsListener(newSlotsListener);
        return pool;
    }

    /**
     * Builds a pool from an unconfigured {@code DefaultDeclarativeSlotPoolBuilder}.
     *
     * @return the built pool
     */
    @Nonnull
    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool() {
        DefaultDeclarativeSlotPool pool = DefaultDeclarativeSlotPoolBuilder.builder().build();
        return pool;
    }

    /**
     * Convenience overload that passes {@code null} as the task executor gateway.
     *
     * @param slotPool pool to modify
     * @param resourceRequirements requirements to add and to create offers for
     * @param taskManagerLocation location the offers come from, or {@code null}
     * @return the offers accepted by the pool
     */
    @Nonnull
    private static Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool(DefaultDeclarativeSlotPool slotPool, ResourceCounter resourceRequirements, @Nullable LocalTaskManagerLocation taskManagerLocation) {
        Collection<SlotOffer> accepted = increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, taskManagerLocation, null);
        return accepted;
    }

    /**
     * Creates slot offers matching the requirements, increases the pool's requirements by them
     * and offers the slots at time {@code 0L}.
     *
     * @param slotPool pool to modify
     * @param resourceRequirements requirements to add and to create offers for
     * @param taskManagerLocation location the offers come from; a new
     *     {@code LocalTaskManagerLocation} is used when {@code null}
     * @param taskExecutorGateway gateway handed to {@code SlotPoolTestUtils.createTaskManagerGateway},
     *     may be {@code null}
     * @return the offers accepted by the pool
     */
    @Nonnull
    static Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool(DefaultDeclarativeSlotPool slotPool, ResourceCounter resourceRequirements, @Nullable LocalTaskManagerLocation taskManagerLocation, @Nullable TaskExecutorGateway taskExecutorGateway) {
        Collection<SlotOffer> offers = SlotPoolTestUtils.createSlotOffersForResourceRequirements(resourceRequirements);
        slotPool.increaseResourceRequirementsBy(resourceRequirements);

        TaskManagerLocation effectiveLocation;
        if (taskManagerLocation != null) {
            effectiveLocation = taskManagerLocation;
        } else {
            effectiveLocation = new LocalTaskManagerLocation();
        }
        return slotPool.offerSlots(offers, effectiveLocation, SlotPoolTestUtils.createTaskManagerGateway(taskExecutorGateway), 0L);
    }

    /**
     * Collects the slots of all notifications currently queued in the listener.
     *
     * @param notifyNewSlots listener to drain
     * @return all slots from the queued notifications, in arrival order
     * @throws InterruptedException if interrupted while taking a notification
     */
    @Nonnull
    static Collection<PhysicalSlot> drainNewSlotService(NewSlotsService notifyNewSlots) throws InterruptedException {
        Collection<PhysicalSlot> drained = new ArrayList<>();
        while (true) {
            if (!notifyNewSlots.hasNextNewSlots()) {
                return drained;
            }
            drained.addAll(notifyNewSlots.takeNewSlots());
        }
    }

    /**
     * Free-slot function for a testing gateway: records each allocation id it is called with in a
     * bounded queue (capacity 10) and answers with an already completed acknowledgement.
     */
    static class FreeSlotConsumer implements BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> {
        /** Allocation ids recorded so far; {@code offer} is used, so ids beyond the capacity are not stored. */
        final BlockingQueue<AllocationID> freedSlots = new ArrayBlockingQueue<>(10);

        FreeSlotConsumer() {}

        /**
         * Records the allocation id and acknowledges immediately.
         *
         * @param allocationID id of the slot being freed
         * @param throwable cause passed by the caller; not used
         * @return completed future holding {@code Acknowledge.get()}
         */
        @Override
        public CompletableFuture<Acknowledge> apply(AllocationID allocationID, Throwable throwable) {
            freedSlots.offer(allocationID);
            Acknowledge ack = Acknowledge.get();
            return CompletableFuture.completedFuture(ack);
        }

        /**
         * Removes and returns all allocation ids recorded so far.
         *
         * @return the recorded ids in recording order
         */
        Collection<AllocationID> drainFreedSlots() {
            ArrayList<AllocationID> drained = new ArrayList<>();
            freedSlots.drainTo(drained);
            return drained;
        }
    }

    static final class NewSlotsService
    implements DeclarativeSlotPool.NewSlotsListener {
        private final BlockingQueue<Collection<? extends PhysicalSlot>> physicalSlotsQueue = new ArrayBlockingQueue<Collection<? extends PhysicalSlot>>(2);

        NewSlotsService() {
        }

        private Collection<? extends PhysicalSlot> takeNewSlots() throws InterruptedException {
            return this.physicalSlotsQueue.take();
        }

        private boolean hasNextNewSlots() {
            return !this.physicalSlotsQueue.isEmpty();
        }

        public void notifyNewSlotsAreAvailable(Collection<? extends PhysicalSlot> newlyAvailableSlots) {
            this.physicalSlotsQueue.offer(newlyAvailableSlots);
        }
    }

    /**
     * Resource-requirements consumer that stores each notification in a bounded queue
     * (capacity 2) so tests can take them one by one.
     */
    private static final class NewResourceRequirementsService implements Consumer<Collection<ResourceRequirement>> {
        /** Queued notifications; {@code offer} is used, so notifications beyond the capacity are not stored. */
        private final BlockingQueue<Collection<ResourceRequirement>> resourceRequirementsQueue = new ArrayBlockingQueue<>(2);

        private NewResourceRequirementsService() {}

        /** Stores the notified requirements for later retrieval. */
        @Override
        public void accept(Collection<ResourceRequirement> resourceRequirements) {
            resourceRequirementsQueue.offer(resourceRequirements);
        }

        /** Returns {@code true} while at least one notification is queued. */
        public boolean hasNextResourceRequirements() {
            boolean queueEmpty = resourceRequirementsQueue.isEmpty();
            return !queueEmpty;
        }

        /** Takes the oldest queued notification, blocking until one is available. */
        private Collection<ResourceRequirement> takeResourceRequirements() throws InterruptedException {
            return resourceRequirementsQueue.take();
        }
    }
}

