package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.net.InetAddress;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptionsInternal;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.class */
public class TaskManagerRunnerTest extends TestLogger {

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Rule
    public final Timeout timeout = Timeout.seconds(30);
    private TaskManagerRunner taskManagerRunner;

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest$TestingFailingTaskExecutorServiceFactory.class */
    private static class TestingFailingTaskExecutorServiceFactory implements TaskManagerRunner.TaskExecutorServiceFactory {
        private TestingFailingTaskExecutorServiceFactory() {
        }

        public TaskManagerRunner.TaskExecutorService createTaskExecutor(Configuration configuration, ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, BlobCacheService blobCacheService, boolean z, ExternalResourceInfoProvider externalResourceInfoProvider, WorkingDirectory workingDirectory, FatalErrorHandler fatalErrorHandler) {
            return TestingTaskExecutorService.newBuilder().setStartRunnable(() -> {
                fatalErrorHandler.onFatalError(new FlinkException("Cannot instantiate the TaskExecutorService."));
            }).build();
        }
    }

    @After
    public void after() throws Exception {
        if (this.taskManagerRunner != null) {
            this.taskManagerRunner.close();
        }
    }

    @Test
    public void testShouldShutdownOnFatalError() throws Exception {
        Configuration createConfiguration = createConfiguration();
        createConfiguration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("42 h"));
        this.taskManagerRunner = createTaskManagerRunner(createConfiguration);
        this.taskManagerRunner.onFatalError(new RuntimeException());
        Assert.assertThat(this.taskManagerRunner.getTerminationFuture().join(), Matchers.is(Matchers.equalTo(TaskManagerRunner.Result.FAILURE)));
    }

    @Test
    public void testShouldShutdownIfRegistrationWithJobManagerFails() throws Exception {
        Configuration createConfiguration = createConfiguration();
        createConfiguration.set(TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("10 ms"));
        this.taskManagerRunner = createTaskManagerRunner(createConfiguration);
        Assert.assertThat(this.taskManagerRunner.getTerminationFuture().join(), Matchers.is(Matchers.equalTo(TaskManagerRunner.Result.FAILURE)));
    }

    @Test
    public void testGenerateTaskManagerResourceIDWithMetaData() throws Exception {
        Configuration createConfiguration = createConfiguration();
        createConfiguration.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, "test");
        Assert.assertThat(((ResourceID) TaskManagerRunner.getTaskManagerResourceID(createConfiguration, "", -1).unwrap()).getMetadata(), Matchers.equalTo("test"));
    }

    @Test
    public void testGenerateTaskManagerResourceIDWithoutMetaData() throws Exception {
        Configuration createConfiguration = createConfiguration();
        createConfiguration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, "test");
        ResourceID resourceID = (ResourceID) TaskManagerRunner.getTaskManagerResourceID(createConfiguration, "", -1).unwrap();
        Assert.assertThat(resourceID.getMetadata(), Matchers.equalTo(""));
        Assert.assertThat(resourceID.getStringWithMetadata(), Matchers.equalTo("test"));
    }

    @Test
    public void testGenerateTaskManagerResourceIDWithConfig() throws Exception {
        Configuration createConfiguration = createConfiguration();
        createConfiguration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, "test");
        Assert.assertThat(((ResourceID) TaskManagerRunner.getTaskManagerResourceID(createConfiguration, "", -1).unwrap()).getResourceIdString(), Matchers.equalTo("test"));
    }

    @Test
    public void testGenerateTaskManagerResourceIDWithRemoteRpcService() throws Exception {
        ResourceID resourceID = (ResourceID) TaskManagerRunner.getTaskManagerResourceID(createConfiguration(), "flink", 9090).unwrap();
        Assert.assertThat(resourceID, Matchers.notNullValue());
        Assert.assertThat(resourceID.getResourceIdString(), Matchers.containsString("flink:9090"));
    }

    @Test
    public void testGenerateTaskManagerResourceIDWithLocalRpcService() throws Exception {
        ResourceID resourceID = (ResourceID) TaskManagerRunner.getTaskManagerResourceID(createConfiguration(), "", -1).unwrap();
        Assert.assertThat(resourceID, Matchers.notNullValue());
        Assert.assertThat(resourceID.getResourceIdString(), Matchers.containsString(InetAddress.getLocalHost().getHostName()));
    }

    @Test
    public void testUnexpectedTaskManagerTerminationFailsRunnerFatally() throws Exception {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        TaskManagerRunner createTaskManagerRunner = createTaskManagerRunner(createConfiguration(), createTaskExecutorServiceFactory(TestingTaskExecutorService.newBuilder().setTerminationFuture(completableFuture).build()));
        completableFuture.complete(null);
        Assert.assertThat(createTaskManagerRunner.getTerminationFuture().join(), Matchers.is(Matchers.equalTo(TaskManagerRunner.Result.FAILURE)));
    }

    @Test
    public void testUnexpectedTaskManagerTerminationAfterRunnerCloseWillBeIgnored() throws Exception {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        TaskManagerRunner createTaskManagerRunner = createTaskManagerRunner(createConfiguration(), createTaskExecutorServiceFactory(TestingTaskExecutorService.newBuilder().setTerminationFuture(completableFuture).withManualTerminationFutureCompletion().build()));
        createTaskManagerRunner.closeAsync();
        completableFuture.complete(null);
        Assert.assertThat(createTaskManagerRunner.getTerminationFuture().join(), Matchers.is(Matchers.equalTo(TaskManagerRunner.Result.SUCCESS)));
    }

    @Test
    public void testWorkingDirIsSetupWhenStartingTaskManagerRunner() throws Exception {
        File newFolder = TEMPORARY_FOLDER.newFolder();
        ResourceID resourceID = new ResourceID("foobar");
        Configuration createConfigurationWithWorkingDirectory = createConfigurationWithWorkingDirectory(newFolder, resourceID);
        File generateTaskManagerWorkingDirectoryFile = ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(createConfigurationWithWorkingDirectory, resourceID);
        TaskManagerRunner createTaskManagerRunner = createTaskManagerRunner(createConfigurationWithWorkingDirectory);
        try {
            Assert.assertTrue(generateTaskManagerWorkingDirectoryFile.exists());
            createTaskManagerRunner.close();
            Assert.assertFalse("The working dir should be cleaned up when stopping the TaskManager process gracefully.", generateTaskManagerWorkingDirectoryFile.exists());
        } catch (Throwable th) {
            createTaskManagerRunner.close();
            throw th;
        }
    }

    @Nonnull
    private Configuration createConfigurationWithWorkingDirectory(File file, ResourceID resourceID) {
        Configuration createConfiguration = createConfiguration();
        createConfiguration.set(ClusterOptions.PROCESS_WORKING_DIR_BASE, file.getAbsolutePath());
        createConfiguration.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, resourceID.toString());
        return createConfiguration;
    }

    @Test
    public void testWorkingDirIsNotDeletedInCaseOfFailure() throws Exception {
        File newFolder = TEMPORARY_FOLDER.newFolder();
        ResourceID generate = ResourceID.generate();
        Configuration createConfigurationWithWorkingDirectory = createConfigurationWithWorkingDirectory(newFolder, generate);
        createTaskManagerRunner(createConfigurationWithWorkingDirectory, new TestingFailingTaskExecutorServiceFactory()).getTerminationFuture().join();
        Assert.assertTrue(ClusterEntrypointUtils.generateTaskManagerWorkingDirectoryFile(createConfigurationWithWorkingDirectory, generate).exists());
    }

    @Nonnull
    private TaskManagerRunner.TaskExecutorServiceFactory createTaskExecutorServiceFactory(TestingTaskExecutorService testingTaskExecutorService) {
        return (configuration, resourceID, rpcService, highAvailabilityServices, heartbeatServices, metricRegistry, blobCacheService, z, externalResourceInfoProvider, workingDirectory, fatalErrorHandler) -> {
            return testingTaskExecutorService;
        };
    }

    private static Configuration createConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.ADDRESS, "localhost");
        configuration.setString(TaskManagerOptions.HOST, "localhost");
        return TaskExecutorResourceUtils.adjustForLocalExecution(configuration);
    }

    private static TaskManagerRunner createTaskManagerRunner(Configuration configuration) throws Exception {
        return createTaskManagerRunner(configuration, TaskManagerRunner::createTaskExecutorService);
    }

    private static TaskManagerRunner createTaskManagerRunner(Configuration configuration, TaskManagerRunner.TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
        TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration), taskExecutorServiceFactory);
        taskManagerRunner.start();
        return taskManagerRunner;
    }
}
