package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Paths;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.AllCallbackWrapper;
import org.apache.flink.runtime.blob.VoidPermanentBlobService;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.testutils.WorkingDirectoryExtension;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.class */
/**
 * Tests for {@link TaskExecutorLocalStateStoresManager}: how it is set up from a {@link
 * Configuration} through {@link TaskManagerServices}, and how it creates, retains and removes the
 * directories used for task-local state.
 */
class TaskExecutorLocalStateStoresManagerTest {

    /** Shared root under which every test creates its own, uniquely named folders. */
    @TempDir
    public static File temporaryFolder;

    /** Creates the working directories used by the tests; it is supplied with {@link #temporaryFolder}. */
    @RegisterExtension
    public static final AllCallbackWrapper<WorkingDirectoryExtension>
            WORKING_DIRECTORY_EXTENSION_WRAPPER =
                    new AllCallbackWrapper<>(new WorkingDirectoryExtension(() -> temporaryFolder));

    /**
     * Checks that root directories configured via {@link
     * CheckpointingOptions#LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS} and the {@link
     * CheckpointingOptions#LOCAL_RECOVERY} flag are reflected by the manager obtained from {@link
     * TaskManagerServices}.
     */
    @Test
    void testCreationFromConfig() throws Exception {
        final String rootDirPrefix =
                TempDirUtils.newFolder(temporaryFolder.toPath()).getAbsolutePath() + File.separator;
        // String#replace substitutes literally. String#replaceAll would parse the path as a regex
        // replacement string, in which the backslash and '$' are special characters, so a path
        // containing either of them (e.g. any Windows path) would be corrupted or rejected.
        final String rootDirString =
                "__localStateRoot1,__localStateRoot2,__localStateRoot3".replace("__", rootDirPrefix);

        final Configuration config = new Configuration();
        config.setString(
                CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString);
        config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);

