package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.management.jmx.JMXService;
import org.apache.flink.runtime.blob.JobPermanentBlobService;
import org.apache.flink.runtime.blob.TaskExecutorBlobService;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatReceiver;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.ResourceManagerAddress;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.SharedResources;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorChannelStateExecutorFactoryManager;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.taskexecutor.JobTable;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
import org.apache.flink.runtime.taskexecutor.rpc.RpcKvStateRegistryListener;
import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
import org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotAllocationSnapshot;
import org.apache.flink.runtime.taskexecutor.slot.SlotAllocationSnapshotPersistenceService;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.flink.shaded.guava31.com.google.common.collect.UnmodifiableIterator;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkExpectedException;
import org.apache.flink.util.OptionalConsumer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor.class */
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    /** Base name passed to {@code RpcServiceUtils.createRandomName} when the constructor names this RPC endpoint. */
    public static final String TASK_MANAGER_NAME = "taskmanager";
    // ---- Collaborators handed to the constructor (null-checked there) ----
    private final HighAvailabilityServices haServices;
    private final TaskManagerServices taskExecutorServices;
    private final TaskManagerConfiguration taskManagerConfiguration;
    private final FatalErrorHandler fatalErrorHandler;
    private final TaskExecutorBlobService taskExecutorBlobService;
    // Obtained from the TaskManagerServices instance in the constructor.
    private final LibraryCacheManager libraryCacheManager;

    // Stored as passed to the constructor; no null check is applied to it.
    @Nullable
    private final String metricQueryServiceAddress;
    // ---- Services obtained from TaskManagerServices in the constructor (except where noted) ----
    private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;
    // Constructor argument, null-checked.
    private final TaskManagerMetricGroup taskManagerMetricGroup;
    private final TaskExecutorLocalStateStoresManager localStateStoresManager;
    private final TaskExecutorStateChangelogStoragesManager changelogStoragesManager;
    private final TaskExecutorChannelStateExecutorFactoryManager channelStateExecutorFactoryManager;
    // Constructor argument, null-checked.
    private final ExternalResourceInfoProvider externalResourceInfoProvider;
    private final ShuffleEnvironment<?, ?> shuffleEnvironment;
    private final KvStateService kvStateService;
    private final Executor ioExecutor;
    private final SharedResources sharedResources;
    private final TaskSlotTable<Task> taskSlotTable;
    // Created empty in the constructor.
    private final Map<JobID, UUID> currentSlotOfferPerJob;
    private final JobTable jobTable;
    private final JobLeaderService jobLeaderService;
    // Obtained from the HighAvailabilityServices instance in the constructor.
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    private final SlotAllocationSnapshotPersistenceService slotAllocationSnapshotPersistenceService;
    // Extracted from the system in the constructor, using the managed memory size reported by TaskManagerServices.
    private final HardwareDescription hardwareDescription;
    // Derived in the constructor from the configuration held by TaskManagerConfiguration.
    private final TaskExecutorMemoryConfiguration memoryConfiguration;
    // Not final and not assigned by the constructor; presumably initialized later in the lifecycle -- TODO confirm.
    private FileCache fileCache;
    // Both heartbeat managers are created in the constructor with this endpoint's main-thread executor.
    private final HeartbeatManager<AllocatedSlotReport, TaskExecutorToJobManagerHeartbeatPayload> jobManagerHeartbeatManager;
    private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
    // Constructor argument; stored without a null check.
    private final TaskExecutorPartitionTracker partitionTracker;
    // Constructor argument, null-checked.
    private final DelegationTokenReceiverRepository delegationTokenReceiverRepository;

    // Set to null by the constructor.
    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    // Compared against the reporting resource id when a resource manager heartbeat is lost.
    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    // Set to null by the constructor; registration callbacks compare against it by identity.
    @Nullable
    private TaskExecutorToResourceManagerConnection resourceManagerConnection;

    // Set to null by the constructor.
    @Nullable
    private UUID currentRegistrationTimeoutId;
    // Created empty in the constructor with an expected size of 8.
    private final Map<JobID, Collection<CompletableFuture<ExecutionState>>> taskResultPartitionCleanupFuturesPerJob;
    // Created in the constructor on a single-thread scheduled executor whose pool is named "flink-thread-info-sampler".
    private final ThreadInfoSampleService threadInfoSampleService;
    // Obtained from the TaskManagerServices instance in the constructor.
    private final ShuffleDescriptorsCache shuffleDescriptorsCache;
    // Flag the decompiler marks as synthetic; it is not assigned anywhere in the visible part of the file.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$JobLeaderListenerImpl.class */
    /**
     * Listener for job leader events. Leadership and rejection callbacks are re-dispatched
     * through {@code runAsync}; errors are forwarded to {@code onFatalError}.
     */
    public final class JobLeaderListenerImpl implements JobLeaderListener {
        private JobLeaderListenerImpl() {
        }

        /**
         * Schedules establishing the job manager connection, provided the job table still
         * contains the job when the scheduled action runs.
         */
        @Override
        public void jobManagerGainedLeadership(JobID jobID, JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess jMTMRegistrationSuccess) {
            runAsync(
                    () ->
                            jobTable.getJob(jobID)
                                    .ifPresent(
                                            job ->
                                                    establishJobManagerConnection(
                                                            job, jobMasterGateway, jMTMRegistrationSuccess)));
        }

        /**
         * Logs the leadership loss and schedules disconnecting the job's connection, if the job
         * table holds one when the scheduled action runs.
         */
        @Override
        public void jobManagerLostLeadership(JobID jobID, JobMasterId jobMasterId) {
            log.info("JobManager for job {} with leader id {} lost leadership.", jobID, jobMasterId);
            runAsync(
                    () ->
                            jobTable.getConnection(jobID)
                                    .ifPresent(
                                            connection ->
                                                    disconnectJobManagerConnection(
                                                            connection,
                                                            new Exception("Job leader for job id " + jobID + " lost leadership."))));
        }

        /** Forwards the error to the task executor's fatal error handling. */
        @Override
        public void handleError(Throwable th) {
            onFatalError(th);
        }

        /** Schedules handling of a rejected job manager registration. */
        @Override
        public void jobManagerRejectedRegistration(JobID jobID, String str, JMTMRegistrationRejection jMTMRegistrationRejection) {
            runAsync(() -> handleRejectedJobManagerConnection(jobID, str, jMTMRegistrationRejection));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$JobManagerHeartbeatListener.class */
    /**
     * Heartbeat listener for job managers: reacts to lost heartbeats, consumes allocated slot
     * reports and builds the payload sent back to a job manager.
     */
    public class JobManagerHeartbeatListener implements HeartbeatListener<AllocatedSlotReport, TaskExecutorToJobManagerHeartbeatPayload> {
        private JobManagerHeartbeatListener() {
        }

        /** Logs the timeout and treats it as a lost job manager connection. */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            final String message = String.format("The heartbeat of JobManager with id %s timed out.", resourceID.getStringWithMetadata());
            log.info(message);
            handleJobManagerConnectionLoss(resourceID, new TimeoutException(message));
        }

        /** Logs that the target is unreachable and treats it as a lost job manager connection. */
        @Override
        public void notifyTargetUnreachable(ResourceID resourceID) {
            final String message = String.format("JobManager with id %s is no longer reachable.", resourceID.getStringWithMetadata());
            log.info(message);
            handleJobManagerConnectionLoss(resourceID, new TaskManagerException(message));
        }

        /**
         * Disconnects from, and tries to reconnect to, the job manager registered under the
         * given resource id. Does nothing when the job table has no such connection.
         */
        private void handleJobManagerConnectionLoss(ResourceID resourceID, Exception exc) {
            validateRunsInMainThread();
            jobTable.getConnection(resourceID)
                    .ifPresent(connection -> disconnectAndTryReconnectToJobManager(connection, exc));
        }

        /**
         * Synchronizes the slots with the reported snapshot when a connection exists for the
         * report's job; otherwise the report is ignored with a debug message.
         */
        @Override
        public void reportPayload(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
            validateRunsInMainThread();
            final JobID reportedJobId = allocatedSlotReport.getJobId();
            OptionalConsumer.of(jobTable.getConnection(reportedJobId))
                    .ifPresent(
                            connection ->
                                    syncSlotsWithSnapshotFromJobMaster(
                                            connection.getJobManagerGateway(), allocatedSlotReport))
                    .ifNotPresent(
                            () ->
                                    log.debug(
                                            "Ignoring allocated slot report from job {} because there is no active leader.",
                                            reportedJobId));
        }

        /**
         * Builds the heartbeat payload for the job manager with the given resource id: the
         * accumulator snapshots and execution ids of all tasks the slot table reports for that
         * job. Returns the empty payload when no connection is registered for the resource id.
         */
        @Override
        public TaskExecutorToJobManagerHeartbeatPayload retrievePayload(ResourceID resourceID) {
            validateRunsInMainThread();
            return jobTable.getConnection(resourceID)
                    .map(
                            connection -> {
                                final Set<ExecutionAttemptID> deployedExecutions = new HashSet<>();
                                // Left raw as in the original: the snapshot element type is not imported in this file.
                                final ArrayList accumulatorSnapshots = new ArrayList(16);
                                for (Iterator<Task> tasks = taskSlotTable.getTasks(connection.getJobId()); tasks.hasNext(); ) {
                                    final Task task = tasks.next();
                                    deployedExecutions.add(task.getExecutionId());
                                    accumulatorSnapshots.add(task.getAccumulatorRegistry().getSnapshot());
                                }
                                return new TaskExecutorToJobManagerHeartbeatPayload(
                                        new AccumulatorReport(accumulatorSnapshots),
                                        new ExecutionDeploymentReport(deployedExecutions));
                            })
                    .orElseGet(TaskExecutorToJobManagerHeartbeatPayload::empty);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$JobManagerHeartbeatReceiver.class */
    /** Heartbeat receiver that hands every received heartbeat to a {@link JobMasterGateway}. */
    public static final class JobManagerHeartbeatReceiver extends HeartbeatReceiver<TaskExecutorToJobManagerHeartbeatPayload> {
        private final JobMasterGateway jobMasterGateway;

        private JobManagerHeartbeatReceiver(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = jobMasterGateway;
        }

        /**
         * Delegates to {@code heartbeatFromTaskManager} on the wrapped gateway.
         *
         * @return the future returned by the gateway call
         */
        @Override
        public CompletableFuture<Void> receiveHeartbeat(ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload taskExecutorToJobManagerHeartbeatPayload) {
            final CompletableFuture<Void> acknowledgement =
                    jobMasterGateway.heartbeatFromTaskManager(resourceID, taskExecutorToJobManagerHeartbeatPayload);
            return acknowledgement;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerHeartbeatListener.class */
    /**
     * Heartbeat listener for the resource manager: triggers a reconnect when the heartbeat of
     * the currently established resource manager is lost, and builds the payload sent to it.
     */
    public class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {
        private ResourceManagerHeartbeatListener() {
        }

        /** Logs the timeout and treats it as a lost resource manager connection. */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            final String message = String.format("The heartbeat of ResourceManager with id %s timed out.", resourceID.getStringWithMetadata());
            log.info(message);
            handleResourceManagerConnectionLoss(resourceID, new TaskManagerException(message));
        }

        /** Logs that the target is unreachable and treats it as a lost resource manager connection. */
        @Override
        public void notifyTargetUnreachable(ResourceID resourceID) {
            final String message = String.format("ResourceManager with id %s is no longer reachable.", resourceID.getStringWithMetadata());
            log.info(message);
            handleResourceManagerConnectionLoss(resourceID, new TaskManagerException(message));
        }

        /**
         * Reconnects to the resource manager, but only if an established connection exists and
         * belongs to the resource id the notification refers to.
         */
        private void handleResourceManagerConnectionLoss(ResourceID resourceID, TaskManagerException taskManagerException) {
            validateRunsInMainThread();
            final EstablishedResourceManagerConnection current = establishedResourceManagerConnection;
            final boolean concernsCurrentConnection =
                    current != null && current.getResourceManagerResourceId().equals(resourceID);
            if (concernsCurrentConnection) {
                reconnectToResourceManager(taskManagerException);
            }
        }

        /** Intentionally empty: the payload type is {@code Void}, so there is nothing to process. */
        @Override
        public void reportPayload(ResourceID resourceID, Void r3) {
        }

        /**
         * Builds the payload from the slot report created by the task slot table and the cluster
         * partition report created by the partition tracker.
         */
        @Override
        public TaskExecutorHeartbeatPayload retrievePayload(ResourceID resourceID) {
            validateRunsInMainThread();
            return new TaskExecutorHeartbeatPayload(
                    taskSlotTable.createSlotReport(getResourceID()),
                    partitionTracker.createClusterPartitionReport());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerHeartbeatReceiver.class */
    /** Heartbeat receiver that hands every received heartbeat to a {@link ResourceManagerGateway}. */
    public static final class ResourceManagerHeartbeatReceiver extends HeartbeatReceiver<TaskExecutorHeartbeatPayload> {
        private final ResourceManagerGateway resourceManagerGateway;

        private ResourceManagerHeartbeatReceiver(ResourceManagerGateway resourceManagerGateway) {
            this.resourceManagerGateway = resourceManagerGateway;
        }

        /**
         * Delegates to {@code heartbeatFromTaskManager} on the wrapped gateway.
         *
         * @return the future returned by the gateway call
         */
        @Override
        public CompletableFuture<Void> receiveHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload taskExecutorHeartbeatPayload) {
            final CompletableFuture<Void> acknowledgement =
                    resourceManagerGateway.heartbeatFromTaskManager(resourceID, taskExecutorHeartbeatPayload);
            return acknowledgement;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerLeaderListener.class */
    /**
     * Leader retrieval listener for the resource manager: new leader addresses are processed
     * through {@code runAsync}, errors are forwarded to {@code onFatalError}.
     */
    public final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        /**
         * Schedules the notification about the new leader; the UUID is converted with
         * {@code ResourceManagerId.fromUuidOrNull} when the scheduled action runs.
         */
        @Override
        public void notifyLeaderAddress(String str, UUID uuid) {
            runAsync(() -> notifyOfNewResourceManagerLeader(str, ResourceManagerId.fromUuidOrNull(uuid)));
        }

        /** Forwards the error to the task executor's fatal error handling. */
        @Override
        public void handleError(Exception exc) {
            onFatalError(exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerRegistrationListener.class */
    public final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess, TaskExecutorRegistrationRejection> {
        private ResourceManagerRegistrationListener() {
        }

        @Override // org.apache.flink.runtime.registration.RegistrationConnectionListener
        public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection taskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess taskExecutorRegistrationSuccess) {
            ResourceID resourceManagerId = taskExecutorRegistrationSuccess.getResourceManagerId();
            InstanceID registrationId = taskExecutorRegistrationSuccess.getRegistrationId();
            ClusterInformation clusterInformation = taskExecutorRegistrationSuccess.getClusterInformation();
            ResourceManagerGateway targetGateway = taskExecutorToResourceManagerConnection.getTargetGateway();
            byte[] initialTokens = taskExecutorRegistrationSuccess.getInitialTokens();
            if (initialTokens != null) {
                try {
                    TaskExecutor.this.log.info("Receive initial delegation tokens from resource manager");
                    TaskExecutor.this.delegationTokenReceiverRepository.onNewTokensObtained(initialTokens);
                } catch (Throwable th) {
                    TaskExecutor.this.log.error("Could not update delegation tokens.", th);
                    ExceptionUtils.rethrowIfFatalError(th);
                }
            }
            TaskExecutor.this.runAsync(() -> {
                if (TaskExecutor.this.resourceManagerConnection == taskExecutorToResourceManagerConnection) {
                    try {
                        TaskExecutor.this.establishResourceManagerConnection(targetGateway, resourceManagerId, registrationId, clusterInformation);
                    } catch (Throwable th2) {
                        TaskExecutor.this.log.error("Establishing Resource Manager connection in Task Executor failed", th2);
                    }
                }
            });
        }

        @Override // org.apache.flink.runtime.registration.RegistrationConnectionListener
        public void onRegistrationFailure(Throwable th) {
            TaskExecutor.this.onFatalError(th);
        }

        @Override // org.apache.flink.runtime.registration.RegistrationConnectionListener
        public void onRegistrationRejection(String str, TaskExecutorRegistrationRejection taskExecutorRegistrationRejection) {
            TaskExecutor.this.onFatalError(new FlinkException(String.format("The TaskExecutor's registration at the ResourceManager %s has been rejected: %s", str, taskExecutorRegistrationRejection)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$SlotActionsImpl.class */
    /** Slot actions that re-dispatch every request through {@code runAsync}. */
    public class SlotActionsImpl implements SlotActions {
        private SlotActionsImpl() {
        }

        /** Schedules freeing the slot; the cause exception is created when the scheduled action runs. */
        @Override
        public void freeSlot(AllocationID allocationID) {
            runAsync(
                    () -> {
                        final FlinkException cause = new FlinkException("TaskSlotTable requested freeing the TaskSlot " + allocationID + '.');
                        freeSlotInternal(allocationID, cause);
                    });
        }

        /** Schedules the slot timeout handling of the enclosing task executor. */
        @Override
        public void timeoutSlot(AllocationID allocationID, UUID uuid) {
            // Must stay qualified: an unqualified call would resolve to this method, not the outer one.
            runAsync(() -> TaskExecutor.this.timeoutSlot(allocationID, uuid));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Job services backed by a class loader lease and a close hook. Closing releases the lease
     * first and then runs the hook.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$TaskExecutorJobServices.class */
    public static final class TaskExecutorJobServices implements JobTable.JobServices {
        private final LibraryCacheManager.ClassLoaderLease classLoaderLease;
        private final Runnable closeHook;

        private TaskExecutorJobServices(LibraryCacheManager.ClassLoaderLease lease, Runnable hook) {
            classLoaderLease = lease;
            closeHook = hook;
        }

        /** Factory around the private constructor. */
        @VisibleForTesting
        static TaskExecutorJobServices create(LibraryCacheManager.ClassLoaderLease classLoaderLease, Runnable runnable) {
            return new TaskExecutorJobServices(classLoaderLease, runnable);
        }

        /** Returns the class loader lease as the handle. */
        @Override
        public LibraryCacheManager.ClassLoaderHandle getClassLoaderHandle() {
            return classLoaderLease;
        }

        /** Releases the class loader lease, then runs the close hook. */
        @Override
        public void close() {
            classLoaderLease.release();
            closeHook.run();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$TaskManagerActionsImpl.class */
    public final class TaskManagerActionsImpl implements TaskManagerActions {
        private final JobMasterGateway jobMasterGateway;

        private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = (JobMasterGateway) Preconditions.checkNotNull(jobMasterGateway);
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void notifyFatalError(String str, Throwable th) {
            try {
                TaskExecutor.this.log.error(str, th);
            } catch (Throwable th2) {
            }
            TaskExecutor.this.fatalErrorHandler.onFatalError(th);
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void failTask(ExecutionAttemptID executionAttemptID, Throwable th) {
            TaskExecutor.this.runAsync(() -> {
                TaskExecutor.this.failTask(executionAttemptID, th);
            });
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            if (taskExecutionState.getExecutionState().isTerminal()) {
                TaskExecutor.this.runAsync(() -> {
                    TaskExecutor.this.unregisterTaskAndNotifyFinalState(this.jobMasterGateway, taskExecutionState.getID());
                });
            } else {
                TaskExecutor.this.updateTaskExecutionState(this.jobMasterGateway, taskExecutionState);
            }
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void notifyEndOfData(ExecutionAttemptID executionAttemptID) {
            TaskExecutor.this.runAsync(() -> {
                this.jobMasterGateway.notifyEndOfData(executionAttemptID);
            });
        }
    }

    /**
     * Creates the task executor endpoint under a random name derived from
     * {@link #TASK_MANAGER_NAME} and wires up its collaborators.
     *
     * @param rpcService RPC service passed to the {@code RpcEndpoint} super constructor
     * @param taskManagerConfiguration configuration; must be non-null and report more than 0 slots
     * @param highAvailabilityServices must be non-null; also supplies the resource manager leader retriever
     * @param taskManagerServices must be non-null; source of the slot table, job table, state managers,
     *     shuffle environment and the other services stored in the fields
     * @param externalResourceInfoProvider must be non-null
     * @param heartbeatServices used to create the job manager and resource manager heartbeat managers
     * @param taskManagerMetricGroup must be non-null
     * @param str stored as the metric query service address; may be null
     * @param taskExecutorBlobService must be non-null
     * @param fatalErrorHandler must be non-null
     * @param taskExecutorPartitionTracker stored as the partition tracker; not null-checked here
     * @param delegationTokenReceiverRepository must be non-null
     * @throws NullPointerException if one of the arguments documented as non-null is null
     * @throws IllegalArgumentException if the configured number of slots is not larger than 0
     */
    public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, HighAvailabilityServices highAvailabilityServices, TaskManagerServices taskManagerServices, ExternalResourceInfoProvider externalResourceInfoProvider, HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, @Nullable String str, TaskExecutorBlobService taskExecutorBlobService, FatalErrorHandler fatalErrorHandler, TaskExecutorPartitionTracker taskExecutorPartitionTracker, DelegationTokenReceiverRepository delegationTokenReceiverRepository) {
        super(rpcService, RpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
        this.currentSlotOfferPerJob = new HashMap<>();
        this.taskResultPartitionCleanupFuturesPerJob = CollectionUtil.newHashMapWithExpectedSize(8);

        // Null-check the configuration BEFORE reading the slot count from it; in the previous
        // order the dereference came first, so the null check could never be the one to fire.
        this.taskManagerConfiguration = Preconditions.checkNotNull(taskManagerConfiguration);
        Preconditions.checkArgument(this.taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");

        this.taskExecutorServices = Preconditions.checkNotNull(taskManagerServices);
        this.haServices = Preconditions.checkNotNull(highAvailabilityServices);
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
        this.partitionTracker = taskExecutorPartitionTracker;
        this.delegationTokenReceiverRepository = Preconditions.checkNotNull(delegationTokenReceiverRepository);
        this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
        this.taskExecutorBlobService = Preconditions.checkNotNull(taskExecutorBlobService);
        this.metricQueryServiceAddress = str;
        this.externalResourceInfoProvider = Preconditions.checkNotNull(externalResourceInfoProvider);

        // Services provided by TaskManagerServices.
        this.libraryCacheManager = taskManagerServices.getLibraryCacheManager();
        this.taskSlotTable = taskManagerServices.getTaskSlotTable();
        this.jobTable = taskManagerServices.getJobTable();
        this.jobLeaderService = taskManagerServices.getJobLeaderService();
        this.unresolvedTaskManagerLocation = taskManagerServices.getUnresolvedTaskManagerLocation();
        this.localStateStoresManager = taskManagerServices.getTaskManagerStateStore();
        this.changelogStoragesManager = taskManagerServices.getTaskManagerChangelogManager();
        this.channelStateExecutorFactoryManager = taskManagerServices.getTaskManagerChannelStateManager();
        this.shuffleEnvironment = taskManagerServices.getShuffleEnvironment();
        this.kvStateService = taskManagerServices.getKvStateService();
        this.ioExecutor = taskManagerServices.getIOExecutor();

        this.resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
        this.hardwareDescription = HardwareDescription.extractFromSystem(taskManagerServices.getManagedMemorySize());
        this.memoryConfiguration = TaskExecutorMemoryConfiguration.create(taskManagerConfiguration.getConfiguration());

        // No resource manager is known yet.
        this.resourceManagerAddress = null;
        this.resourceManagerConnection = null;
        this.currentRegistrationTimeoutId = null;

        // Both heartbeat managers are created for this task executor's own resource id.
        ResourceID resourceID = taskManagerServices.getUnresolvedTaskManagerLocation().getResourceID();
        this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceID);
        this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceID);

        this.threadInfoSampleService = new ThreadInfoSampleService(Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory.Builder().setPoolName("flink-thread-info-sampler").build()));
        this.slotAllocationSnapshotPersistenceService = taskManagerServices.getSlotAllocationSnapshotPersistenceService();
        this.sharedResources = taskManagerServices.getSharedResources();
        this.shuffleDescriptorsCache = taskManagerServices.getShuffleDescriptorCache();
    }

    /**
     * Builds the heartbeat manager used for the ResourceManager side of this endpoint.
     *
     * @param heartbeatServices factory the heartbeat manager is obtained from
     * @param resourceID resource id handed to the factory
     * @return a heartbeat manager wired to a fresh {@code ResourceManagerHeartbeatListener}, the
     *     main thread executor and this endpoint's logger
     */
    private HeartbeatManager<Void, TaskExecutorHeartbeatPayload> createResourceManagerHeartbeatManager(HeartbeatServices heartbeatServices, ResourceID resourceID) {
        ResourceManagerHeartbeatListener listener = new ResourceManagerHeartbeatListener();
        return heartbeatServices.createHeartbeatManager(resourceID, listener, getMainThreadExecutor(), this.log);
    }

    /**
     * Builds the heartbeat manager used for the JobManager side of this endpoint.
     *
     * @param heartbeatServices factory the heartbeat manager is obtained from
     * @param resourceID resource id handed to the factory
     * @return a heartbeat manager wired to a fresh {@code JobManagerHeartbeatListener}, the main
     *     thread executor and this endpoint's logger
     */
    private HeartbeatManager<AllocatedSlotReport, TaskExecutorToJobManagerHeartbeatPayload> createJobManagerHeartbeatManager(HeartbeatServices heartbeatServices, ResourceID resourceID) {
        JobManagerHeartbeatListener listener = new JobManagerHeartbeatListener();
        return heartbeatServices.createHeartbeatManager(resourceID, listener, getMainThreadExecutor(), this.log);
    }

    /**
     * Reports whether the shuffle environment currently holds no partitions that occupy local
     * resources.
     *
     * @return an already completed future carrying {@code true} when that partition collection is
     *     empty
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Boolean> canBeReleased() {
        boolean noLocalPartitions = this.shuffleEnvironment.getPartitionsOccupyingLocalResources().isEmpty();
        return CompletableFuture.completedFuture(noLocalPartitions);
    }

    /**
     * Lists the regular files found in the configured TaskManager log directory.
     *
     * <p>The directory scan runs on {@code ioExecutor}. The {@code time} argument is not used by
     * this implementation.
     *
     * @param time unused
     * @return future with one {@code LogInfo} (name, length, last-modified) per regular file; an
     *     empty collection when no log directory is configured; completes exceptionally with a
     *     {@code FlinkException} when the directory cannot be listed
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Collection<LogInfo>> requestLogList(Time time) {
        return CompletableFuture.supplyAsync(() -> {
            final String logDir = this.taskManagerConfiguration.getTaskManagerLogDir();
            if (logDir == null) {
                return Collections.<LogInfo>emptyList();
            }
            final File[] entries = new File(logDir).listFiles();
            if (entries == null) {
                // listFiles() yields null when the path is not a directory or cannot be read.
                throw new CompletionException(new FlinkException(String.format("There isn't a log file in TaskExecutor’s log dir %s.", logDir)));
            }
            final Collection<LogInfo> logs = new ArrayList<>();
            for (File entry : entries) {
                if (entry.isFile()) {
                    logs.add(new LogInfo(entry.getName(), entry.length(), entry.lastModified()));
                }
            }
            return logs;
        }, this.ioExecutor);
    }

    /**
     * Starts the TaskExecutor services and the registration timeout.
     *
     * <p>Any failure is wrapped in a {@code TaskManagerException}, reported through
     * {@code onFatalError} and then rethrown.
     *
     * @throws Exception the wrapping {@code TaskManagerException} when start-up fails
     */
    public void onStart() throws Exception {
        try {
            startTaskExecutorServices();
            startRegistrationTimeout();
        } catch (Throwable cause) {
            final String message = String.format("Could not start the TaskExecutor %s", getAddress());
            final TaskManagerException startupFailure = new TaskManagerException(message, cause);
            onFatalError(startupFailure);
            throw startupFailure;
        }
    }

    /**
     * Starts, in order: the ResourceManager leader retriever, the task slot table, the job leader
     * service, then creates the file cache and attempts to load local allocation snapshots.
     *
     * <p>An {@code Exception} raised by any step is routed to
     * {@code handleStartTaskExecutorServicesException}.
     *
     * @throws Exception as rethrown by the failure handler
     */
    private void startTaskExecutorServices() throws Exception {
        try {
            final ResourceManagerLeaderListener leaderListener = new ResourceManagerLeaderListener();
            this.resourceManagerLeaderRetriever.start(leaderListener);

            final SlotActionsImpl slotActions = new SlotActionsImpl();
            this.taskSlotTable.start(slotActions, getMainThreadExecutor());

            this.jobLeaderService.start(getAddress(), getRpcService(), this.haServices, new JobLeaderListenerImpl());

            this.fileCache = new FileCache(this.taskManagerConfiguration.getTmpDirectories(), this.taskExecutorBlobService.getPermanentBlobService());

            tryLoadLocalAllocationSnapshots();
        } catch (Exception startFailure) {
            handleStartTaskExecutorServicesException(startFailure);
        }
    }

    /**
     * Cleans up after a failed service start and rethrows the original failure.
     *
     * <p>If stopping the services fails as well, that secondary exception is attached to
     * {@code exc} as a suppressed exception.
     *
     * @param exc the exception that made the service start fail
     * @throws Exception always; the passed-in {@code exc}
     */
    private void handleStartTaskExecutorServicesException(Exception exc) throws Exception {
        try {
            stopTaskExecutorServices();
        } catch (Exception cleanupFailure) {
            exc.addSuppressed(cleanupFailure);
        }

        throw exc;
    }

    /**
     * Stops the TaskExecutor.
     *
     * <p>Closes the ResourceManager connection and every job in the job table, shuts down the
     * changelog storage and channel-state managers, clears the shuffle descriptor cache, and then
     * closes the task slot table followed by the remaining services.
     *
     * @return future that completes once shutdown finished; it completes exceptionally (via
     *     {@code handleOnStopException}) when closing a job or the later shutdown steps failed
     */
    public CompletableFuture<Void> onStop() {
        this.log.info("Stopping TaskExecutor {}.", getAddress());

        final FlinkExpectedException shutdownCause = new FlinkExpectedException("The TaskExecutor is shutting down.");
        closeResourceManagerConnection(shutdownCause);

        Throwable jobCloseFailure = null;
        for (Iterator<JobTable.Job> jobs = this.jobTable.getJobs().iterator(); jobs.hasNext(); ) {
            try {
                // next() stays inside the try so iteration failures are collected as well.
                closeJob(jobs.next(), shutdownCause);
            } catch (Throwable failure) {
                jobCloseFailure = ExceptionUtils.firstOrSuppressed(failure, jobCloseFailure);
            }
        }

        this.changelogStoragesManager.shutdown();
        this.channelStateExecutorFactoryManager.shutdown();
        this.shuffleDescriptorsCache.clear();
        Preconditions.checkState(this.jobTable.isEmpty());

        final Throwable failureWhileClosingJobs = jobCloseFailure;
        return FutureUtils.runAfterwards(this.taskSlotTable.closeAsync(), this::stopTaskExecutorServices)
                .handle((ignored, shutdownFailure) -> {
                    handleOnStopException(failureWhileClosingJobs, shutdownFailure);
                    return null;
                });
    }

    /**
     * Merges the two possible shutdown failures and either reports success or fails the caller.
     *
     * @param th failure collected while closing jobs, or {@code null}
     * @param th2 failure from the later shutdown steps, or {@code null}
     * @throws CompletionException wrapping a {@code FlinkException} when at least one of the two
     *     failures is present
     */
    private void handleOnStopException(Throwable th, Throwable th2) {
        final Throwable combined;
        if (th == null) {
            combined = th2;
        } else {
            combined = ExceptionUtils.firstOrSuppressed(th, th2);
        }

        if (combined == null) {
            this.log.info("Stopped TaskExecutor {}.", getAddress());
            return;
        }
        throw new CompletionException(new FlinkException("Error while shutting the TaskExecutor down.", combined));
    }

    /**
     * Shuts down the services owned by this TaskExecutor.
     *
     * <p>Every step is attempted even when an earlier one fails. The first failure is kept and
     * later ones are attached to it through {@code ExceptionUtils.firstOrSuppressed}; the result
     * is rethrown at the end.
     *
     * @throws Exception the collected failure, if any step failed
     */
    private void stopTaskExecutorServices() throws Exception {
        Exception firstFailure = null;

        try {
            this.threadInfoSampleService.close();
        } catch (Exception e) {
            firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
        }

        try {
            this.jobLeaderService.stop();
        } catch (Exception e) {
            firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
        }

        try {
            this.resourceManagerLeaderRetriever.stop();
        } catch (Exception e) {
            firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
        }

        try {
            this.taskExecutorServices.shutDown();
        } catch (Exception e) {
            firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
        }

        // The file cache is only assigned late in startTaskExecutorServices(). When start-up
        // failed before that point this method runs as cleanup with a null cache; skip it
        // instead of recording a misleading NullPointerException next to the real failure.
        if (this.fileCache != null) {
            try {
                this.fileCache.shutdown();
            } catch (Exception e) {
                firstFailure = ExceptionUtils.firstOrSuppressed(e, firstFailure);
            }
        }

        this.taskManagerMetricGroup.close();
        ExceptionUtils.tryRethrowException(firstFailure);
    }

    /**
     * Requests thread info samples for the given execution attempts.
     *
     * <p>Attempts that are not known to the task slot table are skipped with a warning. The
     * remaining tasks are keyed by the id of their executing thread and passed to the thread info
     * sample service. The {@code time} argument is not used by this implementation.
     *
     * @param collection execution attempts to sample
     * @param threadInfoSamplesRequest sampling parameters forwarded to the sample service
     * @param time unused
     * @return future with the samples wrapped in a {@code TaskThreadInfoResponse}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway
    public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(Collection<ExecutionAttemptID> collection, ThreadInfoSamplesRequest threadInfoSamplesRequest, Time time) {
        final Collection<Task> knownTasks = new ArrayList<>();
        for (ExecutionAttemptID attemptId : collection) {
            final Task candidate = this.taskSlotTable.getTask(attemptId);
            if (candidate != null) {
                knownTasks.add(candidate);
            } else {
                this.log.warn(String.format("Cannot sample task %s. Task is not known to the task manager.", attemptId));
            }
        }

        final Map<Long, ExecutionAttemptID> attemptsByThreadId =
                knownTasks.stream()
                        .collect(Collectors.toMap(task -> task.getExecutingThread().getId(), Task::getExecutionId));

        return this.threadInfoSampleService
                .requestThreadInfoSamples(attemptsByThreadId, threadInfoSamplesRequest)
                .thenApply(TaskThreadInfoResponse::new);
    }

    /**
     * Accepts a task deployment and starts the task on this TaskExecutor.
     *
     * <p>Steps, in order: look up the JobManager connection for the job, verify the supplied
     * {@code jobMasterId} against it, mark the target slot active, load offloaded descriptor data,
     * deserialize the job and task information, register metric groups, construct the
     * {@code Task}, add it to the task slot table, start its thread and set up result partition
     * bookkeeping.
     *
     * <p>Every {@code TaskSubmissionException} raised along the way is converted into an
     * exceptionally completed future rather than thrown. The {@code time} argument is not used by
     * this implementation.
     *
     * @param taskDeploymentDescriptor descriptor of the task to deploy
     * @param jobMasterId leader id the caller claims; must equal the id of the stored connection
     * @param time unused
     * @return completed future with an {@code Acknowledge} on success, otherwise a future failed
     *     with a {@code TaskSubmissionException}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor taskDeploymentDescriptor, JobMasterId jobMasterId, Time time) {
        try {
            JobID jobId = taskDeploymentDescriptor.getJobId();
            ExecutionAttemptID executionAttemptId = taskDeploymentDescriptor.getExecutionAttemptId();
            // A connection to the job's JobManager must already exist.
            JobTable.Connection orElseThrow = this.jobTable.getConnection(jobId).orElseThrow(() -> {
                String str = "Could not submit task because there is no JobManager associated for the job " + jobId + '.';
                this.log.debug(str);
                return new TaskSubmissionException(str);
            });
            // Reject submissions whose leader id differs from the one of the stored connection.
            if (!Objects.equals(orElseThrow.getJobMasterId(), jobMasterId)) {
                String str = "Rejecting the task submission because the job manager leader id " + jobMasterId + " does not match the expected job manager leader id " + orElseThrow.getJobMasterId() + '.';
                this.log.debug(str);
                throw new TaskSubmissionException(str);
            }
            // The slot identified by (job id, allocation id) must be markable as active.
            if (!this.taskSlotTable.tryMarkSlotActive(jobId, taskDeploymentDescriptor.getAllocationId())) {
                String str2 = "No task slot allocated for job ID " + jobId + " and allocation ID " + taskDeploymentDescriptor.getAllocationId() + '.';
                this.log.debug(str2);
                throw new TaskSubmissionException(str2);
            }
            try {
                // Re-integrate descriptor data that was offloaded (blob service / descriptor cache).
                taskDeploymentDescriptor.loadBigData(this.taskExecutorBlobService.getPermanentBlobService(), this.shuffleDescriptorsCache);
                try {
                    JobInformation jobInformation = (JobInformation) taskDeploymentDescriptor.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
                    TaskInformation taskInformation = (TaskInformation) taskDeploymentDescriptor.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
                    // The job id in the descriptor and in the deserialized job information must agree.
                    if (!jobId.equals(jobInformation.getJobId())) {
                        throw new TaskSubmissionException("Inconsistent job ID information inside TaskDeploymentDescriptor (" + taskDeploymentDescriptor.getJobId() + " vs. " + jobInformation.getJobId() + ")");
                    }
                    TaskManagerJobMetricGroup addJob = this.taskManagerMetricGroup.addJob(jobInformation.getJobId(), jobInformation.getJobName());
                    TaskMetricGroup addTask = addJob.addTask(taskDeploymentDescriptor.getExecutionAttemptId(), taskInformation.getTaskName());
                    RpcInputSplitProvider rpcInputSplitProvider = new RpcInputSplitProvider(orElseThrow.getJobManagerGateway(), taskInformation.getJobVertexId(), taskDeploymentDescriptor.getExecutionAttemptId(), this.taskManagerConfiguration.getRpcTimeout());
                    // Failures reported by the operator event gateway fail the task via runAsync.
                    RpcTaskOperatorEventGateway rpcTaskOperatorEventGateway = new RpcTaskOperatorEventGateway(orElseThrow.getJobManagerGateway(), executionAttemptId, th -> {
                        runAsync(() -> {
                            failTask(executionAttemptId, th);
                        });
                    });
                    // Per-job services taken from the JobManager connection.
                    TaskManagerActions taskManagerActions = orElseThrow.getTaskManagerActions();
                    CheckpointResponder checkpointResponder = orElseThrow.getCheckpointResponder();
                    GlobalAggregateManager globalAggregateManager = orElseThrow.getGlobalAggregateManager();
                    LibraryCacheManager.ClassLoaderHandle classLoaderHandle = orElseThrow.getClassLoaderHandle();
                    PartitionProducerStateChecker partitionStateChecker = orElseThrow.getPartitionStateChecker();
                    TaskLocalStateStore localStateStoreForSubtask = this.localStateStoresManager.localStateStoreForSubtask(jobId, taskDeploymentDescriptor.getAllocationId(), taskInformation.getJobVertexId(), taskDeploymentDescriptor.getSubtaskIndex(), this.taskManagerConfiguration.getConfiguration(), jobInformation.getJobConfiguration());
                    try {
                        try {
                            Task task = new Task(jobInformation, taskInformation, taskDeploymentDescriptor.getExecutionAttemptId(), taskDeploymentDescriptor.getAllocationId(), taskDeploymentDescriptor.getProducedPartitions(), taskDeploymentDescriptor.getInputGates(), this.taskSlotTable.getTaskMemoryManager(taskDeploymentDescriptor.getAllocationId()), this.sharedResources, this.taskExecutorServices.getIOManager(), this.taskExecutorServices.getShuffleEnvironment(), this.taskExecutorServices.getKvStateService(), this.taskExecutorServices.getBroadcastVariableManager(), this.taskExecutorServices.getTaskEventDispatcher(), this.externalResourceInfoProvider, new TaskStateManagerImpl(jobId, taskDeploymentDescriptor.getExecutionAttemptId(), localStateStoreForSubtask, this.changelogStoragesManager.stateChangelogStorageForJob(jobId, this.taskManagerConfiguration.getConfiguration(), addJob, localStateStoreForSubtask.getLocalRecoveryConfig()), this.changelogStoragesManager, taskDeploymentDescriptor.getTaskRestore(), checkpointResponder), taskManagerActions, rpcInputSplitProvider, checkpointResponder, rpcTaskOperatorEventGateway, globalAggregateManager, classLoaderHandle, this.fileCache, this.taskManagerConfiguration, addTask, partitionStateChecker, getRpcService().getScheduledExecutor(), this.channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId));
                            // NOTE(review): result is discarded; presumably a decompiler-emitted null
                            // check for the method reference on the next line - confirm before removing.
                            task.getClass();
                            addTask.gauge(MetricNames.IS_BACK_PRESSURED, task::isBackPressured);
                            this.log.info("Received task {} ({}), deploy into slot with allocation id {}.", new Object[]{task.getTaskInfo().getTaskNameWithSubtasks(), taskDeploymentDescriptor.getExecutionAttemptId(), taskDeploymentDescriptor.getAllocationId()});
                            try {
                                // Only start the task thread when the slot table accepted the task.
                                if (this.taskSlotTable.addTask(task)) {
                                    task.startTaskThread();
                                    setupResultPartitionBookkeeping(taskDeploymentDescriptor.getJobId(), taskDeploymentDescriptor.getProducedPartitions(), task.getTerminationFuture());
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                }
                                String str3 = "TaskManager already contains a task for id " + task.getExecutionId() + '.';
                                this.log.debug(str3);
                                throw new TaskSubmissionException(str3);
                            } catch (SlotNotActiveException | SlotNotFoundException e) {
                                throw new TaskSubmissionException("Could not submit task.", e);
                            }
                        } catch (SlotNotFoundException e2) {
                            throw new TaskSubmissionException("Could not submit task.", e2);
                        }
                    } catch (IOException e3) {
                        throw new TaskSubmissionException(e3);
                    }
                } catch (IOException | ClassNotFoundException e4) {
                    throw new TaskSubmissionException("Could not deserialize the job or task information.", e4);
                }
            } catch (IOException | ClassNotFoundException e5) {
                throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e5);
            }
        } catch (TaskSubmissionException e6) {
            // All submission failures are reported through the returned future.
            return FutureUtils.completedExceptionally(e6);
        }
    }

    /**
     * Starts tracking the produced partitions that require release and registers a cleanup future
     * for the job.
     *
     * <p>When the task's termination future completes with a state other than
     * {@code ExecutionState.FINISHED}, tracking of those partitions is stopped again. That callback
     * runs on the main thread executor. The resulting future is appended to the per-job collection
     * in {@code taskResultPartitionCleanupFuturesPerJob}.
     *
     * @param jobID job the partitions belong to
     * @param collection descriptors of the partitions produced by the task
     * @param completableFuture termination future of the task
     */
    private void setupResultPartitionBookkeeping(JobID jobID, Collection<ResultPartitionDeploymentDescriptor> collection, CompletableFuture<ExecutionState> completableFuture) {
        final Set<ResultPartitionID> trackedPartitionIds = new HashSet<>();
        filterPartitionsRequiringRelease(collection).forEachOrdered(descriptor -> {
            this.partitionTracker.startTrackingPartition(jobID, TaskExecutorPartitionInfo.from(descriptor));
            trackedPartitionIds.add(descriptor.getShuffleDescriptor().getResultPartitionID());
        });

        final CompletableFuture<ExecutionState> cleanupFuture = completableFuture.thenApplyAsync(terminalState -> {
            if (terminalState != ExecutionState.FINISHED) {
                this.partitionTracker.stopTrackingPartitions(trackedPartitionIds);
            }
            return terminalState;
        }, (Executor) getMainThreadExecutor());

        this.taskResultPartitionCleanupFuturesPerJob.compute(jobID, (ignoredKey, cleanupFutures) -> {
            if (cleanupFutures == null) {
                cleanupFutures = new ArrayList<>(4);
            }
            cleanupFutures.add(cleanupFuture);
            return cleanupFutures;
        });
    }

    /**
     * Selects the partitions whose type is released by the scheduler and whose shuffle descriptor
     * reports that it stores local resources.
     *
     * @param collection partition descriptors to filter
     * @return lazily evaluated stream of the matching descriptors
     */
    private Stream<ResultPartitionDeploymentDescriptor> filterPartitionsRequiringRelease(Collection<ResultPartitionDeploymentDescriptor> collection) {
        return collection.stream()
                .filter(descriptor ->
                        descriptor.getPartitionType().isReleaseByScheduler()
                                && descriptor.getShuffleDescriptor().storesLocalResourcesOn().isPresent());
    }

    /**
     * Cancels the task of the given execution attempt.
     *
     * <p>The {@code time} argument is not used by this implementation.
     *
     * @param executionAttemptID attempt whose task should be cancelled
     * @param time unused
     * @return completed future with an {@code Acknowledge}, or a future failed with a
     *     {@code TaskException} when the task is unknown or cancellation threw
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time time) {
        final Task target = this.taskSlotTable.getTask(executionAttemptID);
        if (target != null) {
            try {
                target.cancelExecution();
                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Throwable failure) {
                return FutureUtils.completedExceptionally(new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', failure));
            }
        }

        final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
        this.log.debug(message);
        return FutureUtils.completedExceptionally(new TaskException(message));
    }

    /**
     * Forwards partition info updates for a task to the shuffle environment.
     *
     * <p>Each update runs asynchronously on the RPC service's scheduled executor. An
     * {@code IOException} or {@code InterruptedException} during an update is logged and fails the
     * task externally. The returned future is acknowledged immediately and does not wait for the
     * updates. The {@code time} argument is not used by this implementation.
     *
     * @param executionAttemptID attempt whose input partitions are updated
     * @param iterable partition infos to apply
     * @param time unused
     * @return completed future with an {@code Acknowledge}, also when the task is unknown
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> iterable, Time time) {
        final Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task == null) {
            this.log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
        } else {
            for (PartitionInfo partitionInfo : iterable) {
                final Runnable update = () -> {
                    try {
                        final boolean applied = this.shuffleEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo);
                        if (!applied) {
                            this.log.debug("Discard update for input gate partition {} of result {} in task {}. The partition is no longer available.", partitionInfo.getShuffleDescriptor().getResultPartitionID(), partitionInfo.getIntermediateDataSetID(), executionAttemptID);
                        }
                    } catch (IOException | InterruptedException e) {
                        this.log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
                        task.failExternally(e);
                    }
                };
                final CompletableFuture<Void> pendingUpdate = CompletableFuture.runAsync(update, getRpcService().getScheduledExecutor());
                FutureUtils.assertNoException(pendingUpdate);
            }
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Stops tracking and releases the given job partitions, then closes the JobManager connection
     * for the job if no allocated resources remain.
     *
     * <p>Any {@code Throwable} raised here is escalated through {@code onFatalError}.
     *
     * @param jobID job owning the partitions
     * @param set ids of the partitions to release
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public void releasePartitions(JobID jobID, Set<ResultPartitionID> set) {
        try {
            this.partitionTracker.stopTrackingAndReleaseJobPartitions(set);
            closeJobManagerConnectionIfNoAllocatedResources(jobID);
        } catch (Throwable fatal) {
            // Failures while releasing partitions are treated as fatal for this process.
            onFatalError(fatal);
        }
    }

    /**
     * Promotes the given job partitions and reports the resulting cluster partitions to the
     * ResourceManager.
     *
     * <p>The returned future completes with an {@code Acknowledge} once the report succeeded. It
     * completes exceptionally when there is no established ResourceManager connection, when the
     * report itself fails, or when promotion throws. In the last case the error is also escalated
     * through {@code onFatalError}.
     *
     * @param jobID job owning the partitions
     * @param set ids of the partitions to promote
     * @return future signalling the outcome of the cluster partition report
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> promotePartitions(JobID jobID, Set<ResultPartitionID> set) {
        final CompletableFuture<Acknowledge> ackFuture = new CompletableFuture<>();
        try {
            this.partitionTracker.promoteJobPartitions(set);
            if (this.establishedResourceManagerConnection != null) {
                this.establishedResourceManagerConnection
                        .getResourceManagerGateway()
                        .reportClusterPartitions(getResourceID(), this.partitionTracker.createClusterPartitionReport())
                        .whenComplete((ignored, reportFailure) -> {
                            // Propagate a failed report as well; otherwise the caller's future
                            // would never complete and it could only run into its own timeout.
                            if (reportFailure != null) {
                                ackFuture.completeExceptionally(reportFailure);
                            } else {
                                ackFuture.complete(Acknowledge.get());
                            }
                        });
            } else {
                ackFuture.completeExceptionally(new RuntimeException("Task executor is not connecting to ResourceManager. Fail to report cluster partition to ResourceManager"));
            }
            closeJobManagerConnectionIfNoAllocatedResources(jobID);
        } catch (Throwable fatal) {
            ackFuture.completeExceptionally(fatal);
            onFatalError(fatal);
        }
        return ackFuture;
    }

    /**
     * Stops tracking and releases the cluster partitions of the given data sets.
     *
     * <p>The {@code time} argument is not used by this implementation.
     *
     * @param collection data sets whose cluster partitions are released
     * @param time unused
     * @return an already completed future with an {@code Acknowledge}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> releaseClusterPartitions(Collection<IntermediateDataSetID> collection, Time time) {
        this.partitionTracker.stopTrackingAndReleaseClusterPartitions(collection);

        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Hands a heartbeat request from a JobManager to the JobManager heartbeat manager.
     *
     * @param resourceID resource id passed along with the heartbeat request
     * @param allocatedSlotReport payload delivered with the heartbeat
     * @return the future returned by the heartbeat manager
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Void> heartbeatFromJobManager(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
        final CompletableFuture<Void> heartbeatResponse = this.jobManagerHeartbeatManager.requestHeartbeat(resourceID, allocatedSlotReport);
        return heartbeatResponse;
    }

    /**
     * Hands a heartbeat request from the ResourceManager to the ResourceManager heartbeat manager.
     * No payload is attached ({@code null}).
     *
     * @param resourceID resource id passed along with the heartbeat request
     * @return the future returned by the heartbeat manager
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Void> heartbeatFromResourceManager(ResourceID resourceID) {
        final CompletableFuture<Void> heartbeatResponse = this.resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
        return heartbeatResponse;
    }

    /**
     * Triggers a checkpoint barrier on the task of the given execution attempt.
     *
     * @param executionAttemptID attempt whose task should checkpoint
     * @param j first value of the logged "{}@{}" pair; presumably the checkpoint id
     * @param j2 second value of the logged "{}@{}" pair; presumably the checkpoint timestamp
     * @param checkpointOptions options forwarded to the task
     * @return completed future with an {@code Acknowledge}, or a future failed with a
     *     {@code CheckpointException} when the task is unknown
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long j, long j2, CheckpointOptions checkpointOptions) {
        this.log.debug("Trigger checkpoint {}@{} for {}.", j, j2, executionAttemptID);

        final Task target = this.taskSlotTable.getTask(executionAttemptID);
        if (target == null) {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
            this.log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
        }

        target.triggerCheckpointBarrier(j, j2, checkpointOptions);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Notifies the task of the given execution attempt that a checkpoint completed and that an
     * earlier checkpoint was subsumed.
     *
     * @param executionAttemptID attempt whose task is notified
     * @param j checkpoint passed to {@code notifyCheckpointComplete}
     * @param j2 only logged here; presumably the checkpoint timestamp
     * @param j3 checkpoint passed to {@code notifyCheckpointSubsumed}
     * @return completed future with an {@code Acknowledge}, or a future failed with a
     *     {@code CheckpointException} when the task is unknown
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long j, long j2, long j3) {
        this.log.debug("Confirm completed checkpoint {}@{} and last subsumed checkpoint {} for {}.", j, j2, j3, executionAttemptID);

        final Task target = this.taskSlotTable.getTask(executionAttemptID);
        if (target == null) {
            final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
            this.log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
        }

        target.notifyCheckpointComplete(j);
        target.notifyCheckpointSubsumed(j3);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Notifies the task of the given execution attempt that a checkpoint was aborted.
     *
     * @param executionAttemptID attempt whose task is notified
     * @param j first argument of {@code notifyCheckpointAborted}; presumably the aborted
     *     checkpoint's id
     * @param j2 second argument of {@code notifyCheckpointAborted}
     * @param j3 only logged here, as the value after "@"; presumably the checkpoint timestamp
     * @return completed future with an {@code Acknowledge}, or a future failed with a
     *     {@code CheckpointException} when the task is unknown
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long j, long j2, long j3) {
        this.log.debug("Abort checkpoint {}@{} for {}.", j, j3, executionAttemptID);

        final Task target = this.taskSlotTable.getTask(executionAttemptID);
        if (target == null) {
            final String message = "TaskManager received an aborted checkpoint for unknown task " + executionAttemptID + '.';
            this.log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
        }

        target.notifyCheckpointAborted(j, j2);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Handles a slot request coming from the ResourceManager.
     *
     * <p>The request is rejected when this TaskExecutor is not connected to the ResourceManager
     * with the given leader id. Otherwise an allocation snapshot is persisted (best effort, via
     * {@code tryPersistAllocationSnapshot}), the slot is allocated for the job and, if the job is
     * already connected, slots are offered to its JobManager. The {@code time} argument is not
     * used by this implementation.
     *
     * @param slotID slot to allocate
     * @param jobID job the slot is allocated for
     * @param allocationID id of the allocation
     * @param resourceProfile requested resource profile
     * @param str stored in the allocation snapshot and used when registering a new job;
     *     presumably the JobManager address
     * @param resourceManagerId leader id of the requesting ResourceManager
     * @param time unused
     * @return completed future with an {@code Acknowledge}, or a future failed with a
     *     {@code TaskManagerException} / {@code SlotAllocationException}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> requestSlot(SlotID slotID, JobID jobID, AllocationID allocationID, ResourceProfile resourceProfile, String str, ResourceManagerId resourceManagerId, Time time) {
        this.log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationID, jobID, resourceManagerId);

        final boolean connected = isConnectedToResourceManager(resourceManagerId);
        if (!connected) {
            final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
            this.log.debug(message);
            return FutureUtils.completedExceptionally(new TaskManagerException(message));
        }

        final SlotAllocationSnapshot snapshot = new SlotAllocationSnapshot(slotID, jobID, str, allocationID, resourceProfile);
        tryPersistAllocationSnapshot(snapshot);

        try {
            final boolean jobAlreadyConnected = allocateSlotForJob(jobID, slotID, allocationID, resourceProfile, str);
            if (jobAlreadyConnected) {
                offerSlotsToJobManager(jobID);
            }
        } catch (SlotAllocationException allocationFailure) {
            this.log.debug("Could not allocate slot for allocation id {}.", allocationID, allocationFailure);
            return FutureUtils.completedExceptionally(allocationFailure);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Allocates the slot and makes sure a job entry exists for {@code jobID}.
     *
     * <p>If obtaining or creating the job fails, the slot is freed again and the local state for
     * the allocation is released. A slot that cannot be found or is still not free afterwards is
     * reported through {@code onFatalError}.
     *
     * @param jobID job the slot is allocated for
     * @param slotID slot to allocate
     * @param allocationID id of the allocation
     * @param resourceProfile requested resource profile
     * @param str passed to {@code registerNewJobAndCreateServices} when the job is new
     * @return {@code true} when the job reports that it is connected
     * @throws SlotAllocationException when the slot cannot be allocated or the job cannot be
     *     created
     */
    private boolean allocateSlotForJob(JobID jobID, SlotID slotID, AllocationID allocationID, ResourceProfile resourceProfile, String str) throws SlotAllocationException {
        allocateSlot(slotID, jobID, allocationID, resourceProfile);

        try {
            final JobTable.Job job = this.jobTable.getOrCreateJob(jobID, () -> registerNewJobAndCreateServices(jobID, str));
            return job.isConnected();
        } catch (Exception jobCreationFailure) {
            // Roll back the allocation made above.
            try {
                this.taskSlotTable.freeSlot(allocationID);
            } catch (SlotNotFoundException slotMissing) {
                onFatalError(slotMissing);
            }
            this.localStateStoresManager.releaseLocalStateForAllocationId(allocationID);

            final boolean slotFreed = this.taskSlotTable.isSlotFree(slotID.getSlotNumber());
            if (!slotFreed) {
                onFatalError(new Exception("Could not free slot " + slotID));
            }
            throw new SlotAllocationException("Could not create new job.", jobCreationFailure);
        }
    }

    /**
     * Registers a new job with the job leader service and the permanent blob service and creates
     * its {@code TaskExecutorJobServices}.
     *
     * <p>The callback handed to {@code TaskExecutorJobServices.create} releases the job from the
     * permanent blob service again.
     *
     * @param jobID job to register
     * @param str second argument of {@code jobLeaderService.addJob}; presumably the JobManager
     *     address
     * @return services created for the job, holding a class loader lease from the library cache
     *     manager
     * @throws Exception if any of the registration steps fails
     */
    private TaskExecutorJobServices registerNewJobAndCreateServices(JobID jobID, String str) throws Exception {
        this.jobLeaderService.addJob(jobID, str);

        final JobPermanentBlobService blobService = this.taskExecutorBlobService.getPermanentBlobService();
        blobService.registerJob(jobID);

        return TaskExecutorJobServices.create(
                this.libraryCacheManager.registerClassLoaderLease(jobID),
                () -> blobService.releaseJob(jobID));
    }

    /**
     * Allocates the slot in the task slot table, tolerating a repeated request for the same
     * allocation.
     *
     * @param slotID slot to allocate
     * @param jobID job the slot is allocated for
     * @param allocationID id of the allocation
     * @param resourceProfile requested resource profile
     * @throws SlotAllocationException when a free slot could not be allocated; a
     *     {@code SlotOccupiedException} when the slot is already held by a different allocation
     */
    private void allocateSlot(SlotID slotID, JobID jobID, AllocationID allocationID, ResourceProfile resourceProfile) throws SlotAllocationException {
        final int slotIndex = slotID.getSlotNumber();

        if (!this.taskSlotTable.isSlotFree(slotIndex)) {
            // Occupied slot: fine if it already belongs to this very allocation.
            if (this.taskSlotTable.isAllocated(slotIndex, jobID, allocationID)) {
                return;
            }
            final String message = "The slot " + slotID + " has already been allocated for a different job.";
            this.log.info(message);
            final AllocationID occupyingAllocation = this.taskSlotTable.getCurrentAllocation(slotIndex);
            throw new SlotOccupiedException(message, occupyingAllocation, this.taskSlotTable.getOwningJob(occupyingAllocation));
        }

        final boolean allocated = this.taskSlotTable.allocateSlot(slotIndex, jobID, allocationID, resourceProfile, this.taskManagerConfiguration.getSlotTimeout());
        if (!allocated) {
            this.log.info("Could not allocate slot for {}.", allocationID);
            throw new SlotAllocationException("Could not allocate slot.");
        }
    }

    /**
     * Frees the slot of the given allocation by delegating to {@code freeSlotInternal}.
     *
     * <p>The {@code time} argument is not used by this implementation.
     *
     * @param allocationID allocation whose slot is freed
     * @param th cause handed to {@code freeSlotInternal}
     * @param time unused
     * @return an already completed future with an {@code Acknowledge}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationID, Throwable th, Time time) {
        freeSlotInternal(allocationID, th);

        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Frees the slots the task slot table returns from {@code getAllocatedSlots} for the job.
     *
     * <p>The slots are copied into an immutable list before freeing, so the loop does not iterate
     * over the slot table while it is being changed. The {@code time} argument is not used by
     * this implementation.
     *
     * @param jobID job whose slots are freed
     * @param time unused
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public void freeInactiveSlots(JobID jobID, Time time) {
        this.log.debug("Freeing inactive slots for job {}.", jobID);

        for (Iterator<?> slots = ImmutableList.copyOf(this.taskSlotTable.getAllocatedSlots(jobID)).iterator(); slots.hasNext(); ) {
            final TaskSlot<?> slot = (TaskSlot<?>) slots.next();
            final AllocationID allocationId = slot.getAllocationId();
            freeSlotInternal(allocationId, new FlinkException("Slot was re-claimed by resource manager."));
        }
    }

    /**
     * Requests the upload of the TaskManager's log or stdout file.
     *
     * <p>For any other file type a {@code null} path is passed to
     * {@code requestFileUploadByFilePath}. The {@code time} argument is not used by this
     * implementation.
     *
     * @param fileType which file to upload
     * @param time unused
     * @return the future returned by {@code requestFileUploadByFilePath}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<TransientBlobKey> requestFileUploadByType(FileType fileType, Time time) {
        final String filePath;
        if (fileType == FileType.LOG) {
            filePath = this.taskManagerConfiguration.getTaskManagerLogPath();
        } else if (fileType == FileType.STDOUT) {
            filePath = this.taskManagerConfiguration.getTaskManagerStdoutPath();
        } else {
            filePath = null;
        }
        return requestFileUploadByFilePath(filePath, fileType.toString());
    }

    /**
     * Requests the upload of a named file from the TaskManager log directory.
     *
     * <p>Only the last path segment of {@code str} is used ({@code new File(str).getName()}), so
     * the resolved file is always located directly inside the log directory. A {@code null} path
     * is passed on when the log directory or the name is null or whitespace-only. The
     * {@code time} argument is not used by this implementation.
     *
     * @param str name of the requested file
     * @param time unused
     * @return the future returned by {@code requestFileUploadByFilePath}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<TransientBlobKey> requestFileUploadByName(String str, Time time) {
        final String logDir = this.taskManagerConfiguration.getTaskManagerLogDir();

        String filePath = null;
        if (!StringUtils.isNullOrWhitespaceOnly(logDir) && !StringUtils.isNullOrWhitespaceOnly(str)) {
            final String plainName = new File(str).getName();
            filePath = new File(logDir, plainName).getPath();
        }
        return requestFileUploadByFilePath(filePath, str);
    }

    /**
     * Returns the metric query service address this TaskExecutor was constructed with.
     *
     * <p>The {@code time} argument is not used by this implementation.
     *
     * @param time unused
     * @return an already completed future with the address wrapped via
     *     {@code SerializableOptional.ofNullable}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time time) {
        final SerializableOptional<String> address = SerializableOptional.ofNullable(this.metricQueryServiceAddress);
        return CompletableFuture.completedFuture(address);
    }

    /**
     * Disconnects from the JobManager of the given job and tries to reconnect, provided a
     * connection for that job exists. Does nothing otherwise.
     *
     * @param jobID job whose JobManager connection is dropped
     * @param exc cause of the disconnect
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public void disconnectJobManager(JobID jobID, Exception exc) {
        final Optional<JobTable.Connection> connection = this.jobTable.getConnection(jobID);
        if (connection.isPresent()) {
            disconnectAndTryReconnectToJobManager(connection.get(), exc);
        }
    }

    /**
     * Closes the given JobManager connection and asks the job leader service to reconnect for the
     * connection's job.
     *
     * @param connection connection to close
     * @param exc cause of the disconnect
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void disconnectAndTryReconnectToJobManager(JobTable.Connection connection, Exception exc) {
        disconnectJobManagerConnection(connection, exc);

        final JobID jobToReconnect = connection.getJobId();
        this.jobLeaderService.reconnect(jobToReconnect);
    }

    /**
     * Reacts to a disconnect from the ResourceManager by reconnecting, but only while this
     * endpoint is running.
     *
     * @param exc cause of the disconnect
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public void disconnectResourceManager(Exception exc) {
        if (!isRunning()) {
            return;
        }
        reconnectToResourceManager(exc);
    }

    /**
     * Delivers an operator event to the task with the given execution attempt id.
     *
     * @param executionAttemptID execution attempt of the target task
     * @param operatorID operator the event is addressed to
     * @param serializedValue the serialized operator event
     * @return a future completed with an acknowledgement on success, or completed exceptionally
     *     when the task is not found in the task slot table or the delivery throws a non-fatal
     *     error
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway, org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway
    public CompletableFuture<Acknowledge> sendOperatorEventToTask(ExecutionAttemptID executionAttemptID, OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) {
        log.debug("Operator event for {} - {}", executionAttemptID, operatorID);

        final Task targetTask = taskSlotTable.getTask(executionAttemptID);
        if (targetTask != null) {
            try {
                targetTask.deliverOperatorEvent(operatorID, serializedValue);
                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Throwable failure) {
                // Fatal errors are rethrown; anything else is reported through the future.
                ExceptionUtils.rethrowIfFatalError(failure);
                return FutureUtils.completedExceptionally(failure);
            }
        }

        return FutureUtils.completedExceptionally(new TaskNotRunningException("Task " + executionAttemptID + " not running on TaskManager"));
    }

    /**
     * Creates a thread dump whose stack trace depth is taken from the
     * {@code ClusterOptions.THREAD_DUMP_STACKTRACE_MAX_DEPTH} configuration option.
     *
     * @param time not used by this implementation
     * @return an already completed future holding the thread dump
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<ThreadDumpInfo> requestThreadDump(Time time) {
        final int maxStackDepth = (Integer) taskManagerConfiguration.getConfiguration().get(ClusterOptions.THREAD_DUMP_STACKTRACE_MAX_DEPTH);
        final ThreadDumpInfo threadDump = ThreadDumpInfo.dumpAndCreate(maxStackDepth);
        return CompletableFuture.completedFuture(threadDump);
    }

    /**
     * Passes newly obtained delegation tokens to the delegation token receiver repository.
     *
     * @param resourceManagerId leader id of the ResourceManager sending the tokens
     * @param bArr the serialized tokens
     * @return a future completed with an acknowledgement on success, or completed exceptionally
     *     when this TaskExecutor is not connected to the given ResourceManager or the update
     *     throws a non-fatal error
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> updateDelegationTokens(ResourceManagerId resourceManagerId, byte[] bArr) {
        log.info("Receive update delegation tokens from resource manager with leader id {}.", resourceManagerId);

        if (isConnectedToResourceManager(resourceManagerId)) {
            try {
                delegationTokenReceiverRepository.onNewTokensObtained(bArr);
                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Throwable failure) {
                log.error("Could not update delegation tokens.", failure);
                // Fatal errors are rethrown; anything else is reported through the future.
                ExceptionUtils.rethrowIfFatalError(failure);
                return FutureUtils.completedExceptionally(failure);
            }
        }

        // Tokens from a ResourceManager we are not connected to are rejected.
        final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
        log.debug(message);
        return FutureUtils.completedExceptionally(new TaskManagerException(message));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the address of the new ResourceManager leader and reconnects to it.
     *
     * @param str address of the new leader, may be null
     * @param resourceManagerId leader id of the new leader
     */
    public void notifyOfNewResourceManagerLeader(String str, ResourceManagerId resourceManagerId) {
        resourceManagerAddress = createResourceManagerAddress(str, resourceManagerId);

        final String reason = String.format("ResourceManager leader changed to new address %s", resourceManagerAddress);
        reconnectToResourceManager(new FlinkException(reason));
    }

    /**
     * Builds a {@code ResourceManagerAddress} from the given address and leader id.
     *
     * @param str the ResourceManager address, may be null
     * @param resourceManagerId the leader id; asserted to be non-null when an address is given
     * @return the combined address, or null when {@code str} is null
     */
    @Nullable
    private ResourceManagerAddress createResourceManagerAddress(@Nullable String str, @Nullable ResourceManagerId resourceManagerId) {
        if (str == null) {
            return null;
        }
        // Only checked when assertions are enabled for this class.
        if (!$assertionsDisabled && resourceManagerId == null) {
            throw new AssertionError();
        }
        return new ResourceManagerAddress(str, resourceManagerId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drops the current ResourceManager connection, restarts the registration timeout and tries
     * to connect again.
     *
     * @param exc cause handed to {@code closeResourceManagerConnection}
     */
    public void reconnectToResourceManager(Exception exc) {
        closeResourceManagerConnection(exc);
        startRegistrationTimeout();
        // Only connects when a ResourceManager address is currently known.
        tryConnectToResourceManager();
    }

    /** Connects to the ResourceManager if its address is known; otherwise does nothing. */
    private void tryConnectToResourceManager() {
        if (resourceManagerAddress == null) {
            return;
        }
        connectToResourceManager();
    }

    /**
     * Creates and starts a new registration connection towards the ResourceManager at
     * {@code resourceManagerAddress}.
     *
     * <p>When assertions are enabled this requires a known ResourceManager address and that
     * neither an established nor a pending ResourceManager connection exists.
     */
    private void connectToResourceManager() {
        if (!$assertionsDisabled) {
            if (resourceManagerAddress == null) {
                throw new AssertionError();
            }
            if (establishedResourceManagerConnection != null) {
                throw new AssertionError();
            }
            if (resourceManagerConnection != null) {
                throw new AssertionError();
            }
        }

        log.info("Connecting to ResourceManager {}.", resourceManagerAddress);

        // Description of this TaskExecutor that is sent along with the registration.
        final TaskExecutorRegistration registration =
                new TaskExecutorRegistration(
                        getAddress(),
                        getResourceID(),
                        unresolvedTaskManagerLocation.getDataPort(),
                        ((Integer) JMXService.getPort().orElse(-1)).intValue(),
                        hardwareDescription,
                        memoryConfiguration,
                        taskManagerConfiguration.getDefaultSlotResourceProfile(),
                        taskManagerConfiguration.getTotalResourceProfile(),
                        unresolvedTaskManagerLocation.getNodeId());

        resourceManagerConnection =
                new TaskExecutorToResourceManagerConnection(
                        log,
                        getRpcService(),
                        taskManagerConfiguration.getRetryingRegistrationConfiguration(),
                        resourceManagerAddress.getAddress(),
                        resourceManagerAddress.getResourceManagerId(),
                        getMainThreadExecutor(),
                        new ResourceManagerRegistrationListener(),
                        registration);
        resourceManagerConnection.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completes the connection to a ResourceManager: sends the initial slot report, starts
     * heartbeat monitoring, configures the blob server address and stores the established
     * connection.
     *
     * @param resourceManagerGateway gateway of the ResourceManager
     * @param resourceID resource id of the ResourceManager, used as the heartbeat target
     * @param instanceID registration id passed along with the slot report
     * @param clusterInformation source of the blob server host name and port
     */
    public void establishResourceManagerConnection(ResourceManagerGateway resourceManagerGateway, ResourceID resourceID, InstanceID instanceID, ClusterInformation clusterInformation) {
        final CompletableFuture<Acknowledge> slotReportResponse =
                resourceManagerGateway.sendSlotReport(
                        getResourceID(),
                        instanceID,
                        taskSlotTable.createSlotReport(getResourceID()),
                        taskManagerConfiguration.getRpcTimeout());

        // A failed slot report triggers a reconnect; the callback is run on the executor
        // returned by getMainThreadExecutor().
        slotReportResponse.whenCompleteAsync(
                (ack, failure) -> {
                    if (failure == null) {
                        return;
                    }
                    reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", failure));
                },
                (Executor) getMainThreadExecutor());

        resourceManagerHeartbeatManager.monitorTarget(resourceID, new ResourceManagerHeartbeatReceiver(resourceManagerGateway));

        final InetSocketAddress blobServerAddress =
                new InetSocketAddress(clusterInformation.getBlobServerHostname(), clusterInformation.getBlobServerPort());
        taskExecutorBlobService.setBlobServerAddress(blobServerAddress);

        establishedResourceManagerConnection = new EstablishedResourceManagerConnection(resourceManagerGateway, resourceID, instanceID);

        stopRegistrationTimeout();
    }

    /**
     * Closes the established ResourceManager connection (if any) and the pending registration
     * connection (if any).
     *
     * @param exc cause of the close; forwarded to the ResourceManager and used for logging
     */
    private void closeResourceManagerConnection(Exception exc) {
        if (establishedResourceManagerConnection != null) {
            final ResourceID rmResourceId = establishedResourceManagerConnection.getResourceManagerResourceId();
            final Throwable cause = exc.getCause();

            log.info("Close ResourceManager connection {}.", rmResourceId, ExceptionUtils.returnExceptionIfUnexpected(cause));
            ExceptionUtils.logExceptionIfExcepted(cause, log);

            resourceManagerHeartbeatManager.unmonitorTarget(rmResourceId);
            establishedResourceManagerConnection.getResourceManagerGateway().disconnectTaskManager(getResourceID(), exc);
            establishedResourceManagerConnection = null;

            partitionTracker.stopTrackingAndReleaseAllClusterPartitions();
        }

        if (resourceManagerConnection == null) {
            return;
        }

        if (!resourceManagerConnection.isConnected()) {
            // The cause is only attached to the log line when debug logging is enabled.
            if (!log.isDebugEnabled()) {
                log.info("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress());
            } else {
                log.debug("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress(), exc);
            }
        }

        resourceManagerConnection.close();
        resourceManagerConnection = null;
    }

    /**
     * Schedules a registration timeout check if a maximum registration duration is configured.
     * A fresh id is stored for every scheduled check so that earlier ones no longer match.
     */
    private void startRegistrationTimeout() {
        final Duration maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
        if (maxRegistrationDuration == null) {
            return;
        }

        final UUID timeoutId = UUID.randomUUID();
        currentRegistrationTimeoutId = timeoutId;
        scheduleRunAsync(() -> registrationTimeout(timeoutId), maxRegistrationDuration);
    }

    /**
     * Clears the current registration timeout id, so that a pending {@code registrationTimeout}
     * check no longer matches.
     */
    private void stopRegistrationTimeout() {
        currentRegistrationTimeoutId = null;
    }

    /**
     * Reports a fatal error if the given timeout id is still the current one.
     *
     * @param uuid id of the timeout that fired
     */
    private void registrationTimeout(@Nonnull UUID uuid) {
        if (!uuid.equals(currentRegistrationTimeoutId)) {
            // The timeout was stopped or replaced in the meantime.
            return;
        }

        final String message =
                String.format(
                        "Could not register at the ResourceManager within the specified maximum registration duration %s. This indicates a problem with this instance. Terminating now.",
                        taskManagerConfiguration.getMaxRegistrationDuration());
        onFatalError(new RegistrationTimeoutException(message));
    }

    /**
     * Offers slots to the JobManager of the given job if the job table holds a connection for it.
     *
     * @param jobID job whose slots should be offered
     */
    private void offerSlotsToJobManager(JobID jobID) {
        jobTable.getConnection(jobID)
                .ifPresent(jobManagerConnection -> internalOfferSlotsToJobManager(jobManagerConnection));
    }

    /**
     * Offers all slots allocated for the connection's job to its JobManager and registers a
     * handler for the response.
     *
     * @param connection connection to the JobManager the slots are offered to
     */
    private void internalOfferSlotsToJobManager(JobTable.Connection connection) {
        final JobID jobId = connection.getJobId();

        if (taskSlotTable.hasAllocatedSlots(jobId)) {
            log.info("Offer reserved slots to the leader of job {}.", jobId);

            final JobMasterGateway gateway = connection.getJobManagerGateway();
            final Iterator<TaskSlot<Task>> slotIterator = taskSlotTable.getAllocatedSlots(jobId);
            final JobMasterId jobMasterId = connection.getJobMasterId();

            final Set<SlotOffer> offeredSlots = CollectionUtil.newHashSetWithExpectedSize(2);
            slotIterator.forEachRemaining(slot -> offeredSlots.add(slot.generateSlotOffer()));

            // Remember the id of this offer so that the response handler can compare it with
            // the latest offer recorded for the job.
            final UUID slotOfferId = UUID.randomUUID();
            currentSlotOfferPerJob.put(jobId, slotOfferId);

            final CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture =
                    gateway.offerSlots(getResourceID(), offeredSlots, taskManagerConfiguration.getRpcTimeout());
            acceptedSlotsFuture.whenCompleteAsync(
                    handleAcceptedSlotOffers(jobId, gateway, jobMasterId, offeredSlots, slotOfferId),
                    (Executor) getMainThreadExecutor());
        } else {
            log.debug("There are no unassigned slots for the job {}.", jobId);
        }
    }

    /**
     * Creates the handler for the response to a slot offer.
     *
     * <p>The handler discards responses whose offer id is no longer the current one for the job,
     * retries the offer on a timeout, frees all offered slots on any other failure, marks the
     * accepted slots active and frees every offered slot that was not accepted.
     *
     * @param jobID job the slots were offered for
     * @param jobMasterGateway gateway of the JobManager the slots were offered to
     * @param jobMasterId leader id of that JobManager at the time of the offer
     * @param collection the offered slots; accepted offers are removed from it by the handler
     * @param uuid id of the offer this handler belongs to
     * @return the response handler
     */
    @Nonnull
    private BiConsumer<Iterable<SlotOffer>, Throwable> handleAcceptedSlotOffers(JobID jobID, JobMasterGateway jobMasterGateway, JobMasterId jobMasterId, Collection<SlotOffer> collection, UUID uuid) {
        return (acceptedSlots, failure) -> {
            if (!uuid.equals(currentSlotOfferPerJob.get(jobID))) {
                log.debug("Discard slot offer response since there is a newer offer for the job {}.", jobID);
                return;
            }

            if (failure instanceof TimeoutException) {
                log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
                offerSlotsToJobManager(jobID);
                return;
            }

            if (failure != null) {
                log.warn("Slot offering to JobManager failed. Freeing the slots and returning them to the ResourceManager.", failure);
                for (SlotOffer offered : collection) {
                    freeSlotInternal(offered.getAllocationId(), failure);
                }
                return;
            }

            if (!isJobManagerConnectionValid(jobID, jobMasterId)) {
                log.debug("Discard slot offer response since there is a new leader for the job {}.", jobID);
                return;
            }

            for (SlotOffer accepted : acceptedSlots) {
                final AllocationID allocationId = accepted.getAllocationId();
                try {
                    if (!taskSlotTable.markSlotActive(allocationId)) {
                        final String message = "Could not mark slot " + allocationId + " active.";
                        log.debug(message);
                        jobMasterGateway.failSlot(getResourceID(), allocationId, new FlinkException(message));
                    }
                } catch (SlotNotFoundException e) {
                    jobMasterGateway.failSlot(getResourceID(), allocationId, new FlinkException("Could not mark slot " + allocationId + " active."));
                }
                // Whatever remains in the offered collection afterwards was not accepted.
                collection.remove(accepted);
            }

            final Exception rejection = new Exception("The slot was rejected by the JobManager.");
            for (SlotOffer rejected : collection) {
                freeSlotInternal(rejected.getAllocationId(), rejection);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Establishes the connection to the JobManager of the given job, replacing an existing
     * connection to a different leader, and offers the job's slots to it.
     *
     * @param job the job to connect
     * @param jobMasterGateway gateway of the JobManager
     * @param jMTMRegistrationSuccess registration response carrying the JobManager resource id
     */
    public void establishJobManagerConnection(JobTable.Job job, JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess jMTMRegistrationSuccess) {
        final JobID jobId = job.getJobId();

        final Optional<JobTable.Connection> existingConnection = job.asConnection();
        if (existingConnection.isPresent()) {
            final JobTable.Connection oldConnection = existingConnection.get();
            if (!Objects.equals(oldConnection.getJobMasterId(), jobMasterGateway.getFencingToken())) {
                // Connected to a different leader: drop the old connection first.
                disconnectJobManagerConnection(oldConnection, new Exception("Found new job leader for job id " + jobId + '.'));
            } else {
                log.debug("Ignore JobManager gained leadership message for {} because we are already connected to it.", jobMasterGateway.getFencingToken());
                return;
            }
        }

        log.info("Establish JobManager connection for job {}.", jobId);

        final ResourceID jobManagerResourceId = jMTMRegistrationSuccess.getResourceID();
        final JobTable.Connection newConnection = associateWithJobManager(job, jobManagerResourceId, jobMasterGateway);
        jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new JobManagerHeartbeatReceiver(jobMasterGateway));
        internalOfferSlotsToJobManager(newConnection);
    }

    /**
     * Disconnects the job from its JobManager, if it is connected, and then closes the job.
     *
     * @param job the job to close
     * @param exc cause of the close
     */
    private void closeJob(JobTable.Job job, Exception exc) {
        job.asConnection()
                .ifPresent(jobManagerConnection -> disconnectJobManagerConnection(jobManagerConnection, exc));
        job.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Disconnects from a JobManager: fails the job's tasks, marks its active slots inactive
     * (freeing those that cannot be marked), stops heartbeat monitoring, disassociates from the
     * JobManager and finally disconnects the connection.
     *
     * @param connection the JobManager connection to close
     * @param exc cause of the disconnect
     */
    public void disconnectJobManagerConnection(JobTable.Connection connection, Exception exc) {
        final JobID jobId = connection.getJobId();
        final Throwable cause = exc.getCause();

        log.info("Close JobManager connection for job {}.", jobId, ExceptionUtils.returnExceptionIfUnexpected(cause));
        ExceptionUtils.logExceptionIfExcepted(cause, log);

        // Fail every task of the job.
        final Iterator<Task> jobTasks = taskSlotTable.getTasks(jobId);
        final Throwable taskFailureCause = new FlinkException(String.format("Disconnect from JobManager responsible for %s.", jobId), exc);
        jobTasks.forEachRemaining(task -> task.failExternally(taskFailureCause));

        // Mark the job's active slots inactive; slots for which that is refused are freed.
        final Set<AllocationID> activeSlots = taskSlotTable.getActiveTaskSlotAllocationIdsPerJob(jobId);
        final FlinkException freeingCause = new FlinkException("Slot could not be marked inactive.");
        for (AllocationID activeSlot : activeSlots) {
            try {
                final boolean markedInactive = taskSlotTable.markSlotInactive(activeSlot, taskManagerConfiguration.getSlotTimeout());
                if (!markedInactive) {
                    freeSlotInternal(activeSlot, freeingCause);
                }
            } catch (SlotNotFoundException e) {
                log.debug("Could not mark the slot {} inactive.", activeSlot, e);
            }
        }

        try {
            jobManagerHeartbeatManager.unmonitorTarget(connection.getResourceId());
            disassociateFromJobManager(connection, exc);
        } catch (IOException ioFailure) {
            log.warn("Could not properly disassociate from JobManager {}.", connection.getJobManagerGateway().getAddress(), ioFailure);
        }

        connection.disconnect();
    }

    /**
     * Creates the gateway-backed services for a job, registers queryable state for it and
     * connects the job to its JobManager.
     *
     * @param job the job to connect
     * @param resourceID resource id of the JobManager; must not be null
     * @param jobMasterGateway gateway of the JobManager; must not be null
     * @return the connection returned by {@code job.connect}
     */
    private JobTable.Connection associateWithJobManager(JobTable.Job job, ResourceID resourceID, JobMasterGateway jobMasterGateway) {
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(jobMasterGateway);

        final TaskManagerActionsImpl taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
        final RpcCheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
        final RpcGlobalAggregateManager aggregateManager = new RpcGlobalAggregateManager(jobMasterGateway);
        final RpcPartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);

        registerQueryableState(job.getJobId(), jobMasterGateway);

        return job.connect(
                resourceID,
                jobMasterGateway,
                taskManagerActions,
                checkpointResponder,
                aggregateManager,
                partitionStateChecker);
    }

    /**
     * Undoes the association with a JobManager: schedules the result partition cleanup,
     * unregisters the queryable state hooks of the job and tells the JobManager that this
     * TaskManager disconnects.
     *
     * @param connection the JobManager connection; must not be null
     * @param exc cause sent to the JobManager
     * @throws IOException declared by this method; the visible calls do not show where it arises
     */
    private void disassociateFromJobManager(JobTable.Connection connection, Exception exc) throws IOException {
        Preconditions.checkNotNull(connection);

        final JobID jobId = connection.getJobId();
        scheduleResultPartitionCleanup(jobId);

        final KvStateRegistry registry = kvStateService.getKvStateRegistry();
        if (registry != null) {
            registry.unregisterListener(jobId);
        }

        final KvStateClientProxy clientProxy = kvStateService.getKvStateClientProxy();
        if (clientProxy != null) {
            // A null location oracle clears the entry for this job.
            clientProxy.updateKvStateLocationOracle(connection.getJobId(), null);
        }

        connection.getJobManagerGateway().disconnectTaskManager(getResourceID(), exc);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reacts to a JobManager rejecting the registration by releasing all resources of the job.
     *
     * @param jobID job whose registration was rejected
     * @param str address of the rejecting JobManager, used for logging
     * @param jMTMRegistrationRejection the rejection, whose reason is logged
     */
    public void handleRejectedJobManagerConnection(JobID jobID, String str, JMTMRegistrationRejection jMTMRegistrationRejection) {
        log.info(
                "The JobManager under {} rejected the registration for job {}: {}. Releasing all job related resources.",
                str,
                jobID,
                jMTMRegistrationRejection.getReason());

        final String message = String.format("JobManager %s has rejected the registration.", jobID);
        releaseJobResources(jobID, new FlinkException(message));
    }

    /**
     * Releases everything this TaskExecutor holds for the given job: tracked partitions, slots,
     * the job leader registration, the job table entry, metrics, changelog storage, the pending
     * slot offer id, channel state executor resources and cached shuffle descriptors.
     *
     * @param jobID job whose resources are released
     * @param exc cause used when freeing slots and closing the job
     */
    private void releaseJobResources(JobID jobID, Exception exc) {
        log.debug("Releasing job resources for job {}.", jobID, exc);

        if (partitionTracker.isTrackingPartitionsFor(jobID)) {
            partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobID);
        }

        final Set<AllocationID> jobAllocations = taskSlotTable.getAllocationIdsPerJob(jobID);
        if (!jobAllocations.isEmpty()) {
            for (AllocationID jobAllocation : jobAllocations) {
                freeSlotInternal(jobAllocation, exc);
            }
        }

        jobLeaderService.removeJob(jobID);
        jobTable.getJob(jobID).ifPresent(job -> closeJob(job, exc));
        taskManagerMetricGroup.removeJobMetricsGroup(jobID);
        changelogStoragesManager.releaseResourcesForJob(jobID);
        currentSlotOfferPerJob.remove(jobID);
        channelStateExecutorFactoryManager.releaseResourcesForJob(jobID);
        shuffleDescriptorsCache.clearCacheForJob(jobID);
    }

    /**
     * Removes the job's pending result partition cleanup futures and, once all of them have
     * completed, releases the job's tracked partitions. Does nothing when no futures are
     * registered for the job.
     *
     * @param jobID job whose result partitions should be cleaned up
     */
    private void scheduleResultPartitionCleanup(JobID jobID) {
        final Collection<CompletableFuture<ExecutionState>> cleanupFutures = taskResultPartitionCleanupFuturesPerJob.remove(jobID);
        if (cleanupFutures == null) {
            return;
        }

        FutureUtils.waitForAll(cleanupFutures)
                .thenRunAsync(
                        () -> partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobID),
                        getMainThreadExecutor());
    }

    /**
     * Registers the queryable state hooks for a job: a registry listener when both a state
     * server and a registry exist, and the JobManager gateway as location oracle when a client
     * proxy exists.
     *
     * @param jobID job to register
     * @param jobMasterGateway gateway of the job's JobManager
     */
    private void registerQueryableState(JobID jobID, JobMasterGateway jobMasterGateway) {
        final KvStateServer stateServer = kvStateService.getKvStateServer();
        final KvStateRegistry stateRegistry = kvStateService.getKvStateRegistry();
        final boolean canRegisterListener = stateServer != null && stateRegistry != null;
        if (canRegisterListener) {
            final RpcKvStateRegistryListener listener = new RpcKvStateRegistryListener(jobMasterGateway, stateServer.getServerAddress());
            stateRegistry.registerListener(jobID, listener);
        }

        final KvStateClientProxy clientProxy = kvStateService.getKvStateClientProxy();
        if (clientProxy != null) {
            clientProxy.updateKvStateLocationOracle(jobID, jobMasterGateway);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fails the task with the given execution attempt id. A missing task and a failure while
     * failing the task are only logged.
     *
     * @param executionAttemptID execution attempt of the task to fail
     * @param th cause the task is failed with
     */
    public void failTask(ExecutionAttemptID executionAttemptID, Throwable th) {
        final Task taskToFail = taskSlotTable.getTask(executionAttemptID);
        if (taskToFail != null) {
            try {
                taskToFail.failExternally(th);
            } catch (Throwable failure) {
                log.error("Could not fail task {}.", executionAttemptID, failure);
            }
        } else {
            log.info("Cannot find task to fail for execution {} with exception:", executionAttemptID, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a task execution state update to the JobManager and fails the task if the update
     * completes exceptionally.
     *
     * @param jobMasterGateway gateway of the JobManager to notify
     * @param taskExecutionState the state to report
     */
    public void updateTaskExecutionState(JobMasterGateway jobMasterGateway, TaskExecutionState taskExecutionState) {
        final ExecutionAttemptID executionAttemptId = taskExecutionState.getID();

        jobMasterGateway
                .updateTaskExecutionState(taskExecutionState)
                .whenCompleteAsync(
                        (ack, failure) -> {
                            if (failure == null) {
                                return;
                            }
                            failTask(executionAttemptId, failure);
                        },
                        (Executor) getMainThreadExecutor());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes a task from the task slot table and reports its final execution state to the
     * JobManager. A task that is not yet in a terminal state is failed first.
     *
     * @param jobMasterGateway gateway of the JobManager to notify
     * @param executionAttemptID execution attempt of the task to unregister
     */
    public void unregisterTaskAndNotifyFinalState(JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
        final Task removedTask = taskSlotTable.removeTask(executionAttemptID);
        if (removedTask == null) {
            log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
            return;
        }

        if (!removedTask.getExecutionState().isTerminal()) {
            try {
                removedTask.failExternally(new IllegalStateException("Task is being remove from TaskManager."));
            } catch (Exception e) {
                log.error("Could not properly fail task.", e);
            }
        }

        log.info(
                "Un-registering task and sending final execution state {} to JobManager for task {} {}.",
                removedTask.getExecutionState(),
                removedTask.getTaskInfo().getTaskNameWithSubtasks(),
                removedTask.getExecutionId());

        final TaskExecutionState finalState =
                new TaskExecutionState(
                        removedTask.getExecutionId(),
                        removedTask.getExecutionState(),
                        removedTask.getFailureCause(),
                        removedTask.getAccumulatorRegistry().getSnapshot(),
                        removedTask.getMetricGroup().getIOMetricGroup().createSnapshot());
        updateTaskExecutionState(jobMasterGateway, finalState);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Frees the slot with the given allocation id, deletes its persisted allocation snapshot,
     * notifies the ResourceManager when connected, closes the JobManager connection of the
     * owning job if it holds nothing else, and releases the allocation's local state. The
     * request is ignored when {@code isRunning()} reports false.
     *
     * @param allocationID allocation id of the slot to free; must not be null
     * @param th cause of freeing the slot
     */
    public void freeSlotInternal(AllocationID allocationID, Throwable th) {
        Preconditions.checkNotNull(allocationID);

        if (isRunning()) {
            log.debug("Free slot with allocation id {} because: {}", allocationID, th.getMessage());

            try {
                final JobID ownerJobId = taskSlotTable.getOwningJob(allocationID);
                final int slotIndex = taskSlotTable.freeSlot(allocationID, th);

                slotAllocationSnapshotPersistenceService.deleteAllocationSnapshot(slotIndex);

                // The follow-up notifications are skipped when freeSlot returned -1.
                if (slotIndex != -1) {
                    if (isConnectedToResourceManager()) {
                        final ResourceManagerGateway rmGateway = establishedResourceManagerConnection.getResourceManagerGateway();
                        rmGateway.notifySlotAvailable(
                                establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
                                new SlotID(getResourceID(), slotIndex),
                                allocationID);
                    }
                    if (ownerJobId != null) {
                        closeJobManagerConnectionIfNoAllocatedResources(ownerJobId);
                    }
                }
            } catch (SlotNotFoundException e) {
                log.debug("Could not free slot for allocation id {}.", allocationID, e);
            }

            localStateStoresManager.releaseLocalStateForAllocationId(allocationID);
        } else {
            log.debug("Ignoring the freeing of slot {} because the TaskExecutor is shutting down.", allocationID);
        }
    }

    /**
     * Releases all resources of the job when it has neither allocated slots nor tracked
     * partitions left on this TaskExecutor.
     *
     * @param jobID job to check
     */
    private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobID) {
        final boolean noSlotsLeft = taskSlotTable.getAllocationIdsPerJob(jobID).isEmpty();
        if (noSlotsLeft && !partitionTracker.isTrackingPartitionsFor(jobID)) {
            releaseJobResources(jobID, new FlinkExpectedException("TaskExecutor " + getAddress() + " has no more allocated slots for job " + jobID + '.'));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Frees a slot whose timeout fired, provided the task slot table still considers the
     * timeout ticket valid for that allocation.
     *
     * @param allocationID allocation id of the timed-out slot; must not be null
     * @param uuid ticket of the timeout; must not be null
     */
    public void timeoutSlot(AllocationID allocationID, UUID uuid) {
        Preconditions.checkNotNull(allocationID);
        Preconditions.checkNotNull(uuid);

        if (!taskSlotTable.isValidTimeout(allocationID, uuid)) {
            log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationID, uuid);
            return;
        }

        freeSlotInternal(allocationID, new Exception("The slot " + allocationID + " has timed out."));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reconciles the local slots with the allocated slot report received from a JobMaster.
     *
     * @param jobMasterGateway gateway of the JobMaster that sent the report
     * @param allocatedSlotReport the slots the JobMaster believes are allocated on this
     *     TaskExecutor
     */
    public void syncSlotsWithSnapshotFromJobMaster(JobMasterGateway jobMasterGateway, AllocatedSlotReport allocatedSlotReport) {
        // First report slots the JobMaster lists but that are not allocated here ...
        failNoLongerAllocatedSlots(allocatedSlotReport, jobMasterGateway);
        // ... then free locally active slots the JobMaster no longer lists.
        freeNoLongerUsedSlots(allocatedSlotReport);
    }

    /**
     * Fails, towards the JobMaster, every reported slot that the task slot table does not
     * consider allocated for the report's job.
     *
     * @param allocatedSlotReport the slots reported by the JobMaster
     * @param jobMasterGateway gateway used to fail the slots
     */
    private void failNoLongerAllocatedSlots(AllocatedSlotReport allocatedSlotReport, JobMasterGateway jobMasterGateway) {
        for (AllocatedSlotInfo reportedSlot : allocatedSlotReport.getAllocatedSlotInfos()) {
            final AllocationID allocationId = reportedSlot.getAllocationId();
            if (taskSlotTable.isAllocated(reportedSlot.getSlotIndex(), allocatedSlotReport.getJobId(), allocationId)) {
                continue;
            }

            jobMasterGateway.failSlot(
                    getResourceID(),
                    allocationId,
                    new FlinkException(
                            String.format(
                                    "Slot %s on TaskExecutor %s is not allocated by job %s.",
                                    reportedSlot.getSlotIndex(),
                                    getResourceID().getStringWithMetadata(),
                                    allocatedSlotReport.getJobId())));
        }
    }

    /**
     * Frees every slot that is active for the report's job on this TaskExecutor but is not
     * contained in the report.
     *
     * @param allocatedSlotReport the slots reported by the JobMaster
     */
    private void freeNoLongerUsedSlots(AllocatedSlotReport allocatedSlotReport) {
        final Set<AllocationID> activeSlots = taskSlotTable.getActiveTaskSlotAllocationIdsPerJob(allocatedSlotReport.getJobId());
        final Set<AllocationID> reportedSlots =
                allocatedSlotReport.getAllocatedSlotInfos().stream()
                        .map(AllocatedSlotInfo::getAllocationId)
                        .collect(Collectors.toSet());

        for (AllocationID unusedSlot : Sets.difference(activeSlots, reportedSlots)) {
            freeSlotInternal(
                    unusedSlot,
                    new FlinkException(String.format("%s is no longer allocated by job %s.", unusedSlot, allocatedSlotReport.getJobId())));
        }
    }

    /**
     * Persists the given slot allocation snapshot on a best-effort basis; an I/O failure is only
     * logged at debug level.
     *
     * @param slotAllocationSnapshot the snapshot to persist
     */
    private void tryPersistAllocationSnapshot(SlotAllocationSnapshot slotAllocationSnapshot) {
        try {
            slotAllocationSnapshotPersistenceService.persistAllocationSnapshot(slotAllocationSnapshot);
        } catch (IOException persistFailure) {
            log.debug("Cannot persist the slot allocation snapshot {}.", slotAllocationSnapshot, persistFailure);
        }
    }

    /**
     * Loads the persisted slot allocation snapshots, tries to re-allocate each slot and retains
     * the local state of all loaded allocations. A snapshot whose slot cannot be re-allocated is
     * logged at debug level and its allocation id is still retained.
     */
    private void tryLoadLocalAllocationSnapshots() {
        final Collection<SlotAllocationSnapshot> snapshots = slotAllocationSnapshotPersistenceService.loadAllocationSnapshots();
        log.debug("Recovered slot allocation snapshots {}.", snapshots);

        final Set<AllocationID> recoveredAllocations = new HashSet<>();
        for (SlotAllocationSnapshot snapshot : snapshots) {
            try {
                allocateSlotForJob(
                        snapshot.getJobId(),
                        snapshot.getSlotID(),
                        snapshot.getAllocationId(),
                        snapshot.getResourceProfile(),
                        snapshot.getJobTargetAddress());
            } catch (SlotAllocationException allocationFailure) {
                log.debug("Cannot reallocate restored slot {}.", snapshot, allocationFailure);
            }
            recoveredAllocations.add(snapshot.getAllocationId());
        }

        localStateStoresManager.retainLocalStateForAllocations(recoveredAllocations);
    }

    /** Returns whether an established ResourceManager connection currently exists. */
    private boolean isConnectedToResourceManager() {
        return establishedResourceManagerConnection != null;
    }

    /**
     * Returns whether an established ResourceManager connection exists and the known
     * ResourceManager address carries the given leader id.
     *
     * @param resourceManagerId leader id to compare against
     * @return true only if both conditions hold
     */
    private boolean isConnectedToResourceManager(ResourceManagerId resourceManagerId) {
        return establishedResourceManagerConnection != null
                && resourceManagerAddress != null
                && resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
    }

    /**
     * Returns whether the job table holds a connection for the job whose JobMaster id equals the
     * given one.
     *
     * @param jobID job to look up
     * @param jobMasterId expected JobMaster id
     * @return true if such a connection exists, false otherwise
     */
    private boolean isJobManagerConnectionValid(JobID jobID, JobMasterId jobMasterId) {
        return jobTable.getConnection(jobID)
                .filter(jobManagerConnection -> Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId))
                .isPresent();
    }

    /**
     * Uploads the file at the given path to the transient blob service.
     *
     * <p>The upload runs on {@code ioExecutor}. The input stream opened on the file is closed
     * once the upload attempt has finished, whether it succeeded or failed.
     *
     * @param str path of the file to upload; may be null or blank when no file is available
     * @param str2 tag naming the file in log and error messages
     * @return a future holding the blob key of the uploaded file; completed exceptionally when
     *     the path is null or blank, the file does not exist, or the upload fails
     */
    private CompletableFuture<TransientBlobKey> requestFileUploadByFilePath(String str, String str2) {
        this.log.debug("Received file upload request for file {}", str2);
        if (!StringUtils.isNullOrWhitespaceOnly(str)) {
            return CompletableFuture.supplyAsync(() -> {
                File file = new File(str);
                if (!file.exists()) {
                    this.log.debug("The file {} does not exist on the TaskExecutor {}.", str2, getResourceID().getStringWithMetadata());
                    throw new CompletionException(new FileNotFoundException("The file " + str2 + " does not exist on the TaskExecutor."));
                }
                // try-with-resources: previously the stream was never closed, leaking a file
                // descriptor for every upload request.
                try (InputStream fileStream = new FileInputStream(file)) {
                    return putTransientBlobStream(fileStream, str2).get();
                } catch (Exception e) {
                    this.log.debug("Could not upload file {}.", str2, e);
                    throw new CompletionException(new FlinkException("Could not upload file " + str2 + '.', e));
                }
            }, this.ioExecutor);
        }
        this.log.debug("The file {} is unavailable on the TaskExecutor {}.", str2, getResourceID().getStringWithMetadata());
        return FutureUtils.completedExceptionally(new FlinkException("The file " + str2 + " is not available on the TaskExecutor."));
    }

    /**
     * Stores the content of the given stream in the transient blob service.
     *
     * @param inputStream stream to upload; it is not closed by this method
     * @param str tag naming the file in log and error messages
     * @return an already completed future holding the blob key, or a future completed
     *     exceptionally when the upload throws an {@code IOException}
     */
    private CompletableFuture<TransientBlobKey> putTransientBlobStream(InputStream inputStream, String str) {
        final TransientBlobKey blobKey;
        try {
            blobKey = taskExecutorBlobService.getTransientBlobService().putTransient(inputStream);
        } catch (IOException uploadFailure) {
            log.debug("Could not upload file {}.", str, uploadFailure);
            return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + str + '.', uploadFailure));
        }
        return CompletableFuture.completedFuture(blobKey);
    }

    /** Returns the resource id taken from the unresolved TaskManager location. */
    public ResourceID getResourceID() {
        return unresolvedTaskManagerLocation.getResourceID();
    }

    /**
     * Logs a fatal error and hands it to the fatal error handler.
     *
     * @param th the fatal error
     */
    void onFatalError(Throwable th) {
        try {
            log.error("Fatal error occurred in TaskExecutor {}.", getAddress(), th);
        } catch (Throwable ignored) {
            // Logging is best effort: a failure here must not prevent the handler call below.
        }
        fatalErrorHandler.onFatalError(th);
    }

    /** Returns the current ResourceManager registration connection; null when none exists. */
    @VisibleForTesting
    TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
        return resourceManagerConnection;
    }

    /** Returns the heartbeat manager used for the ResourceManager. */
    @VisibleForTesting
    HeartbeatManager<Void, TaskExecutorHeartbeatPayload> getResourceManagerHeartbeatManager() {
        return resourceManagerHeartbeatManager;
    }

    static {
        $assertionsDisabled = !TaskExecutor.class.desiredAssertionStatus();
    }
}
