/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolFactory;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.TriFunction;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class DeclarativeSlotPoolBridgeTest
extends TestLogger {
    private static final Time rpcTimeout = Time.seconds((long)20L);
    private static final JobID jobId = new JobID();
    private static final JobMasterId jobMasterId = JobMasterId.generate();
    private final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

    @Test
    public void testSlotOffer() throws Exception {
        SlotRequestId slotRequestId = new SlotRequestId();
        AllocationID expectedAllocationId = new AllocationID();
        PhysicalSlot allocatedSlot = DeclarativeSlotPoolBridgeTest.createAllocatedSlot(expectedAllocationId);
        TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory = new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory);){
            declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
            CompletableFuture slotAllocationFuture = declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, null);
            declarativeSlotPoolBridge.newSlotsAreAvailable(Collections.singleton(allocatedSlot));
            slotAllocationFuture.join();
        }
    }

    @Test
    public void testNotEnoughResourcesAvailableFailsPendingRequests() throws Exception {
        SlotRequestId slotRequestId = new SlotRequestId();
        TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory = new TestingDeclarativeSlotPoolFactory(TestingDeclarativeSlotPool.builder());
        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory);){
            declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
            CompletableFuture slotAllocationFuture = CompletableFuture.supplyAsync(() -> declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, Time.minutes((long)5L)), (Executor)this.mainThreadExecutor).get();
            this.mainThreadExecutor.execute(() -> declarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(Collections.emptyList()));
            MatcherAssert.assertThat((Object)slotAllocationFuture, (Matcher)FlinkMatchers.futureWillCompleteExceptionally(NoResourceAvailableException.class, (Duration)Duration.ofSeconds(10L)));
        }
    }

    @Test
    public void testReleasingAllocatedSlot() throws Exception {
        CompletableFuture releaseSlotFuture = new CompletableFuture();
        AllocationID expectedAllocationId = new AllocationID();
        PhysicalSlot allocatedSlot = DeclarativeSlotPoolBridgeTest.createAllocatedSlot(expectedAllocationId);
        TestingDeclarativeSlotPoolBuilder builder = TestingDeclarativeSlotPool.builder().setReserveFreeSlotFunction((allocationId, resourceProfile) -> {
            MatcherAssert.assertThat((Object)allocationId, (Matcher)CoreMatchers.is((Object)expectedAllocationId));
            return allocatedSlot;
        }).setFreeReservedSlotFunction((TriFunction<AllocationID, Throwable, Long, ResourceCounter>)((TriFunction)(allocationID, throwable, aLong) -> {
            releaseSlotFuture.complete(allocationID);
            return ResourceCounter.empty();
        }));
        TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory = new TestingDeclarativeSlotPoolFactory(builder);
        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge(declarativeSlotPoolFactory);){
            declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
            SlotRequestId slotRequestId = new SlotRequestId();
            declarativeSlotPoolBridge.allocateAvailableSlot(slotRequestId, expectedAllocationId, allocatedSlot.getResourceProfile());
            declarativeSlotPoolBridge.releaseSlot(slotRequestId, null);
            MatcherAssert.assertThat(releaseSlotFuture.join(), (Matcher)CoreMatchers.is((Object)expectedAllocationId));
        }
    }

    @Test
    public void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws Exception {
        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge((DeclarativeSlotPoolFactory)new DefaultDeclarativeSlotPoolFactory());){
            declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
            List<SlotRequestId> slotRequestIds = Arrays.asList(new SlotRequestId(), new SlotRequestId());
            List slotFutures = slotRequestIds.stream().map(slotRequestId -> {
                CompletableFuture slotFuture = declarativeSlotPoolBridge.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, rpcTimeout);
                slotFuture.whenComplete((physicalSlot, throwable) -> {
                    if (throwable != null) {
                        declarativeSlotPoolBridge.releaseSlot(slotRequestId, throwable);
                    }
                });
                return slotFuture;
            }).collect(Collectors.toList());
            declarativeSlotPoolBridge.close();
            try {
                FutureUtils.waitForAll(slotFutures).get();
                Assert.fail((String)"The slot futures should be completed exceptionally.");
            }
            catch (ExecutionException executionException) {
                // empty catch block
            }
        }
    }

    @Test
    public void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws Exception {
        try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge = DeclarativeSlotPoolBridgeTest.createDeclarativeSlotPoolBridge((DeclarativeSlotPoolFactory)new DefaultDeclarativeSlotPoolFactory());){
            declarativeSlotPoolBridge.start(jobMasterId, "localhost", this.mainThreadExecutor);
            CompletableFuture slotFuture = declarativeSlotPoolBridge.requestNewAllocatedSlot(new SlotRequestId(), ResourceProfile.UNKNOWN, rpcTimeout);
            LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
            declarativeSlotPoolBridge.registerTaskManager(localTaskManagerLocation.getResourceID());
            AllocationID allocationId = new AllocationID();
            declarativeSlotPoolBridge.offerSlots((TaskManagerLocation)localTaskManagerLocation, (TaskManagerGateway)new SimpleAckingTaskManagerGateway(), Collections.singleton(new SlotOffer(allocationId, 0, ResourceProfile.ANY)));
            MatcherAssert.assertThat((Object)((PhysicalSlot)slotFuture.join()).getAllocationId(), (Matcher)CoreMatchers.is((Object)allocationId));
        }
    }

    @Nonnull
    static DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
        return new DeclarativeSlotPoolBridge(jobId, declarativeSlotPoolFactory, (Clock)SystemClock.getInstance(), rpcTimeout, Time.seconds((long)20L), Time.seconds((long)20L));
    }

    static PhysicalSlot createAllocatedSlot(AllocationID allocationID) {
        return new AllocatedSlot(allocationID, (TaskManagerLocation)new LocalTaskManagerLocation(), 0, ResourceProfile.ANY, (TaskManagerGateway)new RpcTaskManagerGateway((TaskExecutorGateway)new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(), JobMasterId.generate()));
    }
}

