/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.JobLeaderService;
import org.apache.flink.runtime.taskexecutor.JobManagerConnection;
import org.apache.flink.runtime.taskexecutor.JobManagerTable;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;

public class TaskExecutorTest
extends TestLogger {
    /** JUnit rule exposing the name of the running test method; read via {@code getMethodName()} in {@code testTaskSubmission}. */
    @Rule
    public TestName name = new TestName();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that the task executor disconnects from a job master whose heartbeat times out.
     *
     * <p>The mocked {@link HeartbeatServices} answers with real {@link HeartbeatManagerImpl}
     * instances built with a short timeout value. After the mocked job master gateway has
     * answered the registration, the test waits for {@code disconnectTaskManager} to be called
     * with a {@link TimeoutException}.
     *
     * @throws Exception propagated from the set-up, the verifications or {@code rethrowError()}
     */
    @Test
    public void testHeartbeatTimeoutWithJobManager() throws Exception {
        final JobID jobId = new JobID();
        final Configuration configuration = new Configuration();
        final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(configuration);
        final ResourceID tmResourceId = new ResourceID("tm");
        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
        final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile)Mockito.mock(ResourceProfile.class)), (TimerService)Mockito.mock(TimerService.class));
        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);

        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
        haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);

        final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();

        // Timeout value handed to every heartbeat manager created through the mocked services.
        final long heartbeatTimeout = 10L;
        final HeartbeatServices heartbeatServices = (HeartbeatServices)Mockito.mock(HeartbeatServices.class);
        Mockito.when((Object)heartbeatServices.createHeartbeatManager(
                (ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()),
                (HeartbeatListener)Mockito.any(HeartbeatListener.class),
                (ScheduledExecutor)Mockito.any(ScheduledExecutor.class),
                (Logger)Mockito.any(Logger.class))).thenAnswer((Answer)new Answer<HeartbeatManagerImpl<Void, Void>>(){

            @Override
            public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
                final Object[] args = invocation.getArguments();
                // The scheduled executor (argument 2) is passed both as plain and as scheduled executor.
                return new HeartbeatManagerImpl(heartbeatTimeout, taskManagerLocation.getResourceID(), (HeartbeatListener)args[1], (Executor)args[2], (ScheduledExecutor)args[2], (Logger)args[3]);
            }
        });

        final String jobMasterAddress = "jm";
        final UUID jmLeaderId = UUID.randomUUID();
        final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
        final int blobPort = 42;
        final JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.registerTaskManager(
                (String)Mockito.any(String.class),
                (TaskManagerLocation)Mockito.eq((Object)taskManagerLocation),
                (UUID)Mockito.eq((Object)jmLeaderId),
                (Time)Mockito.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)new JMTMRegistrationSuccess(jmResourceId, blobPort)));
        Mockito.when((Object)jobMasterGateway.getAddress()).thenReturn((Object)jobMasterAddress);
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)"localhost");

        // Create the RPC service directly in front of the try block so that a failure in the
        // set-up above cannot leave it running.
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        try {
            final TaskExecutor taskManager = new TaskExecutor(
                    (RpcService)rpc,
                    tmConfig,
                    taskManagerLocation,
                    (MemoryManager)Mockito.mock(MemoryManager.class),
                    (IOManager)Mockito.mock(IOManager.class),
                    (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class),
                    (HighAvailabilityServices)haServices,
                    heartbeatServices,
                    (MetricRegistry)Mockito.mock(MetricRegistry.class),
                    (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class),
                    (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class),
                    (FileCache)Mockito.mock(FileCache.class),
                    taskSlotTable,
                    new JobManagerTable(),
                    jobLeaderService,
                    (FatalErrorHandler)testingFatalErrorHandler);
            taskManager.start();

            rpc.registerGateway(jobMasterAddress, (RpcGateway)jobMasterGateway);

            // Announce the job and its leader so that the task executor registers at the job master.
            jobLeaderService.addJob(jobId, jobMasterAddress);
            jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId);

            ((JobMasterGateway)Mockito.verify((Object)jobMasterGateway)).registerTaskManager((String)Mockito.eq((Object)taskManager.getAddress()), (TaskManagerLocation)Mockito.eq((Object)taskManagerLocation), (UUID)Mockito.eq((Object)jmLeaderId), (Time)Mockito.any(Time.class));

            // No heartbeat is sent in this test, so the disconnect is expected within the verification timeout.
            ((JobMasterGateway)Mockito.verify((Object)jobMasterGateway, (VerificationMode)Mockito.timeout((long)500L))).disconnectTaskManager((ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()), (Exception)Mockito.any(TimeoutException.class));

            testingFatalErrorHandler.rethrowError();
        }
        finally {
            rpc.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that the task executor disconnects from a resource manager whose heartbeat times out.
     *
     * <p>The mocked {@link HeartbeatServices} answers with real {@link HeartbeatManagerImpl}
     * instances built with a short timeout value. After the mocked resource manager gateway has
     * answered the registration, the test waits for {@code disconnectTaskManager} to be called
     * with a {@link TimeoutException}.
     *
     * @throws Exception propagated from the set-up, the verifications or {@code rethrowError()}
     */
    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        final String rmAddress = "rm";
        final String tmAddress = "tm";
        final ResourceID rmResourceId = new ResourceID(rmAddress);
        final ResourceID tmResourceId = new ResourceID(tmAddress);
        final UUID rmLeaderId = UUID.randomUUID();

        // Resource manager mock that answers every registration attempt with a success.
        final ResourceManagerGateway rmGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        Mockito.when((Object)rmGateway.registerTaskExecutor(
                (UUID)Mockito.any(UUID.class),
                Mockito.anyString(),
                (ResourceID)Mockito.any(ResourceID.class),
                (SlotReport)Mockito.any(SlotReport.class),
                (Time)Mockito.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));

        final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(null, null);
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        haServices.setResourceManagerLeaderRetriever(testLeaderService);

        final TaskManagerConfiguration taskManagerConfiguration = (TaskManagerConfiguration)Mockito.mock(TaskManagerConfiguration.class);
        Mockito.when((Object)taskManagerConfiguration.getNumberSlots()).thenReturn((Object)1);

        final TaskManagerLocation taskManagerLocation = (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class);
        Mockito.when((Object)taskManagerLocation.getResourceID()).thenReturn((Object)tmResourceId);

        final TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        final SlotReport slotReport = new SlotReport();
        Mockito.when((Object)taskSlotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)slotReport);

        final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();

        // Timeout value handed to every heartbeat manager created through the mocked services.
        final long heartbeatTimeout = 10L;
        final HeartbeatServices heartbeatServices = (HeartbeatServices)Mockito.mock(HeartbeatServices.class);
        Mockito.when((Object)heartbeatServices.createHeartbeatManager(
                (ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()),
                (HeartbeatListener)Mockito.any(HeartbeatListener.class),
                (ScheduledExecutor)Mockito.any(ScheduledExecutor.class),
                (Logger)Mockito.any(Logger.class))).thenAnswer((Answer)new Answer<HeartbeatManagerImpl<Void, Void>>(){

            @Override
            public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
                final Object[] args = invocation.getArguments();
                // The scheduled executor (argument 2) is passed both as plain and as scheduled executor.
                return new HeartbeatManagerImpl(heartbeatTimeout, taskManagerLocation.getResourceID(), (HeartbeatListener)args[1], (Executor)args[2], (ScheduledExecutor)args[2], (Logger)args[3]);
            }
        });

        // Create the RPC service directly in front of the try block so that a failure in the
        // set-up above cannot leave it running.
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        try {
            rpc.registerGateway(rmAddress, (RpcGateway)rmGateway);

            final TaskExecutor taskManager = new TaskExecutor(
                    (RpcService)rpc,
                    taskManagerConfiguration,
                    taskManagerLocation,
                    (MemoryManager)Mockito.mock(MemoryManager.class),
                    (IOManager)Mockito.mock(IOManager.class),
                    (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class),
                    (HighAvailabilityServices)haServices,
                    heartbeatServices,
                    (MetricRegistry)Mockito.mock(MetricRegistry.class),
                    (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class),
                    (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class),
                    (FileCache)Mockito.mock(FileCache.class),
                    taskSlotTable,
                    (JobManagerTable)Mockito.mock(JobManagerTable.class),
                    (JobLeaderService)Mockito.mock(JobLeaderService.class),
                    (FatalErrorHandler)testingFatalErrorHandler);
            taskManager.start();

            // Announce the resource manager leader so that the task executor registers with it.
            testLeaderService.notifyListener(rmAddress, rmLeaderId);

            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway, (VerificationMode)Mockito.atLeast((int)1))).registerTaskExecutor((UUID)Mockito.eq((Object)rmLeaderId), (String)Mockito.eq((Object)taskManager.getAddress()), (ResourceID)Mockito.eq((Object)tmResourceId), (SlotReport)Mockito.any(SlotReport.class), (Time)Mockito.any(Time.class));

            // No heartbeat is sent in this test, so the disconnect is expected within the verification timeout.
            ((ResourceManagerGateway)Mockito.verify((Object)rmGateway, (VerificationMode)Mockito.timeout((long)500L))).disconnectTaskManager((ResourceID)Mockito.eq((Object)taskManagerLocation.getResourceID()), (Exception)Mockito.any(TimeoutException.class));

            testingFatalErrorHandler.rethrowError();
        }
        finally {
            rpc.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a started task executor registers at the resource manager without any explicit
     * leader notification when the high-availability services are {@link StandaloneHaServices}
     * configured with the resource manager address.
     *
     * @throws Exception propagated from the set-up, the verification or {@code rethrowError()}
     */
    @Test
    public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
        final ResourceID tmResourceId = ResourceID.generate();
        final String rmAddress = "/resource/manager/address/one";
        final ResourceID rmResourceId = new ResourceID(rmAddress);
        final String jmAddress = "localhost";
        final TestingSerialRpcService rpcService = new TestingSerialRpcService();
        try {
            // Resource manager mock that answers every registration attempt with a success.
            final ResourceManagerGateway resourceManager = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
            Mockito.when((Object)resourceManager.registerTaskExecutor(
                    (UUID)Mockito.any(UUID.class),
                    Mockito.anyString(),
                    (ResourceID)Mockito.any(ResourceID.class),
                    (SlotReport)Mockito.any(SlotReport.class),
                    (Time)Mockito.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));
            rpcService.registerGateway(rmAddress, (RpcGateway)resourceManager);

            final TaskManagerConfiguration tmConfiguration = (TaskManagerConfiguration)Mockito.mock(TaskManagerConfiguration.class);
            Mockito.when((Object)tmConfiguration.getNumberSlots()).thenReturn((Object)1);

            final TaskManagerLocation tmLocation = (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class);
            Mockito.when((Object)tmLocation.getResourceID()).thenReturn((Object)tmResourceId);

            final StandaloneHaServices standaloneHaServices = new StandaloneHaServices(rmAddress, jmAddress);

            // The slot table always reports the very same instance, which is checked for below.
            final SlotReport expectedSlotReport = new SlotReport();
            final TaskSlotTable slotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
            Mockito.when((Object)slotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)expectedSlotReport);

            final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

            final TaskExecutor taskExecutor = new TaskExecutor(
                    (RpcService)rpcService,
                    tmConfiguration,
                    tmLocation,
                    (MemoryManager)Mockito.mock(MemoryManager.class),
                    (IOManager)Mockito.mock(IOManager.class),
                    (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class),
                    (HighAvailabilityServices)standaloneHaServices,
                    (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS),
                    (MetricRegistry)Mockito.mock(MetricRegistry.class),
                    (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class),
                    (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class),
                    (FileCache)Mockito.mock(FileCache.class),
                    slotTable,
                    (JobManagerTable)Mockito.mock(JobManagerTable.class),
                    (JobLeaderService)Mockito.mock(JobLeaderService.class),
                    (FatalErrorHandler)fatalErrorHandler);
            taskExecutor.start();

            final String tmAddress = taskExecutor.getAddress();
            ((ResourceManagerGateway)Mockito.verify((Object)resourceManager)).registerTaskExecutor(
                    (UUID)Mockito.any(UUID.class),
                    (String)Mockito.eq((Object)tmAddress),
                    (ResourceID)Mockito.eq((Object)tmResourceId),
                    (SlotReport)Mockito.eq((Object)expectedSlotReport),
                    (Time)Mockito.any(Time.class));

            fatalErrorHandler.rethrowError();
        }
        finally {
            rpcService.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that the task executor registers at a resource manager each time a new leader is
     * announced.
     *
     * <p>Two mocked resource managers are reachable under different addresses. The test
     * announces the first one, then revokes leadership with a {@code (null, null)} notification,
     * then announces the second one. It verifies a registration call at each gateway and checks
     * the resource manager connection before and after.
     *
     * @throws Exception propagated from the set-up, the verifications or {@code rethrowError()}
     */
    @Test
    public void testTriggerRegistrationOnLeaderChange() throws Exception {
        final ResourceID taskManagerResourceId = ResourceID.generate();

        final String firstAddress = "/resource/manager/address/one";
        final String secondAddress = "/resource/manager/address/two";
        final UUID firstLeaderId = UUID.randomUUID();
        final UUID secondLeaderId = UUID.randomUUID();
        final ResourceID firstRmResourceId = new ResourceID(firstAddress);
        final ResourceID secondRmResourceId = new ResourceID(secondAddress);

        final TestingSerialRpcService rpcService = new TestingSerialRpcService();
        try {
            // Both resource manager mocks accept every registration attempt.
            final ResourceManagerGateway firstGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
            final ResourceManagerGateway secondGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
            Mockito.when((Object)firstGateway.registerTaskExecutor(
                    (UUID)Mockito.any(UUID.class),
                    Mockito.anyString(),
                    (ResourceID)Mockito.any(ResourceID.class),
                    (SlotReport)Mockito.any(SlotReport.class),
                    (Time)Mockito.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)new TaskExecutorRegistrationSuccess(new InstanceID(), firstRmResourceId, 10L)));
            Mockito.when((Object)secondGateway.registerTaskExecutor(
                    (UUID)Mockito.any(UUID.class),
                    Mockito.anyString(),
                    (ResourceID)Mockito.any(ResourceID.class),
                    (SlotReport)Mockito.any(SlotReport.class),
                    (Time)Mockito.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)new TaskExecutorRegistrationSuccess(new InstanceID(), secondRmResourceId, 10L)));
            rpcService.registerGateway(firstAddress, (RpcGateway)firstGateway);
            rpcService.registerGateway(secondAddress, (RpcGateway)secondGateway);

            final TestingLeaderRetrievalService leaderRetrieval = new TestingLeaderRetrievalService(null, null);
            final TestingHighAvailabilityServices highAvailability = new TestingHighAvailabilityServices();
            highAvailability.setResourceManagerLeaderRetriever(leaderRetrieval);

            final TaskManagerConfiguration tmConfiguration = (TaskManagerConfiguration)Mockito.mock(TaskManagerConfiguration.class);
            Mockito.when((Object)tmConfiguration.getNumberSlots()).thenReturn((Object)1);
            Mockito.when((Object)tmConfiguration.getConfiguration()).thenReturn((Object)new Configuration());
            Mockito.when((Object)tmConfiguration.getTmpDirectories()).thenReturn((Object)new String[1]);

            final TaskManagerLocation tmLocation = (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class);
            Mockito.when((Object)tmLocation.getResourceID()).thenReturn((Object)taskManagerResourceId);
            Mockito.when((Object)tmLocation.getHostname()).thenReturn((Object)"foobar");

            // The slot table always reports the very same instance, which the second verification checks for.
            final SlotReport expectedSlotReport = new SlotReport();
            final TaskSlotTable slotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
            Mockito.when((Object)slotTable.createSlotReport((ResourceID)Mockito.any(ResourceID.class))).thenReturn((Object)expectedSlotReport);

            final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

            final TaskExecutor taskExecutor = new TaskExecutor(
                    (RpcService)rpcService,
                    tmConfiguration,
                    tmLocation,
                    (MemoryManager)Mockito.mock(MemoryManager.class),
                    (IOManager)Mockito.mock(IOManager.class),
                    (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class),
                    (HighAvailabilityServices)highAvailability,
                    (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS),
                    (MetricRegistry)Mockito.mock(MetricRegistry.class),
                    (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class),
                    (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class),
                    (FileCache)Mockito.mock(FileCache.class),
                    slotTable,
                    (JobManagerTable)Mockito.mock(JobManagerTable.class),
                    (JobLeaderService)Mockito.mock(JobLeaderService.class),
                    (FatalErrorHandler)fatalErrorHandler);
            taskExecutor.start();
            final String tmAddress = taskExecutor.getAddress();

            // No leader has been announced yet, hence no connection.
            Assert.assertNull((Object)taskExecutor.getResourceManagerConnection());

            // First leader: expect a registration at the first gateway.
            leaderRetrieval.notifyListener(firstAddress, firstLeaderId);
            ((ResourceManagerGateway)Mockito.verify((Object)firstGateway)).registerTaskExecutor(
                    (UUID)Mockito.eq((Object)firstLeaderId),
                    (String)Mockito.eq((Object)tmAddress),
                    (ResourceID)Mockito.eq((Object)taskManagerResourceId),
                    (SlotReport)Mockito.any(SlotReport.class),
                    (Time)Mockito.any(Time.class));
            Assert.assertNotNull((Object)taskExecutor.getResourceManagerConnection());

            // Leadership is revoked and handed to the second resource manager.
            leaderRetrieval.notifyListener(null, null);
            leaderRetrieval.notifyListener(secondAddress, secondLeaderId);
            ((ResourceManagerGateway)Mockito.verify((Object)secondGateway)).registerTaskExecutor(
                    (UUID)Mockito.eq((Object)secondLeaderId),
                    (String)Mockito.eq((Object)tmAddress),
                    (ResourceID)Mockito.eq((Object)taskManagerResourceId),
                    (SlotReport)Mockito.eq((Object)expectedSlotReport),
                    (Time)Mockito.any(Time.class));
            Assert.assertNotNull((Object)taskExecutor.getResourceManagerConnection());

            fatalErrorHandler.rethrowError();
        }
        finally {
            rpcService.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits a task deployment descriptor for {@code TestInvokable} to the task executor and
     * blocks until {@code TestInvokable.completableFuture} completes.
     *
     * <p>NOTE(review): {@code TestInvokable} is declared elsewhere in this file; presumably it
     * completes the future once it has been invoked. Confirm against its definition. The JUnit
     * timeout bounds the wait.
     *
     * @throws Exception propagated from the set-up, the submission, the future or {@code rethrowError()}
     */
    @Test(timeout=1000L)
    public void testTaskSubmission() throws Exception {
        final Configuration configuration = new Configuration();
        final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);

        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final UUID jobManagerLeaderId = UUID.randomUUID();
        final JobVertexID jobVertexId = new JobVertexID();

        // Describe a single task that runs TestInvokable; the job is named after the test method.
        JobInformation jobInformation = new JobInformation(jobId, this.name.getMethodName(), new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(jobVertexId, "test task", 1, 1, TestInvokable.class.getName(), new Configuration());
        SerializedValue serializedJobInformation = new SerializedValue((Object)jobInformation);
        SerializedValue serializedJobVertexInformation = new SerializedValue((Object)taskInformation);
        TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(serializedJobInformation, serializedJobVertexInformation, new ExecutionAttemptID(), allocationId, 0, 0, 0, null, Collections.emptyList(), Collections.emptyList());

        // The job's classes are resolved through the class loader of this test class.
        final LibraryCacheManager libraryCacheManager = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libraryCacheManager.getClassLoader((JobID)Mockito.eq((Object)jobId))).thenReturn((Object)this.getClass().getClassLoader());

        // Register a connection for the job under the leader id used for the submission below.
        final JobManagerConnection jobManagerConnection = new JobManagerConnection(
                jobId,
                ResourceID.generate(),
                (JobMasterGateway)Mockito.mock(JobMasterGateway.class),
                jobManagerLeaderId,
                (TaskManagerActions)Mockito.mock(TaskManagerActions.class),
                (CheckpointResponder)Mockito.mock(CheckpointResponder.class),
                libraryCacheManager,
                (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class),
                (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class));
        final JobManagerTable jobManagerTable = new JobManagerTable();
        jobManagerTable.put(jobId, jobManagerConnection);

        // The slot table reports an active slot for the allocation and accepts any task.
        final TaskSlotTable taskSlotTable = (TaskSlotTable)Mockito.mock(TaskSlotTable.class);
        Mockito.when((Object)taskSlotTable.existsActiveSlot((JobID)Mockito.eq((Object)jobId), (AllocationID)Mockito.eq((Object)allocationId))).thenReturn((Object)true);
        Mockito.when((Object)taskSlotTable.addTask((Task)Mockito.any(Task.class))).thenReturn((Object)true);

        final NetworkEnvironment networkEnvironment = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)networkEnvironment.createKvStateTaskRegistry((JobID)Mockito.eq((Object)jobId), (JobVertexID)Mockito.eq((Object)jobVertexId))).thenReturn(Mockito.mock(TaskKvStateRegistry.class));

        final TaskManagerMetricGroup taskManagerMetricGroup = (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class);
        final TaskMetricGroup taskMetricGroup = (TaskMetricGroup)Mockito.mock(TaskMetricGroup.class);
        Mockito.when((Object)taskMetricGroup.getIOMetricGroup()).thenReturn(Mockito.mock(TaskIOMetricGroup.class));
        Mockito.when((Object)taskManagerMetricGroup.addTaskForJob((JobID)Mockito.any(JobID.class), Mockito.anyString(), (JobVertexID)Mockito.any(JobVertexID.class), (ExecutionAttemptID)Mockito.any(ExecutionAttemptID.class), Mockito.anyString(), Mockito.anyInt(), Mockito.anyInt())).thenReturn((Object)taskMetricGroup);

        final HighAvailabilityServices haServices = (HighAvailabilityServices)Mockito.mock(HighAvailabilityServices.class);
        Mockito.when((Object)haServices.getResourceManagerLeaderRetriever()).thenReturn(Mockito.mock(LeaderRetrievalService.class));

        // Create the RPC service directly in front of the try block so that a failure in the
        // set-up above cannot leave it running.
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        try {
            final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
            final TaskExecutor taskManager = new TaskExecutor(
                    (RpcService)rpc,
                    taskManagerConfiguration,
                    (TaskManagerLocation)Mockito.mock(TaskManagerLocation.class),
                    (MemoryManager)Mockito.mock(MemoryManager.class),
                    (IOManager)Mockito.mock(IOManager.class),
                    networkEnvironment,
                    haServices,
                    (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS),
                    (MetricRegistry)Mockito.mock(MetricRegistry.class),
                    taskManagerMetricGroup,
                    (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class),
                    (FileCache)Mockito.mock(FileCache.class),
                    taskSlotTable,
                    jobManagerTable,
                    (JobLeaderService)Mockito.mock(JobLeaderService.class),
                    (FatalErrorHandler)testingFatalErrorHandler);
            taskManager.start();

            taskManager.submitTask(tdd, jobManagerLeaderId);

            // Block until the future exposed by TestInvokable completes.
            CompletableFuture<Boolean> completionFuture = TestInvokable.completableFuture;
            completionFuture.get();

            testingFatalErrorHandler.rethrowError();
        }
        finally {
            rpc.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that the task executor offers an allocated slot to the job master once the leader
     * of the job is announced.
     *
     * <p>The test first announces the resource manager leader and requests a slot for the job.
     * Only afterwards is the job master leader announced. It then verifies that
     * {@code offerSlots} is invoked on the job master gateway with exactly one
     * {@link SlotOffer} for the requested allocation.
     *
     * @throws Exception propagated from the set-up, the verification or {@code rethrowError()}
     */
    @Test
    public void testJobLeaderDetection() throws Exception {
        final JobID jobId = new JobID();
        final Configuration configuration = new Configuration();
        final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
        final ResourceID resourceId = new ResourceID("foobar");
        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        final TimerService timerService = (TimerService)Mockito.mock(TimerService.class);
        final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile)Mockito.mock(ResourceProfile.class)), timerService);
        final JobManagerTable jobManagerTable = new JobManagerTable();
        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
        final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();

        final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        final TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
        haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);

        // Resource manager mock that accepts the registration of this task manager.
        final String resourceManagerAddress = "rm";
        final UUID resourceManagerLeaderId = UUID.randomUUID();
        final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
        final ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)Mockito.mock(ResourceManagerGateway.class);
        final InstanceID registrationId = new InstanceID();
        Mockito.when((Object)resourceManagerGateway.registerTaskExecutor(
                (UUID)Mockito.eq((Object)resourceManagerLeaderId),
                (String)Mockito.any(String.class),
                (ResourceID)Mockito.eq((Object)resourceId),
                (SlotReport)Mockito.any(SlotReport.class),
                (Time)Mockito.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));

        // Job master mock that accepts the registration and answers slot offers with a mocked future.
        final String jobManagerAddress = "jm";
        final UUID jobManagerLeaderId = UUID.randomUUID();
        final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
        final int blobPort = 42;
        final JobMasterGateway jobMasterGateway = (JobMasterGateway)Mockito.mock(JobMasterGateway.class);
        Mockito.when((Object)jobMasterGateway.registerTaskManager(
                (String)Mockito.any(String.class),
                (TaskManagerLocation)Mockito.eq((Object)taskManagerLocation),
                (UUID)Mockito.eq((Object)jobManagerLeaderId),
                (Time)Mockito.any(Time.class))).thenReturn((Object)FlinkCompletableFuture.completed((Object)new JMTMRegistrationSuccess(jmResourceId, blobPort)));
        Mockito.when((Object)jobMasterGateway.getHostname()).thenReturn((Object)jobManagerAddress);
        Mockito.when((Object)jobMasterGateway.offerSlots((ResourceID)Mockito.any(ResourceID.class), (Iterable)Mockito.any(Iterable.class), (UUID)Mockito.any(UUID.class), (Time)Mockito.any(Time.class))).thenReturn(Mockito.mock(Future.class, (Answer)Mockito.RETURNS_MOCKS));

        final AllocationID allocationId = new AllocationID();
        final SlotID slotId = new SlotID(resourceId, 0);
        final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);

        // Create the RPC service directly in front of the try block so that a failure in the
        // set-up above cannot leave it running.
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        try {
            rpc.registerGateway(resourceManagerAddress, (RpcGateway)resourceManagerGateway);
            rpc.registerGateway(jobManagerAddress, (RpcGateway)jobMasterGateway);

            final TaskExecutor taskManager = new TaskExecutor(
                    (RpcService)rpc,
                    taskManagerConfiguration,
                    taskManagerLocation,
                    (MemoryManager)Mockito.mock(MemoryManager.class),
                    (IOManager)Mockito.mock(IOManager.class),
                    (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class),
                    (HighAvailabilityServices)haServices,
                    (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS),
                    (MetricRegistry)Mockito.mock(MetricRegistry.class),
                    (TaskManagerMetricGroup)Mockito.mock(TaskManagerMetricGroup.class),
                    (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class),
                    (FileCache)Mockito.mock(FileCache.class),
                    taskSlotTable,
                    jobManagerTable,
                    jobLeaderService,
                    (FatalErrorHandler)testingFatalErrorHandler);
            taskManager.start();

            // Announce the resource manager leader, then request a slot on its behalf.
            resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId);
            taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId);

            // Only now is the job leader announced; the slot offer is expected afterwards.
            jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId);

            ((JobMasterGateway)Mockito.verify((Object)jobMasterGateway)).offerSlots((ResourceID)Mockito.any(ResourceID.class), (Iterable)Matchers.argThat((Matcher)org.hamcrest.Matchers.contains((Object[])new SlotOffer[]{slotOffer})), (UUID)Mockito.eq((Object)jobManagerLeaderId), (Time)Mockito.any(Time.class));

            testingFatalErrorHandler.rethrowError();
        }
        finally {
            rpc.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Allocates two slots for one job and lets the mocked job master accept only the first
     * offer. The test then checks that the accepted slot stays active, that the other slot is
     * free again, and that the resource manager was told the second slot is available.
     */
    @Test
    public void testSlotAcceptance() throws Exception {
        final String rmAddress = "rm";
        final String jmAddress = "jm";
        final int jmBlobPort = 42;

        final JobID jobId = new JobID();
        final TestingSerialRpcService rpcService = new TestingSerialRpcService();
        final TaskManagerConfiguration tmConfiguration =
            TaskManagerConfiguration.fromConfiguration(new Configuration());
        final ResourceID tmResourceId = new ResourceID("foobar");
        final TaskManagerLocation tmLocation =
            new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();

        // Real slot table with two slots; only the timer service behind it is mocked.
        final TimerService slotTimerService = Mockito.mock(TimerService.class);
        final TaskSlotTable slotTable = new TaskSlotTable(
            Arrays.asList(Mockito.mock(ResourceProfile.class), Mockito.mock(ResourceProfile.class)),
            slotTimerService);
        final JobManagerTable jmTable = new JobManagerTable();
        final JobLeaderService leaderService = new JobLeaderService(tmLocation);
        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

        // Leader retrieval for the resource manager and for the job master of this job.
        final UUID rmLeaderId = UUID.randomUUID();
        final ResourceID rmResourceId = new ResourceID(rmAddress);
        final UUID jmLeaderId = UUID.randomUUID();
        final TestingLeaderRetrievalService rmLeaderRetriever =
            new TestingLeaderRetrievalService(rmAddress, rmLeaderId);
        final TestingLeaderRetrievalService jmLeaderRetriever =
            new TestingLeaderRetrievalService(jmAddress, jmLeaderId);
        haServices.setResourceManagerLeaderRetriever(rmLeaderRetriever);
        haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetriever);

        // Resource manager mock: registration of this task executor succeeds immediately.
        final ResourceManagerGateway rmGateway = Mockito.mock(ResourceManagerGateway.class);
        final InstanceID registrationId = new InstanceID();
        final Object rmRegistrationResponse = FlinkCompletableFuture.completed(
            new TaskExecutorRegistrationSuccess(registrationId, rmResourceId, 1000L));
        Mockito.when((Object)rmGateway.registerTaskExecutor(
                Mockito.eq(rmLeaderId),
                Mockito.any(String.class),
                Mockito.eq(tmResourceId),
                Mockito.any(SlotReport.class),
                Mockito.any(Time.class)))
            .thenReturn(rmRegistrationResponse);

        final ResourceID jmResourceId = new ResourceID(jmAddress);
        final AllocationID acceptedAllocationId = new AllocationID();
        final AllocationID rejectedAllocationId = new AllocationID();
        final SlotOffer acceptedOffer = new SlotOffer(acceptedAllocationId, 0, ResourceProfile.UNKNOWN);

        // Job master mock: registration succeeds and every slot offer is answered with only
        // the first offer, i.e. the second allocation is not accepted.
        final JobMasterGateway jmGateway = Mockito.mock(JobMasterGateway.class);
        final Object jmRegistrationResponse =
            FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, jmBlobPort));
        Mockito.when((Object)jmGateway.registerTaskManager(
                Mockito.any(String.class),
                Mockito.eq(tmLocation),
                Mockito.eq(jmLeaderId),
                Mockito.any(Time.class)))
            .thenReturn(jmRegistrationResponse);
        Mockito.when((Object)jmGateway.getHostname()).thenReturn((Object)jmAddress);
        final Object offerSlotsResponse =
            FlinkCompletableFuture.completed(Collections.singleton(acceptedOffer));
        Mockito.when((Object)jmGateway.offerSlots(
                Mockito.any(ResourceID.class),
                Mockito.any(Iterable.class),
                Mockito.eq(jmLeaderId),
                Mockito.any(Time.class)))
            .thenReturn(offerSlotsResponse);

        rpcService.registerGateway(rmAddress, (RpcGateway)rmGateway);
        rpcService.registerGateway(jmAddress, (RpcGateway)jmGateway);

        try {
            final TaskExecutor taskExecutor = new TaskExecutor(
                (RpcService)rpcService,
                tmConfiguration,
                tmLocation,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                Mockito.mock(NetworkEnvironment.class),
                (HighAvailabilityServices)haServices,
                (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS),
                Mockito.mock(MetricRegistry.class),
                Mockito.mock(TaskManagerMetricGroup.class),
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(FileCache.class),
                slotTable,
                jmTable,
                leaderService,
                (FatalErrorHandler)fatalErrorHandler);
            taskExecutor.start();

            slotTable.allocateSlot(0, jobId, acceptedAllocationId, Time.milliseconds(10000L));
            slotTable.allocateSlot(1, jobId, rejectedAllocationId, Time.milliseconds(10000L));

            // Adding the job starts the interaction with the job master mock set up above.
            leaderService.addJob(jobId, jmAddress);

            Mockito.verify(rmGateway).notifySlotAvailable(
                Mockito.eq(rmLeaderId),
                Mockito.eq(registrationId),
                Mockito.eq(new SlotID(tmResourceId, 1)),
                Mockito.eq(rejectedAllocationId));

            Assert.assertTrue(slotTable.existsActiveSlot(jobId, acceptedAllocationId));
            Assert.assertFalse(slotTable.existsActiveSlot(jobId, rejectedAllocationId));
            Assert.assertTrue(slotTable.isSlotFree(1));

            // Surface any error reported to the fatal error handler during the test.
            fatalErrorHandler.rethrowError();
        } finally {
            rpcService.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Currently ignored. After the task executor has registered at the mocked resource
     * manager, a request for slot index 0 is issued, followed by a request for slot index 1
     * that is expected to fail with a {@code SlotAllocationException}. The leader is then
     * announced a second time and the request for slot index 1 is repeated, this time without
     * an exception being expected.
     */
    @Ignore
    @Test
    public void testRejectAllocationRequestsForOutOfSyncSlots() throws Exception {
        final String rmAddress = "/resource/manager/address/one";
        final String jmAddress = "foobar";

        final ResourceID tmResourceId = ResourceID.generate();
        final UUID rmLeaderId = UUID.randomUUID();
        final JobID jobId = new JobID();
        final TestingSerialRpcService rpcService = new TestingSerialRpcService();

        try {
            final ResourceManagerGateway rmGateway = Mockito.mock(ResourceManagerGateway.class);
            rpcService.registerGateway(rmAddress, (RpcGateway)rmGateway);

            final TestingLeaderRetrievalService rmLeaderRetriever =
                new TestingLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
            final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
            haServices.setResourceManagerLeaderRetriever(rmLeaderRetriever);

            // The configuration mock reports a single slot.
            final TaskManagerConfiguration tmConfiguration = Mockito.mock(TaskManagerConfiguration.class);
            Mockito.when((Object)tmConfiguration.getNumberSlots()).thenReturn((Object)1);

            final TaskManagerLocation tmLocation = Mockito.mock(TaskManagerLocation.class);
            Mockito.when((Object)tmLocation.getResourceID()).thenReturn((Object)tmResourceId);

            final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

            final TaskExecutor taskExecutor = new TaskExecutor(
                (RpcService)rpcService,
                tmConfiguration,
                tmLocation,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                Mockito.mock(NetworkEnvironment.class),
                (HighAvailabilityServices)haServices,
                (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS),
                Mockito.mock(MetricRegistry.class),
                Mockito.mock(TaskManagerMetricGroup.class),
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(FileCache.class),
                Mockito.mock(TaskSlotTable.class),
                Mockito.mock(JobManagerTable.class),
                Mockito.mock(JobLeaderService.class),
                (FatalErrorHandler)fatalErrorHandler);
            taskExecutor.start();
            final String tmAddress = taskExecutor.getAddress();

            // No resource manager connection is expected before the listener is notified.
            Assert.assertNull(taskExecutor.getResourceManagerConnection());

            rmLeaderRetriever.notifyListener(rmAddress, rmLeaderId);
            Mockito.verify(rmGateway).registerTaskExecutor(
                Mockito.eq(rmLeaderId),
                Mockito.eq(tmAddress),
                Mockito.eq(tmResourceId),
                Mockito.any(SlotReport.class),
                Mockito.any(Time.class));
            Assert.assertNotNull(taskExecutor.getResourceManagerConnection());

            final SlotID firstSlotId = new SlotID(tmResourceId, 0);
            taskExecutor.requestSlot(firstSlotId, jobId, new AllocationID(), jmAddress, rmLeaderId);

            final SlotID unconfirmedFreeSlotId = new SlotID(tmResourceId, 1);
            try {
                taskExecutor.requestSlot(unconfirmedFreeSlotId, jobId, new AllocationID(), jmAddress, rmLeaderId);
                Assert.fail("The slot request should have failed.");
            } catch (SlotAllocationException expected) {
                // Expected outcome: the request for slot index 1 must be rejected at this point.
            }

            Mockito.verify(rmGateway).registerTaskExecutor(
                Mockito.eq(rmLeaderId),
                Mockito.eq(tmAddress),
                Mockito.eq(tmResourceId),
                Mockito.any(SlotReport.class),
                Mockito.any(Time.class));

            // Announce the leader again, then repeat the request that was rejected above.
            rmLeaderRetriever.notifyListener(rmAddress, rmLeaderId);
            taskExecutor.requestSlot(unconfirmedFreeSlotId, jobId, new AllocationID(), jmAddress, rmLeaderId);

            // Surface any error reported to the fatal error handler during the test.
            fatalErrorHandler.rethrowError();
        } finally {
            rpcService.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits a task for an allocated slot while the slot offer sent to the mocked job master
     * is still unanswered: {@code submitTask} is called first and the offer future is completed
     * afterwards with only the first offer. The test then checks that the first allocation is
     * active, that the second slot is free again, and that the resource manager was notified
     * about the free slot.
     */
    @Test
    public void testSubmitTaskBeforeAcceptSlot() throws Exception {
        final String rmAddress = "rm";
        final String jmAddress = "jm";
        final int jmBlobPort = 42;

        final JobID jobId = new JobID();
        final TestingSerialRpcService rpcService = new TestingSerialRpcService();
        final TaskManagerConfiguration tmConfiguration =
            TaskManagerConfiguration.fromConfiguration(new Configuration());
        final ResourceID tmResourceId = new ResourceID("foobar");
        final TaskManagerLocation tmLocation =
            new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();

        // Real slot table with two slots; only the timer service behind it is mocked.
        final TimerService slotTimerService = Mockito.mock(TimerService.class);
        final TaskSlotTable slotTable = new TaskSlotTable(
            Arrays.asList(Mockito.mock(ResourceProfile.class), Mockito.mock(ResourceProfile.class)),
            slotTimerService);
        final JobManagerTable jmTable = new JobManagerTable();
        final JobLeaderService leaderService = new JobLeaderService(tmLocation);
        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

        // Leader retrieval for the resource manager and for the job master of this job.
        final UUID rmLeaderId = UUID.randomUUID();
        final ResourceID rmResourceId = new ResourceID(rmAddress);
        final UUID jmLeaderId = UUID.randomUUID();
        final TestingLeaderRetrievalService rmLeaderRetriever =
            new TestingLeaderRetrievalService(rmAddress, rmLeaderId);
        final TestingLeaderRetrievalService jmLeaderRetriever =
            new TestingLeaderRetrievalService(jmAddress, jmLeaderId);
        haServices.setResourceManagerLeaderRetriever(rmLeaderRetriever);
        haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetriever);

        // Resource manager mock: registration of this task executor succeeds immediately.
        final ResourceManagerGateway rmGateway = Mockito.mock(ResourceManagerGateway.class);
        final InstanceID registrationId = new InstanceID();
        final Object rmRegistrationResponse = FlinkCompletableFuture.completed(
            new TaskExecutorRegistrationSuccess(registrationId, rmResourceId, 1000L));
        Mockito.when((Object)rmGateway.registerTaskExecutor(
                Mockito.eq(rmLeaderId),
                Mockito.any(String.class),
                Mockito.eq(tmResourceId),
                Mockito.any(SlotReport.class),
                Mockito.any(Time.class)))
            .thenReturn(rmRegistrationResponse);

        final ResourceID jmResourceId = new ResourceID(jmAddress);
        final AllocationID acceptedAllocationId = new AllocationID();
        final AllocationID rejectedAllocationId = new AllocationID();
        final SlotOffer acceptedOffer = new SlotOffer(acceptedAllocationId, 0, ResourceProfile.UNKNOWN);

        // Job master mock: registration succeeds; offerSlots is stubbed later, inside the try.
        final JobMasterGateway jmGateway = Mockito.mock(JobMasterGateway.class);
        final Object jmRegistrationResponse =
            FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, jmBlobPort));
        Mockito.when((Object)jmGateway.registerTaskManager(
                Mockito.any(String.class),
                Mockito.eq(tmLocation),
                Mockito.eq(jmLeaderId),
                Mockito.any(Time.class)))
            .thenReturn(jmRegistrationResponse);
        Mockito.when((Object)jmGateway.getHostname()).thenReturn((Object)jmAddress);

        rpcService.registerGateway(rmAddress, (RpcGateway)rmGateway);
        rpcService.registerGateway(jmAddress, (RpcGateway)jmGateway);

        // The library cache hands out the class loader of this test class for the job.
        final LibraryCacheManager libraryCacheManager = Mockito.mock(LibraryCacheManager.class);
        final Object testClassLoader = this.getClass().getClassLoader();
        Mockito.when((Object)libraryCacheManager.getClassLoader(Mockito.eq(jobId)))
            .thenReturn(testClassLoader);

        // Put a job manager connection for the job into the table up front.
        final JobManagerConnection jmConnection = new JobManagerConnection(
            jobId,
            jmResourceId,
            jmGateway,
            jmLeaderId,
            Mockito.mock(TaskManagerActions.class),
            Mockito.mock(CheckpointResponder.class),
            libraryCacheManager,
            Mockito.mock(ResultPartitionConsumableNotifier.class),
            Mockito.mock(PartitionProducerStateChecker.class));
        jmTable.put(jobId, jmConnection);

        try {
            // Metric group mocks: adding a task yields a task metric group with an IO group.
            final TaskManagerMetricGroup tmMetricGroup = Mockito.mock(TaskManagerMetricGroup.class);
            final TaskMetricGroup taskMetricGroup = Mockito.mock(TaskMetricGroup.class);
            Mockito.when((Object)taskMetricGroup.getIOMetricGroup())
                .thenReturn(Mockito.mock(TaskIOMetricGroup.class));
            Mockito.when((Object)tmMetricGroup.addTaskForJob(
                    Mockito.any(JobID.class),
                    Mockito.anyString(),
                    Mockito.any(JobVertexID.class),
                    Mockito.any(ExecutionAttemptID.class),
                    Mockito.anyString(),
                    Mockito.anyInt(),
                    Mockito.anyInt()))
                .thenReturn((Object)taskMetricGroup);

            final TaskExecutor taskExecutor = new TaskExecutor(
                (RpcService)rpcService,
                tmConfiguration,
                tmLocation,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                Mockito.mock(NetworkEnvironment.class),
                (HighAvailabilityServices)haServices,
                (HeartbeatServices)Mockito.mock(HeartbeatServices.class, (Answer)Mockito.RETURNS_MOCKS),
                Mockito.mock(MetricRegistry.class),
                tmMetricGroup,
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(FileCache.class),
                slotTable,
                jmTable,
                leaderService,
                (FatalErrorHandler)fatalErrorHandler);
            taskExecutor.start();

            slotTable.allocateSlot(0, jobId, acceptedAllocationId, Time.milliseconds(10000L));
            slotTable.allocateSlot(1, jobId, rejectedAllocationId, Time.milliseconds(10000L));

            // Deployment descriptor for a TestInvokable task bound to the first allocation.
            final JobVertexID jobVertexId = new JobVertexID();
            final JobInformation jobInformation = new JobInformation(
                jobId,
                this.name.getMethodName(),
                new SerializedValue(new ExecutionConfig()),
                new Configuration(),
                Collections.emptyList(),
                Collections.emptyList());
            final TaskInformation taskInformation = new TaskInformation(
                jobVertexId,
                "test task",
                1,
                1,
                TestInvokable.class.getName(),
                new Configuration());
            final SerializedValue serializedJobInformation = new SerializedValue(jobInformation);
            final SerializedValue serializedTaskInformation = new SerializedValue(taskInformation);
            final TaskDeploymentDescriptor deploymentDescriptor = new TaskDeploymentDescriptor(
                serializedJobInformation,
                serializedTaskInformation,
                new ExecutionAttemptID(),
                acceptedAllocationId,
                0,
                0,
                0,
                null,
                Collections.emptyList(),
                Collections.emptyList());

            // The offer is answered with a future that the test completes by hand further down.
            final FlinkCompletableFuture pendingOfferResponse = new FlinkCompletableFuture();
            Mockito.when((Object)jmGateway.offerSlots(
                    Mockito.any(ResourceID.class),
                    Mockito.any(Iterable.class),
                    Mockito.eq(jmLeaderId),
                    Mockito.any(Time.class)))
                .thenReturn((Object)pendingOfferResponse);

            leaderService.addJob(jobId, jmAddress);
            Mockito.verify(jmGateway).offerSlots(
                Mockito.any(ResourceID.class),
                Mockito.any(Iterable.class),
                Mockito.eq(jmLeaderId),
                Mockito.any(Time.class));

            // Submit the task first, and only then let the job master accept the first offer.
            taskExecutor.submitTask(deploymentDescriptor, jmLeaderId);
            pendingOfferResponse.complete(Collections.singleton(acceptedOffer));

            Mockito.verify(rmGateway).notifySlotAvailable(
                Mockito.eq(rmLeaderId),
                Mockito.eq(registrationId),
                Mockito.eq(new SlotID(tmResourceId, 1)),
                Mockito.any(AllocationID.class));

            Assert.assertTrue(slotTable.existsActiveSlot(jobId, acceptedAllocationId));
            Assert.assertFalse(slotTable.existsActiveSlot(jobId, rejectedAllocationId));
            Assert.assertTrue(slotTable.isSlotFree(1));

            // Surface any error reported to the fatal error handler during the test.
            fatalErrorHandler.rethrowError();
        } finally {
            rpcService.stopService();
        }
    }

    /**
     * Invokable used by the tests in this file; running it completes a static future.
     */
    public static class TestInvokable
    extends AbstractInvokable {
        /** Completed with {@code true} when {@link #invoke()} runs. */
        static final CompletableFuture<Boolean> completableFuture = new FlinkCompletableFuture<>();

        /**
         * Completes {@link #completableFuture} with {@code true}.
         *
         * @throws Exception never thrown by this body; declared in the method signature
         */
        public void invoke() throws Exception {
            // Pass the boolean directly: an Object-typed argument does not type-check
            // against CompletableFuture<Boolean>.
            completableFuture.complete(true);
        }
    }
}

