package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.stats.VertexStatsTracker;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTracker.class */
public class VertexThreadInfoTracker implements VertexStatsTracker<VertexThreadInfoStats> {
    /** Class-wide logger; assigned in the static initializer at the bottom of the class. */
    private static final Logger LOG;

    /** Coordinator through which thread info requests are triggered. */
    @GuardedBy("lock")
    private final ThreadInfoRequestCoordinator coordinator;

    /** Executor on which sample completion callbacks are run. */
    private final ExecutorService executor;

    /** Source of the resource manager gateway used to look up task executor gateways. */
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;

    /** Most recent stats per job vertex. */
    @GuardedBy("lock")
    private final Cache<JobVertexKey, VertexThreadInfoStats> jobVertexStatsCache;

    /** Most recent stats per subtask (execution vertex). */
    @GuardedBy("lock")
    private final Cache<ExecutionVertexKey, VertexThreadInfoStats> executionVertexStatsCache;

    /** Number of samples passed to the coordinator with every request. */
    private final int numSamples;

    /** Age of a cached result after which a lookup triggers a fresh sample. */
    private final Duration statsRefreshInterval;

    /** Delay between samples passed to the coordinator with every request. */
    private final Duration delayBetweenSamples;

    /** Maximum thread info depth passed to the coordinator with every request. */
    private final int maxThreadInfoDepth;

    /** Set once by {@link #shutDown()}; every access in this class happens while holding {@code lock}. */
    private boolean shutDown;

    /** Timeout used when asking the resource manager for a task executor thread info gateway. */
    private final Time rpcTimeout;

    /** Decompiler-exposed assertion flag; {@code true} when assertions are disabled for this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /** Monitor guarding the caches, the pending sets and the shut-down flag. */
    private final Object lock = new Object();

    /** Job vertices for which a sample request is currently in flight. */
    @GuardedBy("lock")
    private final Set<JobVertexKey> pendingJobVertexStats = new HashSet<>();

    /** Subtasks for which a sample request is currently in flight. */
    @GuardedBy("lock")
    private final Set<ExecutionVertexKey> pendingExecutionVertexStats = new HashSet<>();

    /** Completed as soon as any sample result has been handled; exposed for tests only. */
    private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTracker$ExecutionVertexKey.class */
    /**
     * Cache key for a single subtask: the owning job vertex (as a {@link JobVertexKey}) combined
     * with the subtask index. Two keys are equal when both components are equal.
     */
    public static class ExecutionVertexKey {
        private final JobVertexKey jobVertexKey;
        private final int subtaskIndex;

        private ExecutionVertexKey(JobID jobID, JobVertexID jobVertexID, int i) {
            this.subtaskIndex = i;
            this.jobVertexKey = new JobVertexKey(jobID, jobVertexID);
        }

        /** Returns the key of the job vertex this subtask belongs to. */
        public JobVertexKey getJobVertexKey() {
            return jobVertexKey;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            final ExecutionVertexKey other = (ExecutionVertexKey) obj;
            return subtaskIndex == other.subtaskIndex
                    && Objects.equals(jobVertexKey, other.jobVertexKey);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobVertexKey, subtaskIndex);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTracker$ExecutionVertexThreadInfoSampleCompletionCallback.class */
    /**
     * Completion callback for a sample that was requested for one subtask: stores the result in the
     * execution vertex cache and clears the subtask's pending marker afterwards.
     */
    public class ExecutionVertexThreadInfoSampleCompletionCallback extends ThreadInfoSampleCompletionCallback {
        private final ExecutionVertexKey key;

        ExecutionVertexThreadInfoSampleCompletionCallback(ExecutionVertexKey executionVertexKey, String str) {
            super(str);
            this.key = executionVertexKey;
        }

        /** Caches the stats under this callback's subtask key. */
        @Override
        protected void handleResult(VertexThreadInfoStats vertexThreadInfoStats) {
            executionVertexStatsCache.put(key, vertexThreadInfoStats);
        }

