/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.NoOpTaskExecutorBlobService;
import org.apache.flink.runtime.blob.TaskExecutorBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceExtension;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesBuilder;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.testutils.TestFileUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.FunctionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

class TaskExecutorSlotLifetimeTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    @RegisterExtension
    private static AllCallbackWrapper<TestingRpcServiceExtension> rpcServiceExtensionWrapper = new AllCallbackWrapper((CustomExtension)new TestingRpcServiceExtension());
    @RegisterExtension
    private final TestingFatalErrorHandlerExtension testingFatalErrorHandlerExtension = new TestingFatalErrorHandlerExtension();
    @TempDir
    private Path tempDir;

    TaskExecutorSlotLifetimeTest() {
    }

    @BeforeEach
    void setup() {
        UserClassLoaderExtractingInvokable.clearQueue();
    }

    @Test
    void testUserCodeClassLoaderIsBoundToSlot() throws Exception {
        Configuration configuration = new Configuration();
        TestingRpcService rpcService = ((TestingRpcServiceExtension)rpcServiceExtensionWrapper.getCustomExtension()).getTestingRpcService();
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        CompletableFuture firstSlotReportFuture = new CompletableFuture();
        resourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> {
            firstSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        ArrayBlockingQueue taskExecutionStates = new ArrayBlockingQueue(3);
        OneShotLatch slotsOfferedLatch = new OneShotLatch();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, slotOffers) -> {
            slotsOfferedLatch.trigger();
            return CompletableFuture.completedFuture(slotOffers);
        }).setUpdateTaskExecutionStateFunction(FunctionUtils.uncheckedFunction(taskExecutionState -> {
            taskExecutionStates.put(taskExecutionState);
            return CompletableFuture.completedFuture(Acknowledge.get());
        })).build();
        SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(resourceManagerGateway.getAddress(), resourceManagerGateway.getFencingToken().toUUID());
        SettableLeaderRetrievalService jobMasterLeaderRetriever = new SettableLeaderRetrievalService(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder().setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever).setJobMasterLeaderRetrieverFunction(ignored -> jobMasterLeaderRetriever).build();
        rpcService.registerGateway(resourceManagerGateway.getAddress(), (RpcGateway)resourceManagerGateway);
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        try (TaskExecutor taskExecutor = this.createTaskExecutor(configuration, rpcService, haServices, unresolvedTaskManagerLocation);){
            TaskExecutionState taskExecutionState2;
            taskExecutor.start();
            SlotReport slotReport = (SlotReport)firstSlotReportFuture.join();
            SlotID firstSlotId = ((SlotStatus)slotReport.iterator().next()).getSlotID();
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway)taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            JobID jobId = new JobID();
            AllocationID allocationId = new AllocationID();
            taskExecutorGateway.requestSlot(firstSlotId, jobId, allocationId, ResourceProfile.ZERO, jobMasterGateway.getAddress(), resourceManagerGateway.getFencingToken(), RpcUtils.INF_TIMEOUT).join();
            TaskDeploymentDescriptor tdd = TaskDeploymentDescriptorBuilder.newBuilder(jobId, UserClassLoaderExtractingInvokable.class).setAllocationId(allocationId).build();
            slotsOfferedLatch.await();
            taskExecutorGateway.submitTask(tdd, jobMasterGateway.getFencingToken(), RpcUtils.INF_TIMEOUT).join();
            ClassLoader firstClassLoader = UserClassLoaderExtractingInvokable.take();
            while (!(taskExecutionState2 = (TaskExecutionState)taskExecutionStates.take()).getExecutionState().isTerminal()) {
            }
            taskExecutorGateway.submitTask(tdd, jobMasterGateway.getFencingToken(), RpcUtils.INF_TIMEOUT).join();
            ClassLoader secondClassLoader = UserClassLoaderExtractingInvokable.take();
            Assertions.assertThat((Object)firstClassLoader).isSameAs((Object)secondClassLoader);
        }
    }

    private TaskExecutor createTaskExecutor(Configuration configuration, TestingRpcService rpcService, TestingHighAvailabilityServices haServices, LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation) throws IOException {
        return new TaskExecutor((RpcService)rpcService, TaskManagerConfiguration.fromConfiguration((Configuration)configuration, (TaskExecutorResourceSpec)TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution((Configuration)configuration), (String)InetAddress.getLoopbackAddress().getHostAddress(), (File)TestFileUtils.createTempDir()), (HighAvailabilityServices)haServices, new TaskManagerServicesBuilder().setTaskSlotTable((TaskSlotTable<Task>)TaskSlotUtils.createTaskSlotTable(1, (ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor())).setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation).setTaskStateManager(this.createTaskExecutorLocalStateStoresManager()).build(), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, (HeartbeatServices)new TestingHeartbeatServices(), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, (TaskExecutorBlobService)NoOpTaskExecutorBlobService.INSTANCE, (FatalErrorHandler)this.testingFatalErrorHandlerExtension.getTestingFatalErrorHandler(), (TaskExecutorPartitionTracker)new TestingTaskExecutorPartitionTracker(), new DelegationTokenReceiverRepository(configuration, null));
    }

    private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager() throws IOException {
        return new TaskExecutorLocalStateStoresManager(false, Reference.owned((Object)new File[]{TempDirUtils.newFolder((Path)this.tempDir)}), Executors.directExecutor());
    }

    public static final class UserClassLoaderExtractingInvokable
    extends AbstractInvokable {
        private static BlockingQueue<ClassLoader> userCodeClassLoaders = new ArrayBlockingQueue<ClassLoader>(2);

        public UserClassLoaderExtractingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            userCodeClassLoaders.put(this.getEnvironment().getUserCodeClassLoader().asClassLoader());
        }

        private static void clearQueue() {
            userCodeClassLoaders.clear();
        }

        private static ClassLoader take() throws InterruptedException {
            return userCodeClassLoaders.take();
        }
    }
}

