package org.apache.flink.test.recovery;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
import org.apache.flink.test.util.TestProcessBuilder;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase.class */
public class TaskManagerDisconnectOnShutdownITCase {
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerDisconnectOnShutdownITCase.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase$TaskManagerConnectionTracker.class */
    public static class TaskManagerConnectionTracker {
        private final CompletableFuture<Void> taskManagerConnectedFuture;
        private final CompletableFuture<Void> taskManagerDisconnectedFuture;
        private final AtomicInteger numberOfConnectedTaskManager;

        private TaskManagerConnectionTracker() {
            this.taskManagerConnectedFuture = new CompletableFuture<>();
            this.taskManagerDisconnectedFuture = new CompletableFuture<>();
            this.numberOfConnectedTaskManager = new AtomicInteger();
        }

        public void connectTaskManager() {
            this.numberOfConnectedTaskManager.incrementAndGet();
            this.taskManagerConnectedFuture.complete(null);
        }

        public void disconnectTaskManager() {
            this.taskManagerDisconnectedFuture.complete(null);
        }

        public void waitForTaskManagerConnected() throws Exception {
            this.taskManagerConnectedFuture.get();
        }

        public void waitForTaskManagerDisconnected() throws Exception {
            this.taskManagerConnectedFuture.get();
        }

        public int getNumberOfConnectedTaskManager() {
            return this.numberOfConnectedTaskManager.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/recovery/TaskManagerDisconnectOnShutdownITCase$TestingStandaloneResourceManagerFactory.class */
    public static class TestingStandaloneResourceManagerFactory extends ResourceManagerFactory<ResourceID> {
        TaskManagerConnectionTracker tracker;

        public TestingStandaloneResourceManagerFactory(TaskManagerConnectionTracker taskManagerConnectionTracker) {
            this.tracker = taskManagerConnectionTracker;
        }

        protected ResourceManager<ResourceID> createResourceManager(Configuration configuration, ResourceID resourceID, RpcService rpcService, UUID uuid, HeartbeatServices heartbeatServices, DelegationTokenManager delegationTokenManager, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String str, ResourceManagerMetricGroup resourceManagerMetricGroup, ResourceManagerRuntimeServices resourceManagerRuntimeServices, Executor executor) {
            return new StandaloneResourceManager(rpcService, uuid, resourceID, heartbeatServices, delegationTokenManager, resourceManagerRuntimeServices.getSlotManager(), ResourceManagerPartitionTrackerImpl::new, resourceManagerRuntimeServices.getJobLeaderIdService(), clusterInformation, fatalErrorHandler, resourceManagerMetricGroup, ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration), Time.fromDuration((Duration) configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)), executor) { // from class: org.apache.flink.test.recovery.TaskManagerDisconnectOnShutdownITCase.TestingStandaloneResourceManagerFactory.1
                public void disconnectTaskManager(ResourceID resourceID2, Exception exc) {
                    TestingStandaloneResourceManagerFactory.this.tracker.disconnectTaskManager();
                    super.disconnectTaskManager(resourceID2, exc);
                }

                public CompletableFuture<Acknowledge> sendSlotReport(ResourceID resourceID2, InstanceID instanceID, SlotReport slotReport, Time time) {
                    CompletableFuture<Acknowledge> sendSlotReport = super.sendSlotReport(resourceID2, instanceID, slotReport, time);
                    TestingStandaloneResourceManagerFactory.this.tracker.connectTaskManager();
                    return sendSlotReport;
                }
            };
        }

        protected ResourceManagerRuntimeServicesConfiguration createResourceManagerRuntimeServicesConfiguration(Configuration configuration) throws ConfigurationException {
            return ResourceManagerRuntimeServicesConfiguration.fromConfiguration(StandaloneResourceManagerFactory.getConfigurationWithoutMaxSlotNumberIfSet(configuration), ArbitraryWorkerResourceSpecFactory.INSTANCE);
        }
    }