        /** Marks the subtask as no longer pending. */
        @Override
        protected void doFinally() {
            pendingExecutionVertexStats.remove(key);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTracker$JobVertexKey.class */
    /**
     * Cache key for a job vertex: the job id combined with the job vertex id. Two keys are equal
     * when both ids are equal.
     */
    public static class JobVertexKey {
        private final JobID jobId;
        private final JobVertexID jobVertexId;

        private JobVertexKey(JobID jobID, JobVertexID jobVertexID) {
            this.jobVertexId = jobVertexID;
            this.jobId = jobID;
        }

        /** Derives the key of the subtask with index {@code i} of this job vertex. */
        public ExecutionVertexKey toExecutionVertexKey(int i) {
            return new ExecutionVertexKey(jobId, jobVertexId, i);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            final JobVertexKey other = (JobVertexKey) obj;
            if (!Objects.equals(jobId, other.jobId)) {
                return false;
            }
            return Objects.equals(jobVertexId, other.jobVertexId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobId, jobVertexId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTracker$JobVertexThreadInfoSampleCompletionCallback.class */
    /**
     * Completion callback for a sample that was requested for a whole job vertex. Besides caching
     * the vertex-level result it fans the per-attempt samples out into the execution vertex cache.
     */
    public class JobVertexThreadInfoSampleCompletionCallback extends ThreadInfoSampleCompletionCallback {
        private final JobVertexKey jobVertexKey;

        JobVertexThreadInfoSampleCompletionCallback(JobVertexKey jobVertexKey, String str) {
            super(str);
            this.jobVertexKey = jobVertexKey;
        }

        /**
         * Caches the job vertex stats, then updates the per-subtask cache entry of every attempt in
         * the result: an absent or older entry (smaller request id) is replaced, an entry with the
         * same request id gets the attempt's samples merged in, and a newer entry is left alone.
         */
        @Override
        protected void handleResult(VertexThreadInfoStats vertexThreadInfoStats) {
            jobVertexStatsCache.put(jobVertexKey, vertexThreadInfoStats);

            for (Map.Entry<ExecutionAttemptID, Collection<ThreadInfoSample>> attemptSamples :
                    vertexThreadInfoStats.getSamplesBySubtask().entrySet()) {
                final ExecutionAttemptID attemptId = attemptSamples.getKey();
                final ExecutionVertexKey subtaskKey =
                        jobVertexKey.toExecutionVertexKey(attemptId.getSubtaskIndex());
                final VertexThreadInfoStats cached = executionVertexStatsCache.getIfPresent(subtaskKey);

                if (cached != null && cached.getRequestId() >= vertexThreadInfoStats.getRequestId()) {
                    // Same request: add this attempt to the existing entry. Newer request: keep it.
                    if (cached.getRequestId() == vertexThreadInfoStats.getRequestId()) {
                        cached.getSamplesBySubtask().put(attemptId, attemptSamples.getValue());
                    }
                    continue;
                }

                executionVertexStatsCache.put(
                        subtaskKey,
                        generateExecutionVertexStats(
                                vertexThreadInfoStats, attemptId, attemptSamples.getValue()));
            }
        }

        /**
         * Builds stats for a single attempt that share request id, start time and end time with the
         * given vertex-level stats. The backing map is mutable because later results with the same
         * request id are merged into it.
         */
        private VertexThreadInfoStats generateExecutionVertexStats(VertexThreadInfoStats vertexThreadInfoStats, ExecutionAttemptID executionAttemptID, Collection<ThreadInfoSample> collection) {
            final Map<ExecutionAttemptID, Collection<ThreadInfoSample>> samplesOfAttempt = new HashMap<>();
            samplesOfAttempt.put(executionAttemptID, collection);
            return new VertexThreadInfoStats(
                    vertexThreadInfoStats.getRequestId(),
                    vertexThreadInfoStats.getStartTime(),
                    vertexThreadInfoStats.getEndTime(),
                    samplesOfAttempt);
        }

        /** Marks the job vertex as no longer pending. */
        @Override
        protected void doFinally() {
            pendingJobVertexStats.remove(jobVertexKey);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTracker$ThreadInfoSampleCompletionCallback.class */
    public abstract class ThreadInfoSampleCompletionCallback implements BiConsumer<VertexThreadInfoStats, Throwable> {
        private final String sampleName;

        protected ThreadInfoSampleCompletionCallback(String str) {
            this.sampleName = str;
        }

        protected abstract void handleResult(VertexThreadInfoStats vertexThreadInfoStats);

        protected abstract void doFinally();

        @Override // java.util.function.BiConsumer
        public void accept(VertexThreadInfoStats vertexThreadInfoStats, Throwable th) {
            synchronized (VertexThreadInfoTracker.this.lock) {
                try {
                    try {
                    } catch (Throwable th2) {
                        VertexThreadInfoTracker.LOG.error("Error during stats completion.", th2);
                        doFinally();
                    }
                    if (VertexThreadInfoTracker.this.shutDown) {
                        return;
                    }
                    if (vertexThreadInfoStats == null) {
                        VertexThreadInfoTracker.LOG.error("Failed to gather a thread info sample for {}", this.sampleName, th);
                        doFinally();
                    } else {
                        handleResult(vertexThreadInfoStats);
                        VertexThreadInfoTracker.this.resultAvailableFuture.complete(null);
                        doFinally();
                    }
                } finally {
                    doFinally();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public VertexThreadInfoTracker(ThreadInfoRequestCoordinator threadInfoRequestCoordinator, GatewayRetriever<ResourceManagerGateway> gatewayRetriever, ScheduledExecutorService scheduledExecutorService, Duration duration, int i, Duration duration2, Duration duration3, int i2, Time time, Cache<JobVertexKey, VertexThreadInfoStats> cache, Cache<ExecutionVertexKey, VertexThreadInfoStats> cache2) {
        this.coordinator = (ThreadInfoRequestCoordinator) Preconditions.checkNotNull(threadInfoRequestCoordinator, "Thread info samples coordinator");
        this.resourceManagerGatewayRetriever = (GatewayRetriever) Preconditions.checkNotNull(gatewayRetriever, "Gateway retriever");
        this.executor = (ExecutorService) Preconditions.checkNotNull(scheduledExecutorService, "Scheduled executor");
        this.statsRefreshInterval = (Duration) Preconditions.checkNotNull(duration2, "Statistics refresh interval");
        this.rpcTimeout = time;
        Preconditions.checkArgument(duration.toMillis() > 0, "Clean up interval must be greater than 0");
        Preconditions.checkArgument(i >= 1, "Number of samples");
        this.numSamples = i;
        Preconditions.checkArgument(duration2.toMillis() > 0, "Stats refresh interval must be greater than 0");
        this.delayBetweenSamples = (Duration) Preconditions.checkNotNull(duration3, "Delay between samples");
        Preconditions.checkArgument(i2 > 0, "Max stack trace depth must be greater than 0");
        this.maxThreadInfoDepth = i2;
        this.jobVertexStatsCache = (Cache) Preconditions.checkNotNull(cache, "Job vertex stats cache");
        this.executionVertexStatsCache = (Cache) Preconditions.checkNotNull(cache2, "Execution vertex stats cache");
        scheduledExecutorService.scheduleWithFixedDelay(this::cleanUpStatsCache, duration.toMillis(), duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the cached stats of the given job vertex, if any. When nothing is cached, or the
     * cached stats ended at least one refresh interval ago, a new sample is triggered; the value
     * returned is still the one found in the cache at call time.
     *
     * @param jobID id of the job the vertex belongs to
     * @param accessExecutionJobVertex the job vertex to look up
     * @return the cached stats, or an empty optional if none are cached
     */
    @Override
    public Optional<VertexThreadInfoStats> getJobVertexStats(JobID jobID, AccessExecutionJobVertex accessExecutionJobVertex) {
        synchronized (lock) {
            final JobVertexKey vertexKey = getKey(jobID, accessExecutionJobVertex);
            final VertexThreadInfoStats cached = jobVertexStatsCache.getIfPresent(vertexKey);
            final boolean needsRefresh =
                    cached == null
                            || System.currentTimeMillis()
                                    >= cached.getEndTime() + statsRefreshInterval.toMillis();
            if (needsRefresh) {
                triggerThreadInfoSampleInternal(vertexKey, accessExecutionJobVertex);
            }
            return Optional.ofNullable(cached);
        }
    }

    /**
     * Returns the cached stats of one subtask of the given job vertex, if any. When nothing is
     * cached, or the cached stats ended at least one refresh interval ago, a new sample is
     * triggered; the value returned is still the one found in the cache at call time.
     *
     * @param jobID id of the job the vertex belongs to
     * @param accessExecutionJobVertex the job vertex owning the subtask
     * @param i index of the subtask
     * @return the cached stats, or an empty optional if none are cached
     */
    @Override
    public Optional<VertexThreadInfoStats> getExecutionVertexStats(JobID jobID, AccessExecutionJobVertex accessExecutionJobVertex, int i) {
        synchronized (lock) {
            final ExecutionVertexKey subtaskKey = getKey(jobID, accessExecutionJobVertex, i);
            final VertexThreadInfoStats cached = executionVertexStatsCache.getIfPresent(subtaskKey);
            final boolean needsRefresh =
                    cached == null
                            || System.currentTimeMillis()
                                    >= cached.getEndTime() + statsRefreshInterval.toMillis();
            if (needsRefresh) {
                triggerThreadInfoSampleInternal(subtaskKey, accessExecutionJobVertex);
            }
            return Optional.ofNullable(cached);
        }
    }

    /**
     * Requests a sample for all subtasks of a job vertex unless the tracker is shut down or a
     * request for that vertex is already in flight. Must be called while holding {@code lock}.
     */
    private void triggerThreadInfoSampleInternal(JobVertexKey jobVertexKey, AccessExecutionJobVertex accessExecutionJobVertex) {
        if (!$assertionsDisabled && !Thread.holdsLock(lock)) {
            throw new AssertionError();
        }
        if (shutDown) {
            return;
        }
        // add() reports false when the key was already pending, i.e. a request is in flight.
        if (!pendingJobVertexStats.add(jobVertexKey)) {
            return;
        }
        final JobVertexThreadInfoSampleCompletionCallback callback =
                new JobVertexThreadInfoSampleCompletionCallback(jobVertexKey, accessExecutionJobVertex.getName());
        triggerThreadInfoRequestForVertices(callback, accessExecutionJobVertex.getTaskVertices());
    }

    /**
     * Requests a sample for a single subtask unless the tracker is shut down, or a request for the
     * subtask or for its whole job vertex is already in flight. Must be called while holding
     * {@code lock}.
     *
     * <p>The subtask is marked pending only when a request is actually issued. If the job vertex
     * has no task vertex with the requested subtask index, no callback would ever clear the
     * marker, so marking it would block every later sample for that key.
     */
    private void triggerThreadInfoSampleInternal(ExecutionVertexKey executionVertexKey, AccessExecutionJobVertex accessExecutionJobVertex) {
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }
        if (this.shutDown || this.pendingJobVertexStats.contains(executionVertexKey.getJobVertexKey()) || this.pendingExecutionVertexStats.contains(executionVertexKey)) {
            return;
        }
        final AccessExecutionVertex[] matchingVertices =
                Arrays.stream(accessExecutionJobVertex.getTaskVertices())
                        .filter(vertex -> vertex.getParallelSubtaskIndex() == executionVertexKey.subtaskIndex)
                        .toArray(AccessExecutionVertex[]::new);
        if (matchingVertices.length == 0) {
            // Nothing to sample: leave the key unmarked so that a later call can try again.
            return;
        }
        final String sampleName = matchingVertices[0].getTaskNameWithSubtaskIndex();
        this.pendingExecutionVertexStats.add(executionVertexKey);
        triggerThreadInfoRequestForVertices(new ExecutionVertexThreadInfoSampleCompletionCallback(executionVertexKey, sampleName), matchingVertices);
    }

    /**
     * Asynchronously requests a thread info sample for the given vertices.
     *
     * <p>Once the resource manager gateway is available, the vertices' executions are matched with
     * task executor gateways and handed to the coordinator. The given callback is then run on
     * {@code executor} with either the stats or the failure.
     *
     * @param threadInfoSampleCompletionCallback callback invoked when the request completes
     * @param accessExecutionVertexArr vertices to sample
     */
    private void triggerThreadInfoRequestForVertices(ThreadInfoSampleCompletionCallback threadInfoSampleCompletionCallback, AccessExecutionVertex[] accessExecutionVertexArr) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Triggering thread info sample for tasks: {}", Arrays.toString(accessExecutionVertexArr));
        }
        // No casts needed: the callback is a BiConsumer<VertexThreadInfoStats, Throwable> and the
        // executor field is already an Executor.
        this.resourceManagerGatewayRetriever
                .getFuture()
                .thenCompose(
                        resourceManagerGateway ->
                                this.coordinator.triggerThreadInfoRequest(
                                        matchExecutionsWithGateways(accessExecutionVertexArr, resourceManagerGateway),
                                        this.numSamples,
                                        this.delayBetweenSamples,
                                        this.maxThreadInfoDepth))
                .whenCompleteAsync(threadInfoSampleCompletionCallback, this.executor);
    }

    /**
     * Groups the running executions of the given vertices by task manager location and pairs each
     * group with a future for that task manager's thread info gateway.
     */
    private Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> matchExecutionsWithGateways(AccessExecutionVertex[] accessExecutionVertexArr, ResourceManagerGateway resourceManagerGateway) {
        final Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> executionsByLocation =
                groupExecutionsByLocation(accessExecutionVertexArr);
        return mapExecutionsToGateways(resourceManagerGateway, executionsByLocation);
    }

    /**
     * For every task manager location, asks the resource manager for the thread info gateway of
     * that task executor and keys the resulting future by the executions running there.
     */
    private Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> mapExecutionsToGateways(ResourceManagerGateway resourceManagerGateway, Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> map) {
        final Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> gatewaysByExecutions = new HashMap<>();
        map.forEach(
                (location, attemptIds) ->
                        gatewaysByExecutions.put(
                                attemptIds,
                                resourceManagerGateway.requestTaskExecutorThreadInfoGateway(
                                        location.getResourceID(), rpcTimeout)));
        return gatewaysByExecutions;
    }

    /**
     * Groups the attempt ids of all current executions of the given vertices by the task manager
     * location they are assigned to.
     *
     * <p>Vertices that are not in state {@code RUNNING} and executions without an assigned location
     * are skipped (and traced). The per-location sets are returned as immutable copies.
     *
     * @param accessExecutionVertexArr vertices whose executions should be grouped
     * @return attempt ids per task manager location; empty if nothing is eligible
     */
    private Map<TaskManagerLocation, ImmutableSet<ExecutionAttemptID>> groupExecutionsByLocation(AccessExecutionVertex[] accessExecutionVertexArr) {
        final Map<TaskManagerLocation, Set<ExecutionAttemptID>> attemptsByLocation = new HashMap<>();
        for (AccessExecutionVertex accessExecutionVertex : accessExecutionVertexArr) {
            if (accessExecutionVertex.getExecutionState() != ExecutionState.RUNNING) {
                LOG.trace("{} not running, but {}; not sampling", accessExecutionVertex.getTaskNameWithSubtaskIndex(), accessExecutionVertex.getExecutionState());
                continue;
            }
            for (AccessExecution accessExecution : accessExecutionVertex.getCurrentExecutions()) {
                final TaskManagerLocation assignedResourceLocation = accessExecution.getAssignedResourceLocation();
                if (assignedResourceLocation == null) {
                    LOG.trace("ExecutionVertex {} is currently not assigned", accessExecutionVertex);
                    continue;
                }
                // Creates the set only the first time a location is seen.
                attemptsByLocation
                        .computeIfAbsent(assignedResourceLocation, location -> new HashSet<>())
                        .add(accessExecution.getAttemptId());
            }
        }
        return attemptsByLocation.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> ImmutableSet.copyOf(entry.getValue())));
    }

    /**
     * Runs the caches' own clean-up routine on both stats caches. Scheduled periodically by the
     * constructor; package-visible so that tests can invoke it directly.
     */
    @VisibleForTesting
    void cleanUpStatsCache() {
        jobVertexStatsCache.cleanUp();
        executionVertexStatsCache.cleanUp();
    }

    /**
     * Shuts the tracker down: empties both caches and both pending sets and sets the shut-down
     * flag, after which no new samples are triggered and late results are dropped. Calling it
     * again has no further effect.
     */
    @Override
    public void shutDown() {
        synchronized (lock) {
            if (shutDown) {
                return;
            }
            jobVertexStatsCache.invalidateAll();
            pendingJobVertexStats.clear();
            executionVertexStatsCache.invalidateAll();
            pendingExecutionVertexStats.clear();
            shutDown = true;
        }
    }

    /** Returns the future that is completed once a sample result has been handled; for tests. */
    @VisibleForTesting
    CompletableFuture<Void> getResultAvailableFuture() {
        return resultAvailableFuture;
    }

    /** Builds the cache key of a job vertex from the job id and the vertex's own id. */
    private static JobVertexKey getKey(JobID jobID, AccessExecutionJobVertex accessExecutionJobVertex) {
        final JobVertexID vertexId = accessExecutionJobVertex.getJobVertexId();
        return new JobVertexKey(jobID, vertexId);
    }

    /** Builds the cache key of subtask {@code i} of a job vertex. */
    private static ExecutionVertexKey getKey(JobID jobID, AccessExecutionJobVertex accessExecutionJobVertex, int i) {
        final JobVertexID vertexId = accessExecutionJobVertex.getJobVertexId();
        return new ExecutionVertexKey(jobID, vertexId, i);
    }

    // Static initializer assigning the two blank static finals declared at the top of the class.
    static {
        // True when assertions are disabled for this class; the lock-ownership checks in the
        // trigger methods are skipped in that case.
        $assertionsDisabled = !VertexThreadInfoTracker.class.desiredAssertionStatus();
        // Logger used by the tracker and its nested callback classes.
        LOG = LoggerFactory.getLogger(VertexThreadInfoTracker.class);
    }
}