        final WorkingDirectory workingDirectory = newWorkingDirectory();
        final TaskManagerServices taskManagerServices =
                createTaskManagerServices(
                        createTaskManagerServiceConfiguration(config, workingDirectory),
                        workingDirectory);
        try {
            final TaskExecutorLocalStateStoresManager taskStateManager =
                    taskManagerServices.getTaskManagerStateStore();

            final String[] expectedRoots = rootDirString.split(",");
            final File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();

            // Without this check surplus roots would go unnoticed and missing ones would only
            // show up as an ArrayIndexOutOfBoundsException in the loop below.
            Assertions.assertThat(rootDirectories).hasSize(expectedRoots.length);
            for (int i = 0; i < expectedRoots.length; i++) {
                Assertions.assertThat(rootDirectories[i].toPath())
                        .startsWith(Paths.get(expectedRoots[i]));
            }

            Assertions.assertThat(taskStateManager.isLocalRecoveryEnabled()).isTrue();

            for (File rootDirectory : rootDirectories) {
                FileUtils.deleteFileOrDirectory(rootDirectory);
            }
        } finally {
            taskManagerServices.shutDown();
        }
    }

    /**
     * Checks that, with an empty configuration, the manager reports the working directory's local
     * state directory as its only root and has local recovery disabled.
     */
    @Test
    void testCreationFromConfigDefault() throws Exception {
        final Configuration config = new Configuration();
        final WorkingDirectory workingDirectory = newWorkingDirectory();
        final TaskManagerServices taskManagerServices =
                createTaskManagerServices(
                        createTaskManagerServiceConfiguration(config, workingDirectory),
                        workingDirectory);
        try {
            final TaskExecutorLocalStateStoresManager taskStateManager =
                    taskManagerServices.getTaskManagerStateStore();

            // containsOnly also fails on an empty array, which an element-wise loop would accept.
            Assertions.assertThat(taskStateManager.getLocalStateRootDirectories())
                    .containsOnly(workingDirectory.getLocalStateDirectory());

            Assertions.assertThat(taskStateManager.isLocalRecoveryEnabled()).isFalse();
        } finally {
            // Single shutdown on every path; failures inside the try block still propagate.
            taskManagerServices.shutDown();
        }
    }

    /**
     * Checks that requesting a subtask store from a manager with local recovery disabled yields a
     * config without a directory provider and leaves the root directories empty.
     */
    @Test
    void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception {
        final File[] rootDirs = newRootDirectories(3);
        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        false, Reference.owned(rootDirs), Executors.directExecutor());
        try {
            final TaskLocalStateStore taskLocalStateStore =
                    storesManager.localStateStoreForSubtask(
                            new JobID(),
                            new AllocationID(),
                            new JobVertexID(),
                            23,
                            new Configuration(),
                            new Configuration());

            final LocalRecoveryConfig recoveryConfig = taskLocalStateStore.getLocalRecoveryConfig();
            Assertions.assertThat(recoveryConfig.isLocalRecoveryEnabled()).isFalse();
            Assertions.assertThat(recoveryConfig.getLocalStateDirectoryProvider()).isNotPresent();

            for (File rootDir : rootDirs) {
                Assertions.assertThat(rootDir).isEmptyDirectory();
            }
        } finally {
            // Release the manager like the other tests do, so it does not outlive this test.
            storesManager.shutdown();
        }
    }

    /**
     * Walks through the life cycle of subtask state directories: the directory layout handed out
     * by the provider, clean-up when an allocation is released, and clean-up on shutdown.
     */
    @Test
    void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
        final JobID jobID = new JobID();
        final JobVertexID jobVertexID = new JobVertexID();
        final AllocationID allocationID = new AllocationID();
        final int subtaskIdx = 23;

        final File[] rootDirs = newRootDirectories(3);
        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(rootDirs), Executors.directExecutor());

        final TaskLocalStateStore taskLocalStateStore =
                storesManager.localStateStoreForSubtask(
                        jobID,
                        allocationID,
                        jobVertexID,
                        subtaskIdx,
                        new Configuration(),
                        new Configuration());
        final LocalRecoveryDirectoryProvider directoryProvider =
                requireDirectoryProvider(taskLocalStateStore);

        // The allocation base directory is expected to rotate over the roots by checkpoint id.
        for (int i = 0; i < 10; i++) {
            final File expectedBaseDir =
                    new File(
                            rootDirs[(i & Integer.MAX_VALUE) % rootDirs.length],
                            storesManager.allocationSubDirString(allocationID));
            Assertions.assertThat(directoryProvider.allocationBaseDirectory(i))
                    .isEqualTo(expectedBaseDir);
        }

        final long checkpointId = 42L;
        final File allocationBaseDir = directoryProvider.allocationBaseDirectory(checkpointId);
        final File checkpointDir =
                directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
        final String expectedRelativePath =
                "jid_" + jobID
                        + File.separator
                        + "vtx_" + jobVertexID + "_sti_" + subtaskIdx
                        + File.separator
                        + "chk_" + checkpointId;
        Assertions.assertThat(checkpointDir)
                .isEqualTo(new File(allocationBaseDir, expectedRelativePath));

        // Put a file into the checkpoint directory so there is something to clean up.
        Assertions.assertThat(checkpointDir.mkdirs()).isTrue();
        final File testFile = new File(checkpointDir, "test");
        Assertions.assertThat(testFile.createNewFile()).isTrue();

        Assertions.assertThat(taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled())
                .isEqualTo(storesManager.isLocalRecoveryEnabled());
        Assertions.assertThat(testFile).exists();

        // Releasing the allocation must remove everything that was created for it.
        storesManager.releaseLocalStateForAllocationId(allocationID);
        checkRootDirsClean(rootDirs);

        // Create state for a second allocation and verify that shutdown removes it as well.
        final TaskLocalStateStore otherStateStore =
                storesManager.localStateStoreForSubtask(
                        jobID,
                        new AllocationID(),
                        jobVertexID,
                        subtaskIdx,
                        new Configuration(),
                        new Configuration());
        final File otherCheckpointDir =
                requireDirectoryProvider(otherStateStore).subtaskSpecificCheckpointDirectory(23L);
        Assertions.assertThat(otherCheckpointDir.mkdirs()).isTrue();
        Assertions.assertThat(new File(otherCheckpointDir, "test").createNewFile()).isTrue();

        storesManager.shutdown();
        checkRootDirsClean(rootDirs);
    }

    /** Checks that root directories passed as an owned reference are gone after shutdown. */
    @Test
    void testOwnedLocalStateDirectoriesAreDeletedOnShutdown() throws IOException {
        final File[] rootDirs = newRootDirectories(2);
        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(rootDirs), Executors.directExecutor());

        for (File rootDir : rootDirs) {
            Assertions.assertThat(rootDir).exists();
        }

        storesManager.shutdown();

        for (File rootDir : rootDirs) {
            Assertions.assertThat(rootDir).doesNotExist();
        }
    }

    /** Checks that root directories passed as a borrowed reference still exist after shutdown. */
    @Test
    void testBorrowedLocalStateDirectoriesAreNotDeletedOnShutdown() throws IOException {
        final File[] rootDirs = newRootDirectories(2);
        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.borrowed(rootDirs), Executors.directExecutor());

        for (File rootDir : rootDirs) {
            Assertions.assertThat(rootDir).exists();
        }

        storesManager.shutdown();

        for (File rootDir : rootDirs) {
            Assertions.assertThat(rootDir).exists();
        }
    }

    /**
     * Checks that {@code retainLocalStateForAllocations} keeps the directory of the retained
     * allocation and removes the directory of the allocation that is not retained.
     */
    @Test
    void testRetainLocalStateForAllocationsDeletesUnretainedAllocationDirectories()
            throws IOException {
        final File rootDir = TempDirUtils.newFolder(temporaryFolder.toPath());
        final TaskExecutorLocalStateStoresManager storesManager =
                new TaskExecutorLocalStateStoresManager(
                        true, Reference.owned(new File[] {rootDir}), Executors.directExecutor());
        try {
            final JobID jobID = new JobID();
            final AllocationID retainedAllocationID = new AllocationID();
            final AllocationID otherAllocationID = new AllocationID();
            final JobVertexID jobVertexID = new JobVertexID();

            // Register one local state store per allocation.
            storesManager.localStateStoreForSubtask(
                    jobID,
                    retainedAllocationID,
                    jobVertexID,
                    0,
                    new Configuration(),
                    new Configuration());
            storesManager.localStateStoreForSubtask(
                    jobID,
                    otherAllocationID,
                    jobVertexID,
                    1,
                    new Configuration(),
                    new Configuration());

            Assertions.assertThat(
                            TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(rootDir))
                    .hasSize(2);

            storesManager.retainLocalStateForAllocations(Sets.newHashSet(retainedAllocationID));

            Assertions.assertThat(
                            TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(rootDir))
                    .hasSize(1);
            Assertions.assertThat(
                            new File(
                                    rootDir,
                                    storesManager.allocationSubDirString(otherAllocationID)))
                    .doesNotExist();
        } finally {
            // Release the manager like the other tests do, so it does not outlive this test.
            storesManager.shutdown();
        }
    }

    /**
     * Asserts that none of the given root directories has any content.
     *
     * @param rootDirs directories to inspect; entries for which {@link File#listFiles()} returns
     *     {@code null} (e.g. because the directory no longer exists) are skipped
     */
    private static void checkRootDirsClean(File[] rootDirs) {
        for (File rootDir : rootDirs) {
            final File[] children = rootDir.listFiles();
            if (children != null) {
                Assertions.assertThat(children).isEmpty();
            }
        }
    }

    /** Obtains a fresh working directory from the registered {@link WorkingDirectoryExtension}. */
    private static WorkingDirectory newWorkingDirectory() throws Exception {
        return WORKING_DIRECTORY_EXTENSION_WRAPPER.getCustomExtension().createNewWorkingDirectory();
    }

    /**
     * Creates the requested number of new folders below {@link #temporaryFolder}.
     *
     * @param count number of folders to create
     * @return the created folders, in creation order
     * @throws IOException if a folder cannot be created
     */
    private static File[] newRootDirectories(int count) throws IOException {
        final File[] rootDirs = new File[count];
        for (int i = 0; i < count; i++) {
            rootDirs[i] = TempDirUtils.newFolder(temporaryFolder.toPath());
        }
        return rootDirs;
    }

    /**
     * Returns the directory provider of the given store's local recovery config, failing with the
     * exception supplied by {@link LocalRecoveryConfig#localRecoveryNotEnabled()} if there is none.
     */
    private static LocalRecoveryDirectoryProvider requireDirectoryProvider(
            TaskLocalStateStore taskLocalStateStore) {
        return taskLocalStateStore
                .getLocalRecoveryConfig()
                .getLocalStateDirectoryProvider()
                .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
    }

    /**
     * Builds a {@link TaskManagerServicesConfiguration} from the given configuration, using a
     * generated resource id, the local host name and the resource spec for local execution.
     */
    private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration(
            Configuration configuration, WorkingDirectory workingDirectory) throws Exception {
        return TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                ResourceID.generate(),
                InetAddress.getLocalHost().getHostName(),
                true,
                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(configuration),
                workingDirectory);
    }

    /**
     * Creates {@link TaskManagerServices} for the given configuration with a void blob service, an
     * unregistered metric group, a direct executor service and a handler that ignores what it is
     * passed.
     */
    private TaskManagerServices createTaskManagerServices(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            WorkingDirectory workingDirectory)
            throws Exception {
        return TaskManagerServices.fromConfiguration(
                taskManagerServicesConfiguration,
                VoidPermanentBlobService.INSTANCE,
                UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                Executors.newDirectExecutorService(),
                ignored -> {},
                workingDirectory);
    }
}