    @Test
    public void testTaskManagerProcessFailure() {
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.ADDRESS, "localhost");
        configuration.set(HeartbeatManagerOptions.HEARTBEAT_RPC_FAILURE_THRESHOLD, -1);
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
        configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
        configuration.set(TaskManagerOptions.CPU_CORES, Double.valueOf(1.0d));
        configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofSeconds(30L));
        if (CommonTestUtils.getJavaCommandPath() == null) {
            Assert.fail("cannot find java executable");
        }
        final TaskManagerConnectionTracker taskManagerConnectionTracker = new TaskManagerConnectionTracker();
        TestProcessBuilder.TestProcess testProcess = null;
        try {
            try {
                SessionClusterEntrypoint sessionClusterEntrypoint = new SessionClusterEntrypoint(configuration) { // from class: org.apache.flink.test.recovery.TaskManagerDisconnectOnShutdownITCase.1
                    /* JADX INFO: Access modifiers changed from: protected */
                    /* renamed from: createDispatcherResourceManagerComponentFactory, reason: merged with bridge method [inline-methods] */
                    public DefaultDispatcherResourceManagerComponentFactory m909createDispatcherResourceManagerComponentFactory(Configuration configuration2) {
                        return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(new TestingStandaloneResourceManagerFactory(taskManagerConnectionTracker));
                    }
                };
                Throwable th = null;
                try {
                    try {
                        sessionClusterEntrypoint.startCluster();
                        TestProcessBuilder testProcessBuilder = new TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName());
                        testProcessBuilder.addConfigAsMainClassArgs(configuration);
                        TestProcessBuilder.TestProcess start = testProcessBuilder.start();
                        taskManagerConnectionTracker.waitForTaskManagerConnected();
                        start.destroy();
                        taskManagerConnectionTracker.waitForTaskManagerDisconnected();
                        Assertions.assertThat(taskManagerConnectionTracker.getNumberOfConnectedTaskManager()).isEqualTo(1);
                        if (sessionClusterEntrypoint != null) {
                            if (0 != 0) {
                                try {
                                    sessionClusterEntrypoint.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                sessionClusterEntrypoint.close();
                            }
                        }
                        if (start == null || !start.getProcess().isAlive()) {
                            return;
                        }
                        LOG.error("TaskManager did not shutdown in time.");
                        printProcessLog(start);
                        start.getProcess().destroyForcibly();
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (sessionClusterEntrypoint != null) {
                        if (th != null) {
                            try {
                                sessionClusterEntrypoint.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            sessionClusterEntrypoint.close();
                        }
                    }
                    throw th4;
                }
            } catch (Throwable th6) {
                printProcessLog(null);
                Assert.fail(th6.getMessage());
                if (0 == 0 || !testProcess.getProcess().isAlive()) {
                    return;
                }
                LOG.error("TaskManager did not shutdown in time.");
                printProcessLog(null);
                testProcess.getProcess().destroyForcibly();
            }
        } catch (Throwable th7) {
            if (0 != 0 && testProcess.getProcess().isAlive()) {
                LOG.error("TaskManager did not shutdown in time.");
                printProcessLog(null);
                testProcess.getProcess().destroyForcibly();
            }
            throw th7;
        }
    }

    protected static void printProcessLog(TestProcessBuilder.TestProcess testProcess) {
        if (testProcess == null) {
            System.out.println("-----------------------------------------");
            System.out.println(" TaskManager WAS NOT STARTED.");
            System.out.println("-----------------------------------------");
            return;
        }
        System.out.println("-----------------------------------------");
        System.out.println(" BEGIN SPAWNED PROCESS LOG FOR TaskManager");
        System.out.println("-----------------------------------------");
        System.out.println(testProcess.getErrorOutput().toString());
        System.out.println("-----------------------------------------");
        System.out.println("\t\tEND SPAWNED PROCESS LOG");
        System.out.println("-----------------------------------------");
    }
}
