package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartitionInfo;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.RemoteAddress;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest.class */
public class TaskManagerTest {
    private static ActorSystem system;
    private static Timeout timeout = new Timeout(1, TimeUnit.MINUTES);

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$SimpleJobManager.class */
    public static class SimpleJobManager extends UntypedActor {
        public void onReceive(Object obj) throws Exception {
            if (obj instanceof RegistrationMessages.RegisterTaskManager) {
                getSender().tell(new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), -1), getSelf());
            } else if (obj instanceof JobManagerMessages.UpdateTaskExecutionState) {
                getSender().tell(true, getSelf());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$SimpleLookupFailingUpdateJobManager.class */
    /**
     * Variant of {@link SimpleLookupJobManager} that answers every task execution state update
     * with {@code false}; every other message is delegated to the parent implementation.
     */
    public static class SimpleLookupFailingUpdateJobManager extends SimpleLookupJobManager {
        @Override
        public void onReceive(Object message) throws Exception {
            boolean isStateUpdate = message instanceof JobManagerMessages.UpdateTaskExecutionState;
            if (!isStateUpdate) {
                super.onReceive(message);
                return;
            }
            // Report the state update as rejected.
            getSender().tell(false, getSelf());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$SimpleLookupFailingUpdateJobManagerCreator.class */
    public static class SimpleLookupFailingUpdateJobManagerCreator implements Creator<SimpleLookupFailingUpdateJobManager> {
        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public SimpleLookupFailingUpdateJobManager m143create() throws Exception {
            return new SimpleLookupFailingUpdateJobManager();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$SimpleLookupJobManager.class */
    /**
     * Extension of {@link SimpleJobManager} that additionally answers
     * {@code ScheduleOrUpdateConsumers} requests with a positive
     * {@code ConsumerNotificationResult} carrying an empty option.
     */
    public static class SimpleLookupJobManager extends SimpleJobManager {
        @Override
        public void onReceive(Object message) throws Exception {
            boolean isConsumerRequest = message instanceof JobManagerMessages.ScheduleOrUpdateConsumers;
            if (!isConsumerRequest) {
                // Registration and state updates are handled by the base job manager.
                super.onReceive(message);
                return;
            }
            getSender().tell(new JobManagerMessages.ConsumerNotificationResult(true, Option.apply((Object) null)), getSelf());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$SimpleLookupJobManagerCreator.class */
    public static class SimpleLookupJobManagerCreator implements Creator<SimpleLookupJobManager> {
        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public SimpleLookupJobManager m144create() throws Exception {
            return new SimpleLookupJobManager();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$TestInvokableBlockingCancelable.class */
    public static final class TestInvokableBlockingCancelable extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() throws Exception {
            Object obj = new Object();
            synchronized (obj) {
                obj.wait();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerTest$TestInvokableCorrect.class */
    public static final class TestInvokableCorrect extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() {
        }
    }

    /**
     * Creates the actor system shared by all tests in this class, using the testing configuration.
     */
    @BeforeClass
    public static void setup() {
        final String actorSystemName = "TestActorSystem";
        system = ActorSystem.create(actorSystemName, TestingUtils.testConfig());
    }

    /**
     * Shuts down the shared actor system and clears the static reference to it.
     */
    @AfterClass
    public static void teardown() {
        ActorSystem toShutDown = system;
        JavaTestKit.shutdownActorSystem(toShutDown);
        system = null;
    }

    /**
     * Starts a task manager against a {@link SimpleJobManager}, submits one
     * {@link TestInvokableCorrect} task and expects a positive {@code TaskOperationResult}
     * for its execution attempt within one second.
     */
    @Test
    public void testSetupTaskManager() {
        new JavaTestKit(system) {
            {
                try {
                    ActorRef jobManager = TaskManagerTest.system.actorOf(Props.create(SimpleJobManager.class));
                    final ActorRef taskManager = TaskManagerTest.createTaskManager(jobManager);

                    JobID jobId = new JobID();
                    JobVertexID vertexId = new JobVertexID();
                    final ExecutionAttemptID attemptId = new ExecutionAttemptID();
                    String invokableName = TestInvokableCorrect.class.getName();

                    final TaskDeploymentDescriptor descriptor = new TaskDeploymentDescriptor(jobId, vertexId, attemptId, "TestTask", 2, 7, new Configuration(), new Configuration(), invokableName, Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);

                    new JavaTestKit.Within(duration("1 seconds")) {
                        @Override
                        protected void run() {
                            TaskManagerMessages.SubmitTask submission = new TaskManagerMessages.SubmitTask(descriptor);
                            taskManager.tell(submission, getRef());
                            expectMsgEquals(new TaskManagerMessages.TaskOperationResult(attemptId, true));
                        }
                    };
                } catch (Exception failure) {
                    failure.printStackTrace();
                    Assert.fail(failure.getMessage());
                }
            }
        };
    }

    /**
     * Submits two blocking tasks, checks that both are reported as running, then cancels them
     * one after the other.
     *
     * <p>After each cancellation the test waits for the task to be removed, checks that it is
     * in state {@code CANCELED} and that the number of running tasks dropped accordingly.
     * Cancelling the already-removed first task a second time must yield a negative result
     * with the message "No task with that execution ID was found.".
     */
    @Test
    public void testJobSubmissionAndCanceling() {
        new JavaTestKit(system) {
            {
                try {
                    ActorRef jobManager = TaskManagerTest.system.actorOf(Props.create(SimpleJobManager.class));
                    final ActorRef taskManager = TaskManagerTest.createTaskManager(jobManager);

                    JobID firstJob = new JobID();
                    JobID secondJob = new JobID();
                    JobVertexID firstVertex = new JobVertexID();
                    JobVertexID secondVertex = new JobVertexID();
                    final ExecutionAttemptID firstAttempt = new ExecutionAttemptID();
                    final ExecutionAttemptID secondAttempt = new ExecutionAttemptID();
                    String blockingInvokable = TestInvokableBlockingCancelable.class.getName();

                    final TaskDeploymentDescriptor firstDescriptor = new TaskDeploymentDescriptor(firstJob, firstVertex, firstAttempt, "TestTask1", 1, 5, new Configuration(), new Configuration(), blockingInvokable, Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);
                    final TaskDeploymentDescriptor secondDescriptor = new TaskDeploymentDescriptor(secondJob, secondVertex, secondAttempt, "TestTask2", 2, 7, new Configuration(), new Configuration(), blockingInvokable, Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);

                    final FiniteDuration waitLimit = duration("1 second");

                    new JavaTestKit.Within(waitLimit) {
                        @Override
                        protected void run() {
                            try {
                                // Submit both tasks, then collect the two acknowledgements in order.
                                taskManager.tell(new TaskManagerMessages.SubmitTask(firstDescriptor), getRef());
                                taskManager.tell(new TaskManagerMessages.SubmitTask(secondDescriptor), getRef());
                                expectMsgEquals(new TaskManagerMessages.TaskOperationResult(firstAttempt, true));
                                expectMsgEquals(new TaskManagerMessages.TaskOperationResult(secondAttempt, true));

                                // Both tasks must be known to the task manager and running.
                                taskManager.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                TestingTaskManagerMessages.ResponseRunningTasks initialResponse = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class);
                                Map<ExecutionAttemptID, Task> runningTasks = initialResponse.asJava();
                                Assert.assertEquals(2, runningTasks.size());

                                Task firstTask = runningTasks.get(firstAttempt);
                                Task secondTask = runningTasks.get(secondAttempt);
                                Assert.assertNotNull(firstTask);
                                Assert.assertNotNull(secondTask);
                                Assert.assertEquals(ExecutionState.RUNNING, firstTask.getExecutionState());
                                Assert.assertEquals(ExecutionState.RUNNING, secondTask.getExecutionState());

                                // Cancel the first task and wait until it has been removed.
                                taskManager.tell(new TaskManagerMessages.CancelTask(firstAttempt), getRef());
                                expectMsgEquals(new TaskManagerMessages.TaskOperationResult(firstAttempt, true));
                                Await.ready(Patterns.ask(taskManager, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(firstAttempt), TaskManagerTest.timeout), waitLimit);
                                Assert.assertEquals(ExecutionState.CANCELED, firstTask.getExecutionState());

                                taskManager.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                TestingTaskManagerMessages.ResponseRunningTasks afterFirstCancel = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class);
                                Assert.assertEquals(1, afterFirstCancel.asJava().size());

                                // A second cancel of the removed task is answered negatively.
                                taskManager.tell(new TaskManagerMessages.CancelTask(firstAttempt), getRef());
                                expectMsgEquals(new TaskManagerMessages.TaskOperationResult(firstAttempt, false, "No task with that execution ID was found."));

                                // Cancel the second task and wait until it has been removed.
                                taskManager.tell(new TaskManagerMessages.CancelTask(secondAttempt), getRef());
                                expectMsgEquals(new TaskManagerMessages.TaskOperationResult(secondAttempt, true));
                                Await.ready(Patterns.ask(taskManager, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(secondAttempt), TaskManagerTest.timeout), waitLimit);
                                Assert.assertEquals(ExecutionState.CANCELED, secondTask.getExecutionState());

                                taskManager.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                TestingTaskManagerMessages.ResponseRunningTasks afterSecondCancel = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class);
                                Assert.assertEquals(0, afterSecondCancel.asJava().size());
                            } catch (Exception failure) {
                                failure.printStackTrace();
                                Assert.fail(failure.getMessage());
                            }
                        }
                    };
                } catch (Exception failure) {
                    failure.printStackTrace();
                    Assert.fail(failure.getMessage());
                }
            }
        };
    }

    /**
     * Submits a {@code Tasks.Sender} and a {@code Tasks.Receiver} whose deployment descriptors
     * carry no partition or consumer descriptors, and expects both submissions to be answered
     * with an unsuccessful {@code TaskOperationResult} (sender first, receiver second).
     *
     * <p>Afterwards both tasks must be removed and no task may be left running.
     */
    @Test
    public void testGateChannelEdgeMismatch() {
        new JavaTestKit(system) {
            {
                try {
                    ActorRef jobManager = TaskManagerTest.system.actorOf(Props.create(SimpleJobManager.class));
                    final ActorRef taskManager = TaskManagerTest.createTaskManager(jobManager);

                    // Both tasks belong to the same job.
                    JobID jobId = new JobID();
                    JobVertexID senderVertex = new JobVertexID();
                    JobVertexID receiverVertex = new JobVertexID();
                    final ExecutionAttemptID senderAttempt = new ExecutionAttemptID();
                    final ExecutionAttemptID receiverAttempt = new ExecutionAttemptID();

                    final TaskDeploymentDescriptor senderDescriptor = new TaskDeploymentDescriptor(jobId, senderVertex, senderAttempt, "Sender", 0, 1, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);
                    final TaskDeploymentDescriptor receiverDescriptor = new TaskDeploymentDescriptor(jobId, receiverVertex, receiverAttempt, "Receiver", 2, 7, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), 0);

                    new JavaTestKit.Within(duration("1 second")) {
                        @Override
                        protected void run() {
                            try {
                                taskManager.tell(new TaskManagerMessages.SubmitTask(senderDescriptor), getRef());
                                taskManager.tell(new TaskManagerMessages.SubmitTask(receiverDescriptor), getRef());

                                // Replies are expected in submission order, both reporting failure.
                                TaskManagerMessages.TaskOperationResult senderResult = expectMsgClass(TaskManagerMessages.TaskOperationResult.class);
                                Assert.assertFalse(senderResult.success());
                                Assert.assertEquals(senderAttempt, senderResult.executionID());

                                TaskManagerMessages.TaskOperationResult receiverResult = expectMsgClass(TaskManagerMessages.TaskOperationResult.class);
                                Assert.assertFalse(receiverResult.success());
                                Assert.assertEquals(receiverAttempt, receiverResult.executionID());

                                // Wait for both removal notifications.
                                taskManager.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(senderAttempt), getRef());
                                taskManager.tell(new TestingTaskManagerMessages.NotifyWhenTaskRemoved(receiverAttempt), getRef());
                                expectMsgEquals(true);
                                expectMsgEquals(true);

                                taskManager.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                                TestingTaskManagerMessages.ResponseRunningTasks remaining = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class);
                                Assert.assertEquals(0, remaining.asJava().size());
                            } catch (Exception failure) {
                                failure.printStackTrace();
                                Assert.fail(failure.getMessage());
                            }
                        }
                    };
                } catch (Exception failure) {
                    failure.printStackTrace();
                    Assert.fail(failure.getMessage());
                }
            }
        };
    }

    /**
     * Deploys a {@code Tasks.Receiver} followed by a {@code Tasks.Sender} that are linked
     * through one pipelined, locally consumed result partition, using a
     * {@link SimpleLookupJobManager} as job manager.
     *
     * <p>For each task still listed as running, the test waits for its removal; the receiver
     * must then be in state {@code FINISHED}. In the end no task may be left running.
     */
    @Test
    public void testRunJobWithForwardChannel() {
        new JavaTestKit(system) {
            {
                JobID jobId = new JobID();
                JobVertexID senderVertex = new JobVertexID();
                JobVertexID receiverVertex = new JobVertexID();
                final ExecutionAttemptID senderAttempt = new ExecutionAttemptID();
                final ExecutionAttemptID receiverAttempt = new ExecutionAttemptID();

                ActorRef jobManager = TaskManagerTest.system.actorOf(Props.create(new SimpleLookupJobManagerCreator()));
                final ActorRef taskManager = TaskManagerTest.createTaskManager(jobManager);

                // The sender produces one pipelined partition ...
                IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
                ArrayList<PartitionDeploymentDescriptor> producedPartitions = new ArrayList<PartitionDeploymentDescriptor>();
                producedPartitions.add(new PartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, IntermediateResultPartitionType.PIPELINED, 1));

                // ... which the receiver consumes from a local location.
                PartitionInfo[] consumedPartitions = new PartitionInfo[]{new PartitionInfo(partitionId, senderAttempt, PartitionInfo.PartitionLocation.LOCAL, (RemoteAddress) null)};
                PartitionConsumerDeploymentDescriptor consumer = new PartitionConsumerDeploymentDescriptor(new IntermediateDataSetID(), consumedPartitions, 0);

                final TaskDeploymentDescriptor senderDescriptor = new TaskDeploymentDescriptor(jobId, senderVertex, senderAttempt, "Sender", 0, 1, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), producedPartitions, Collections.emptyList(), new ArrayList(), 0);
                final TaskDeploymentDescriptor receiverDescriptor = new TaskDeploymentDescriptor(jobId, receiverVertex, receiverAttempt, "Receiver", 2, 7, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.emptyList(), Collections.singletonList(consumer), new ArrayList(), 0);

                final FiniteDuration waitLimit = duration("1 second");

                new JavaTestKit.Within(waitLimit) {
                    @Override
                    protected void run() {
                        try {
                            // The receiver is deployed and acknowledged before the sender.
                            taskManager.tell(new TaskManagerMessages.SubmitTask(receiverDescriptor), getRef());
                            expectMsgEquals(new TaskManagerMessages.TaskOperationResult(receiverAttempt, true));
                            taskManager.tell(new TaskManagerMessages.SubmitTask(senderDescriptor), getRef());
                            expectMsgEquals(new TaskManagerMessages.TaskOperationResult(senderAttempt, true));

                            taskManager.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                            TestingTaskManagerMessages.ResponseRunningTasks snapshot = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class);
                            Map<ExecutionAttemptID, Task> runningTasks = snapshot.asJava();
                            Task senderTask = runningTasks.get(senderAttempt);
                            Task receiverTask = runningTasks.get(receiverAttempt);

                            // A task missing from the snapshot needs no waiting.
                            if (senderTask != null) {
                                Await.ready(Patterns.ask(taskManager, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(senderAttempt), TaskManagerTest.timeout), waitLimit);
                            }
                            if (receiverTask != null) {
                                Await.ready(Patterns.ask(taskManager, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(receiverAttempt), TaskManagerTest.timeout), waitLimit);
                                Assert.assertEquals(ExecutionState.FINISHED, receiverTask.getExecutionState());
                            }

                            taskManager.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                            TestingTaskManagerMessages.ResponseRunningTasks remaining = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class);
                            Assert.assertEquals(0, remaining.asJava().size());
                        } catch (Exception failure) {
                            failure.printStackTrace();
                            Assert.fail(failure.getMessage());
                        }
                    }
                };
            }
        };
    }

