/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidPermanentBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.testutils.WorkingDirectoryResource;
import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link TaskExecutorLocalStateStoresManager}.
 *
 * <p>The tests cover creation of the manager through {@link TaskManagerServices} (with and without
 * explicitly configured local state root directories), the directory layout handed out for
 * subtasks, and which directories are removed when state is released, retained, or the manager is
 * shut down.
 */
public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {

    /** Class-wide temporary folder hosting the local state root directories used by the tests. */
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Hands out a new working directory to every test that builds {@link TaskManagerServices}. */
    @ClassRule
    public static final WorkingDirectoryResource WORKING_DIRECTORY_RESOURCE = new WorkingDirectoryResource();

    /**
     * Configures three local state root directories, enables local recovery, and verifies that
     * the manager obtained from {@link TaskManagerServices} reports directories located below the
     * configured roots and has local recovery enabled.
     */
    @Test
    public void testCreationFromConfig() throws Exception {
        Configuration config = new Configuration();
        File newFolder = temporaryFolder.newFolder();
        String tmpDir = newFolder.getAbsolutePath() + File.separator;
        // Literal replacement on purpose: String#replaceAll would interpret '\' and '$' inside the
        // replacement (e.g. Windows path separators) instead of inserting the path verbatim.
        String rootDirString = "__localStateRoot1,__localStateRoot2,__localStateRoot3".replace("__", tmpDir);
        config.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString);
        config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
        WorkingDirectory workingDirectory = WORKING_DIRECTORY_RESOURCE.createNewWorkingDirectory();
        TaskManagerServices taskManagerServices =
                createTaskManagerServices(
                        createTaskManagerServiceConfiguration(config, workingDirectory), workingDirectory);
        try {
            TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
            String[] split = rootDirString.split(",");
            File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
            // Fail with a clear message instead of an ArrayIndexOutOfBoundsException below.
            Assert.assertEquals(
                    "Unexpected number of local state root directories",
                    split.length,
                    rootDirectories.length);
            for (int i = 0; i < split.length; ++i) {
                Assertions.assertThat(rootDirectories[i].toPath()).startsWith(Paths.get(split[i]));
            }
            Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
            // Remove the directories again so that nothing is left behind in the temporary folder.
            for (File rootDirectory : rootDirectories) {
                FileUtils.deleteFileOrDirectory(rootDirectory);
            }
        } finally {
            taskManagerServices.shutDown();
        }
    }

    /**
     * Uses an empty configuration and verifies that every reported local state root directory is
     * the local state directory of the working directory and that local recovery is disabled.
     */
    @Test
    public void testCreationFromConfigDefault() throws Exception {
        Configuration config = new Configuration();
        WorkingDirectory workingDirectory = WORKING_DIRECTORY_RESOURCE.createNewWorkingDirectory();
        TaskManagerServicesConfiguration taskManagerServicesConfiguration =
                createTaskManagerServiceConfiguration(config, workingDirectory);
        TaskManagerServices taskManagerServices =
                createTaskManagerServices(taskManagerServicesConfiguration, workingDirectory);
        try {
            TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();
            for (File localStateRootDirectory : taskStateManager.getLocalStateRootDirectories()) {
                Assert.assertEquals(workingDirectory.getLocalStateDirectory(), localStateRootDirectory);
            }
            Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());
        } finally {
            taskManagerServices.shutDown();
        }
    }

    /**
     * With local recovery disabled, requesting a local state store for a subtask must neither
     * expose a directory provider nor create anything inside the root directories.
     */
    @Test
    public void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception {
        JobID jobID = new JobID();
        JobVertexID jobVertexID = new JobVertexID();
        AllocationID allocationID = new AllocationID();
        int subtaskIdx = 23;
        File[] rootDirs = new File[] {temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()};
        boolean localRecoveryEnabled = false;
        TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        localRecoveryEnabled, Reference.owned(rootDirs), Executors.directExecutor());
        try {
            TaskLocalStateStore taskLocalStateStore =
                    storesManager.localStateStoreForSubtask(
                            jobID, allocationID, jobVertexID, subtaskIdx, new Configuration(), new Configuration());
            Assert.assertFalse(taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
            Assert.assertNull(taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider().orElse(null));
            for (File recoveryDir : rootDirs) {
                // File#listFiles returns null when the directory cannot be listed; report that as
                // an assertion failure rather than a NullPointerException.
                File[] children = recoveryDir.listFiles();
                Assert.assertNotNull("Cannot list root directory " + recoveryDir, children);
                Assert.assertEquals(0L, children.length);
            }
        } finally {
            // Shut the manager down even when an assertion fails so it does not outlive the test.
            storesManager.shutdown();
        }
    }

    /**
     * Verifies the directory layout handed out for a subtask and that the created directories
     * are gone after releasing the allocation as well as after shutting the manager down.
     */
    @Test
    public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
        JobID jobID = new JobID();
        JobVertexID jobVertexID = new JobVertexID();
        AllocationID allocationID = new AllocationID();
        int subtaskIdx = 23;
        File[] rootDirs = new File[] {temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()};
        TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(true, Reference.owned(rootDirs), Executors.directExecutor());
        try {
            TaskLocalStateStore taskLocalStateStore =
                    storesManager.localStateStoreForSubtask(
                            jobID, allocationID, jobVertexID, subtaskIdx, new Configuration(), new Configuration());
            LocalRecoveryDirectoryProvider directoryProvider =
                    taskLocalStateStore
                            .getLocalRecoveryConfig()
                            .getLocalStateDirectoryProvider()
                            .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());

            // The allocation base directory is expected to rotate over the root directories
            // with the checkpoint id.
            for (int i = 0; i < 10; ++i) {
                File expectedBaseDir =
                        new File(
                                rootDirs[(i & Integer.MAX_VALUE) % rootDirs.length],
                                storesManager.allocationSubDirString(allocationID));
                Assert.assertEquals(expectedBaseDir, directoryProvider.allocationBaseDirectory(i));
            }

            long chkId = 42L;
            File allocBaseDirChk42 = directoryProvider.allocationBaseDirectory(chkId);
            File subtaskSpecificCheckpointDirectory = directoryProvider.subtaskSpecificCheckpointDirectory(chkId);
            Assert.assertEquals(
                    new File(
                            allocBaseDirChk42,
                            "jid_" + jobID + File.separator + "vtx_" + jobVertexID + "_sti_" + subtaskIdx + File.separator + "chk_" + chkId),
                    subtaskSpecificCheckpointDirectory);

            Assert.assertTrue(subtaskSpecificCheckpointDirectory.mkdirs());
            File testFile = new File(subtaskSpecificCheckpointDirectory, "test");
            Assert.assertTrue(testFile.createNewFile());
            Assert.assertEquals(
                    storesManager.isLocalRecoveryEnabled(),
                    taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
            Assert.assertTrue(testFile.exists());

            // Releasing the allocation must remove everything created for it.
            storesManager.releaseLocalStateForAllocationId(allocationID);
            checkRootDirsClean(rootDirs);

            // Create state for a second allocation; it is cleaned up by the shutdown below.
            AllocationID otherAllocationID = new AllocationID();
            taskLocalStateStore =
                    storesManager.localStateStoreForSubtask(
                            jobID, otherAllocationID, jobVertexID, subtaskIdx, new Configuration(), new Configuration());
            directoryProvider =
                    taskLocalStateStore
                            .getLocalRecoveryConfig()
                            .getLocalStateDirectoryProvider()
                            .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
            File chkDir = directoryProvider.subtaskSpecificCheckpointDirectory(23L);
            Assert.assertTrue(chkDir.mkdirs());
            testFile = new File(chkDir, "test");
            Assert.assertTrue(testFile.createNewFile());
        } finally {
            // Runs on success and on failure, so the manager never outlives the test.
            storesManager.shutdown();
        }
        checkRootDirsClean(rootDirs);
    }

    /** Directories passed as an owned reference must no longer exist after shutdown. */
    @Test
    public void testOwnedLocalStateDirectoriesAreDeletedOnShutdown() throws IOException {
        File localStateStoreA = temporaryFolder.newFolder();
        File localStateStoreB = temporaryFolder.newFolder();
        File[] localStateDirectories = new File[] {localStateStoreA, localStateStoreB};
        TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(localStateDirectories), Executors.directExecutor());
        try {
            for (File localStateDirectory : localStateDirectories) {
                Assertions.assertThat(localStateDirectory).exists();
            }
        } finally {
            taskExecutorLocalStateStoresManager.shutdown();
        }
        for (File localStateDirectory : localStateDirectories) {
            Assertions.assertThat(localStateDirectory).doesNotExist();
        }
    }

    /** Directories passed as a borrowed reference must still exist after shutdown. */
    @Test
    public void testBorrowedLocalStateDirectoriesAreNotDeletedOnShutdown() throws IOException {
        File localStateStoreA = temporaryFolder.newFolder();
        File localStateStoreB = temporaryFolder.newFolder();
        File[] localStateDirectories = new File[] {localStateStoreA, localStateStoreB};
        TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.borrowed(localStateDirectories), Executors.directExecutor());
        try {
            for (File localStateDirectory : localStateDirectories) {
                Assertions.assertThat(localStateDirectory).exists();
            }
        } finally {
            taskExecutorLocalStateStoresManager.shutdown();
        }
        for (File localStateDirectory : localStateDirectories) {
            Assertions.assertThat(localStateDirectory).exists();
        }
    }

    /**
     * Creates local state stores for two allocations, retains only one of them, and verifies
     * that a single allocation directory remains and the other allocation's directory is gone.
     */
    @Test
    public void testRetainLocalStateForAllocationsDeletesUnretainedAllocationDirectories() throws IOException {
        File localStateStore = temporaryFolder.newFolder();
        TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(new File[] {localStateStore}), Executors.directExecutor());
        try {
            JobID jobId = new JobID();
            AllocationID retainedAllocationId = new AllocationID();
            AllocationID otherAllocationId = new AllocationID();
            JobVertexID jobVertexId = new JobVertexID();
            taskExecutorLocalStateStoresManager.localStateStoreForSubtask(
                    jobId, retainedAllocationId, jobVertexId, 0, new Configuration(), new Configuration());
            taskExecutorLocalStateStoresManager.localStateStoreForSubtask(
                    jobId, otherAllocationId, jobVertexId, 1, new Configuration(), new Configuration());

            Collection<?> allocationDirectories =
                    TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateStore);
            Assertions.assertThat(allocationDirectories).hasSize(2);

            taskExecutorLocalStateStoresManager.retainLocalStateForAllocations(Sets.newHashSet(retainedAllocationId));

            Collection<?> allocationDirectoriesAfterCleanup =
                    TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateStore);
            Assertions.assertThat(allocationDirectoriesAfterCleanup).hasSize(1);
            Assertions.assertThat(
                            new File(
                                    localStateStore,
                                    taskExecutorLocalStateStoresManager.allocationSubDirString(otherAllocationId)))
                    .doesNotExist();
        } finally {
            // Shut the manager down even when an assertion fails so it does not outlive the test.
            taskExecutorLocalStateStoresManager.shutdown();
        }
    }

    /**
     * Asserts that none of the given root directories contains any entry.
     *
     * <p>A directory whose listing is {@code null} (which {@link File#listFiles()} returns when
     * the path is not a directory or cannot be read, e.g. because it was already deleted) is
     * skipped, since there is nothing left in it to check.
     *
     * @param rootDirs the root directories to inspect
     */
    private void checkRootDirsClean(File[] rootDirs) {
        for (File rootDir : rootDirs) {
            File[] files = rootDir.listFiles();
            if (files != null) {
                Assert.assertArrayEquals(new File[0], files);
            }
        }
    }

    /**
     * Builds a {@link TaskManagerServicesConfiguration} for local execution from the given
     * configuration, using a generated resource id and the local host name.
     *
     * @param config the Flink configuration to read from
     * @param workingDirectory the working directory passed on to the configuration
     * @return the resulting services configuration
     * @throws Exception if the local host cannot be resolved or the configuration cannot be built
     */
    private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(Configuration config, WorkingDirectory workingDirectory) throws Exception {
        return TaskManagerServicesConfiguration.fromConfiguration(
                config,
                ResourceID.generate(),
                InetAddress.getLocalHost().getHostName(),
                true,
                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config),
                workingDirectory);
    }

    /**
     * Builds {@link TaskManagerServices} from the given configuration with a void blob service,
     * an unregistered metric group, and a direct executor service.
     *
     * @param config the services configuration
     * @param workingDirectory the working directory passed on to the services
     * @return the created services; callers are responsible for shutting them down
     * @throws Exception if the services cannot be created
     */
    private TaskManagerServices createTaskManagerServices(TaskManagerServicesConfiguration config, WorkingDirectory workingDirectory) throws Exception {
        return TaskManagerServices.fromConfiguration(
                config,
                VoidPermanentBlobService.INSTANCE,
                UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                Executors.newDirectExecutorService(),
                // The tests do not react to throwables reported through this callback.
                throwable -> {},
                workingDirectory);
    }
}