    /**
     * Deploys a {@code Tasks.BlockingReceiver} and a {@code Tasks.Sender} linked through one
     * pipelined, locally consumed result partition, using a job manager
     * ({@link SimpleLookupFailingUpdateJobManager}) that answers every state update with
     * {@code false}.
     *
     * <p>The receiver is cancelled first. The sender is cancelled as well if it is still
     * {@code RUNNING}. After waiting for the removal of each task found in the running-task
     * snapshot, no task may be left running.
     */
    @Test
    public void testCancellingDependentAndStateUpdateFails() {
        new JavaTestKit(system) {
            {
                JobID jobId = new JobID();
                JobVertexID senderVertex = new JobVertexID();
                JobVertexID receiverVertex = new JobVertexID();
                final ExecutionAttemptID senderAttempt = new ExecutionAttemptID();
                final ExecutionAttemptID receiverAttempt = new ExecutionAttemptID();

                ActorRef jobManager = TaskManagerTest.system.actorOf(Props.create(new SimpleLookupFailingUpdateJobManagerCreator()));
                final ActorRef taskManager = TaskManagerTest.createTaskManager(jobManager);

                // The sender produces one pipelined partition ...
                IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
                ArrayList<PartitionDeploymentDescriptor> producedPartitions = new ArrayList<PartitionDeploymentDescriptor>();
                producedPartitions.add(new PartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, IntermediateResultPartitionType.PIPELINED, 1));

                // ... which the receiver consumes from a local location.
                PartitionInfo[] consumedPartitions = new PartitionInfo[]{new PartitionInfo(partitionId, senderAttempt, PartitionInfo.PartitionLocation.LOCAL, (RemoteAddress) null)};
                PartitionConsumerDeploymentDescriptor consumer = new PartitionConsumerDeploymentDescriptor(new IntermediateDataSetID(), consumedPartitions, 0);

                final TaskDeploymentDescriptor senderDescriptor = new TaskDeploymentDescriptor(jobId, senderVertex, senderAttempt, "Sender", 0, 1, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), producedPartitions, Collections.emptyList(), new ArrayList(), 0);
                final TaskDeploymentDescriptor receiverDescriptor = new TaskDeploymentDescriptor(jobId, receiverVertex, receiverAttempt, "Receiver", 2, 7, new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(), Collections.emptyList(), Collections.singletonList(consumer), new ArrayList(), 0);

                final FiniteDuration waitLimit = duration("1 second");

                new JavaTestKit.Within(waitLimit) {
                    @Override
                    protected void run() {
                        try {
                            // Receiver first, then sender; acknowledgements arrive in the same order.
                            taskManager.tell(new TaskManagerMessages.SubmitTask(receiverDescriptor), getRef());
                            taskManager.tell(new TaskManagerMessages.SubmitTask(senderDescriptor), getRef());
                            expectMsgEquals(new TaskManagerMessages.TaskOperationResult(receiverAttempt, true));
                            expectMsgEquals(new TaskManagerMessages.TaskOperationResult(senderAttempt, true));

                            taskManager.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                            TestingTaskManagerMessages.ResponseRunningTasks snapshot = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class);
                            Map<ExecutionAttemptID, Task> runningTasks = snapshot.asJava();
                            Task senderTask = runningTasks.get(senderAttempt);
                            Task receiverTask = runningTasks.get(receiverAttempt);

                            // Cancel the receiver and wait for its removal if it was listed.
                            taskManager.tell(new TaskManagerMessages.CancelTask(receiverAttempt), getRef());
                            expectMsgEquals(new TaskManagerMessages.TaskOperationResult(receiverAttempt, true));
                            if (receiverTask != null) {
                                Await.ready(Patterns.ask(taskManager, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(receiverAttempt), TaskManagerTest.timeout), waitLimit);
                            }

                            if (senderTask != null) {
                                // Only cancel the sender explicitly while it is still running.
                                boolean senderStillRunning = senderTask.getExecutionState() == ExecutionState.RUNNING;
                                if (senderStillRunning) {
                                    taskManager.tell(new TaskManagerMessages.CancelTask(senderAttempt), getRef());
                                    expectMsgEquals(new TaskManagerMessages.TaskOperationResult(senderAttempt, true));
                                }
                                Await.ready(Patterns.ask(taskManager, new TestingTaskManagerMessages.NotifyWhenTaskRemoved(senderAttempt), TaskManagerTest.timeout), waitLimit);
                            }

                            taskManager.tell(TestingTaskManagerMessages.getRequestRunningTasksMessage(), getRef());
                            TestingTaskManagerMessages.ResponseRunningTasks remaining = expectMsgClass(TestingTaskManagerMessages.ResponseRunningTasks.class);
                            Assert.assertEquals(0, remaining.asJava().size());
                        } catch (Exception failure) {
                            failure.printStackTrace();
                            Assert.fail(failure.getMessage());
                        }
                    }
                };
            }
        };
    }

    /**
     * Starts a testing task manager on "localhost" in the shared actor system and waits up to
     * 20 seconds for its answer to the "notify when registered at job manager" request.
     *
     * @param actorRef the job manager actor; its path is written to the task manager
     *                 configuration under {@code jobmanager.akka.url}
     * @return the started task manager actor
     * @throws RuntimeException if waiting for the registration notification fails
     */
    public static ActorRef createTaskManager(ActorRef actorRef) {
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setInteger("taskmanager.memory.size", 10);
        // Keep this call before the job manager URL is set, as in the original sequence.
        GlobalConfiguration.includeConfiguration(taskManagerConfig);

        String jobManagerUrl = actorRef.path().toString();
        taskManagerConfig.setString("jobmanager.akka.url", jobManagerUrl);

        ActorRef taskManager = TestingUtils.startTestingTaskManagerWithConfiguration("localhost", taskManagerConfig, system);
        try {
            Await.ready(
                    Patterns.ask(taskManager, TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout),
                    new FiniteDuration(20L, TimeUnit.SECONDS));
        } catch (Exception cause) {
            throw new RuntimeException("Exception while waiting for the task manager registration.", cause);
        }
        return taskManager;
    }
}
