package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.runtime.webmonitor.threadinfo.VertexThreadInfoTracker;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalListener;
import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalNotification;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTrackerTest.class */
public class VertexThreadInfoTrackerTest extends TestLogger {
    private static final int REQUEST_ID = 0;
    private static final int PARALLELISM = 10;
    private static VertexThreadInfoStats threadInfoStatsDefaultSample;
    private static final int NUMBER_OF_SAMPLES = 1;
    private static final int MAX_STACK_TRACE_DEPTH = 100;
    private static ScheduledExecutorService executor;
    private static final ExecutionJobVertex EXECUTION_JOB_VERTEX = createExecutionJobVertex();
    private static final ExecutionAttemptID[] ATTEMPT_IDS = (ExecutionAttemptID[]) Arrays.stream(EXECUTION_JOB_VERTEX.getTaskVertices()).map(executionVertex -> {
        return executionVertex.getCurrentExecutionAttempt().getAttemptId();
    }).sorted(Comparator.comparingInt((v0) -> {
        return v0.getSubtaskIndex();
    })).toArray(i -> {
        return new ExecutionAttemptID[i];
    });
    private static final JobID JOB_ID = new JobID();
    private static final Duration CLEAN_UP_INTERVAL = Duration.ofSeconds(60);
    private static final Duration STATS_REFRESH_INTERVAL = Duration.ofSeconds(60);
    private static final Duration TIME_GAP = Duration.ofSeconds(60);
    private static final Duration SMALL_TIME_GAP = Duration.ofMillis(1);
    private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(10);
    private static final Duration DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50);

    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTrackerTest$LatchRemovalListener.class */
    private static class LatchRemovalListener<K, V> implements RemovalListener<K, V> {
        private final CountDownLatch latch;

        private LatchRemovalListener(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        public void onRemoval(@Nonnull RemovalNotification<K, V> removalNotification) {
            this.latch.countDown();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTrackerTest$TestingBlockingAndCountableCoordinator.class */
    private static class TestingBlockingAndCountableCoordinator extends ThreadInfoRequestCoordinator {
        private final CompletableFuture<VertexThreadInfoStats> blockingFuture;
        private final AtomicLong triggerCounter;

        TestingBlockingAndCountableCoordinator(CompletableFuture<VertexThreadInfoStats> completableFuture) {
            super((v0) -> {
                v0.run();
            }, VertexThreadInfoTrackerTest.REQUEST_TIMEOUT);
            this.blockingFuture = completableFuture;
            this.triggerCounter = new AtomicLong(0L);
        }

        public long getTriggerCounter() {
            return this.triggerCounter.get();
        }

        public CompletableFuture<VertexThreadInfoStats> triggerThreadInfoRequest(Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> map, int i, Duration duration, int i2) {
            this.triggerCounter.getAndIncrement();
            return this.blockingFuture.thenApply(vertexThreadInfoStats -> {
                return VertexThreadInfoTrackerTest.generateThreadInfoStatsForExecutionVertices(vertexThreadInfoStats, (Set) map.keySet().iterator().next());
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/VertexThreadInfoTrackerTest$TestingThreadInfoRequestCoordinator.class */
    public static class TestingThreadInfoRequestCoordinator extends ThreadInfoRequestCoordinator {
        private final VertexThreadInfoStats[] jobVertexThreadInfoStats;
        private int counter;

        TestingThreadInfoRequestCoordinator(Executor executor, Duration duration, VertexThreadInfoStats... vertexThreadInfoStatsArr) {
            super(executor, duration);
            this.counter = 0;
            this.jobVertexThreadInfoStats = vertexThreadInfoStatsArr;
        }

        private VertexThreadInfoStats getVertexThreadInfoStats() {
            VertexThreadInfoStats[] vertexThreadInfoStatsArr = this.jobVertexThreadInfoStats;
            int i = this.counter;
            this.counter = i + 1;
            return vertexThreadInfoStatsArr[i % this.jobVertexThreadInfoStats.length];
        }

        public CompletableFuture<VertexThreadInfoStats> triggerThreadInfoRequest(Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> map, int i, Duration duration, int i2) {
            AssertionsForClassTypes.assertThat(map.size() == 1).isTrue();
            return CompletableFuture.completedFuture(VertexThreadInfoTrackerTest.generateThreadInfoStatsForExecutionVertices(getVertexThreadInfoStats(), map.keySet().iterator().next()));
        }
    }

    @BeforeAll
    public static void setUp() {
        threadInfoStatsDefaultSample = createThreadInfoStats(0, SMALL_TIME_GAP);
        executor = Executors.newScheduledThreadPool(1);
    }

    @AfterAll
    public static void tearDown() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    @Test
    public void testGetThreadInfoStats() throws Exception {
        doInitialJobVertexRequestAndVerifyResult(createThreadInfoTracker());
        for (int i = 0; i < 10; i++) {
            doInitialExecutionVertexRequestAndVerifyResult(createThreadInfoTracker(), i);
        }
    }

    @Test
    public void testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
        VertexThreadInfoStats createThreadInfoStats = createThreadInfoStats(1, TIME_GAP);
        VertexThreadInfoTracker createThreadInfoTracker = createThreadInfoTracker(STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample, createThreadInfoStats);
        doInitialJobVertexRequestAndVerifyResult(createThreadInfoTracker);
        assertExpectedEqualsReceived(threadInfoStatsDefaultSample, createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX));
        for (int i = 0; i < 10; i++) {
            assertExpectedEqualsReceived(generateThreadInfoStatsForExecutionVertex(threadInfoStatsDefaultSample, ATTEMPT_IDS[i]), createThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i));
        }
        for (int i2 = 0; i2 < 10; i2++) {
            VertexThreadInfoTracker createThreadInfoTracker2 = createThreadInfoTracker(STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample, createThreadInfoStats);
            doInitialExecutionVertexRequestAndVerifyResult(createThreadInfoTracker2, i2);
            assertExpectedEqualsReceived(generateThreadInfoStatsForExecutionVertex(threadInfoStatsDefaultSample, ATTEMPT_IDS[0]), createThreadInfoTracker2.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i2));
        }
    }

    @Test
    public void testJobVertexCachedStatsUpdatedAfterRefreshInterval() throws Exception {
        Duration ofMillis = Duration.ofMillis(1000L);
        VertexThreadInfoStats createThreadInfoStats = createThreadInfoStats(Instant.now().minus(10L, (TemporalUnit) ChronoUnit.SECONDS), 0, Duration.ofMillis(5L));
        VertexThreadInfoStats createThreadInfoStats2 = createThreadInfoStats(1, TIME_GAP);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        VertexThreadInfoTracker createThreadInfoTracker = createThreadInfoTracker(CLEAN_UP_INTERVAL, ofMillis, createCache(CLEAN_UP_INTERVAL, new LatchRemovalListener(countDownLatch)), (Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats>) null, createThreadInfoStats, createThreadInfoStats2);
        AssertionsForClassTypes.assertThat(createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
        createThreadInfoTracker.getResultAvailableFuture().get();
        assertExpectedEqualsReceived(createThreadInfoStats, createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX));
        countDownLatch.await();
        assertExpectedEqualsReceived(createThreadInfoStats2, createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX));
    }

    @Test
    public void testExecutionVertexCachedStatsUpdatedAfterRefreshInterval() throws Exception {
        Duration ofMillis = Duration.ofMillis(1000L);
        VertexThreadInfoStats createThreadInfoStats = createThreadInfoStats(Instant.now().minus(10L, (TemporalUnit) ChronoUnit.SECONDS), 0, Duration.ofMillis(5L));
        VertexThreadInfoStats createThreadInfoStats2 = createThreadInfoStats(1, TIME_GAP);
        for (int i = 0; i < 10; i++) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            VertexThreadInfoTracker createThreadInfoTracker = createThreadInfoTracker(CLEAN_UP_INTERVAL, ofMillis, (Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats>) null, createCache(CLEAN_UP_INTERVAL, new LatchRemovalListener(countDownLatch)), createThreadInfoStats, createThreadInfoStats2);
            AssertionsForClassTypes.assertThat(createThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i)).isNotPresent();
            createThreadInfoTracker.getResultAvailableFuture().get();
            assertExpectedEqualsReceived(generateThreadInfoStatsForExecutionVertex(createThreadInfoStats, ATTEMPT_IDS[i]), createThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i));
            countDownLatch.await();
            assertExpectedEqualsReceived(generateThreadInfoStatsForExecutionVertex(createThreadInfoStats2, ATTEMPT_IDS[i]), createThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i));
        }
    }

    @Test
    public void testExecutionVertexShouldBeIgnoredWhenJobVertexIsPending() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingBlockingAndCountableCoordinator testingBlockingAndCountableCoordinator = new TestingBlockingAndCountableCoordinator(completableFuture);
        VertexThreadInfoTracker createThreadInfoTracker = createThreadInfoTracker(CLEAN_UP_INTERVAL, STATS_REFRESH_INTERVAL, (Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats>) null, (Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats>) null, testingBlockingAndCountableCoordinator);
        AssertionsForClassTypes.assertThat(createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
        AssertionsForClassTypes.assertThat(testingBlockingAndCountableCoordinator.getTriggerCounter()).isOne();
        for (int i = 0; i < 10; i++) {
            AssertionsForClassTypes.assertThat(createThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i)).isNotPresent();
            AssertionsForClassTypes.assertThat(createThreadInfoTracker.getResultAvailableFuture()).isNotCompleted();
            AssertionsForClassTypes.assertThat(testingBlockingAndCountableCoordinator.getTriggerCounter()).isOne();
        }
        completableFuture.complete(threadInfoStatsDefaultSample);
        createThreadInfoTracker.getResultAvailableFuture().get();
        for (int i2 = 0; i2 < 10; i2++) {
            AssertionsForClassTypes.assertThat(createThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i2)).isPresent();
            AssertionsForClassTypes.assertThat(testingBlockingAndCountableCoordinator.getTriggerCounter()).isOne();
        }
    }

    @Test
    public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
        Duration ofMillis = Duration.ofMillis(1L);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        VertexThreadInfoTracker createThreadInfoTracker = createThreadInfoTracker(ofMillis, STATS_REFRESH_INTERVAL, createCache(ofMillis, new LatchRemovalListener(countDownLatch)), (Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats>) null, threadInfoStatsDefaultSample);
        AssertionsForClassTypes.assertThat(createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
        countDownLatch.await();
        AssertionsForClassTypes.assertThat(createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
        for (int i = 0; i < 10; i++) {
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            VertexThreadInfoTracker createThreadInfoTracker2 = createThreadInfoTracker(ofMillis, STATS_REFRESH_INTERVAL, (Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats>) null, createCache(ofMillis, new LatchRemovalListener(countDownLatch2)), threadInfoStatsDefaultSample);
            AssertionsForClassTypes.assertThat(createThreadInfoTracker2.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i)).isNotPresent();
            countDownLatch2.await();
            AssertionsForClassTypes.assertThat(createThreadInfoTracker2.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i)).isNotPresent();
        }
    }

    @Test
    public void testCachedStatsNotCleanedWithinCleanupInterval() throws Exception {
        VertexThreadInfoTracker createThreadInfoTracker = createThreadInfoTracker();
        doInitialJobVertexRequestAndVerifyResult(createThreadInfoTracker);
        createThreadInfoTracker.cleanUpStatsCache();
        assertExpectedEqualsReceived(threadInfoStatsDefaultSample, createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX));
        for (int i = 0; i < 10; i++) {
            assertExpectedEqualsReceived(generateThreadInfoStatsForExecutionVertex(threadInfoStatsDefaultSample, ATTEMPT_IDS[i]), createThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i));
        }
    }

    @Test
    public void testShutDown() throws Exception {
        VertexThreadInfoTracker createThreadInfoTracker = createThreadInfoTracker();
        doInitialJobVertexRequestAndVerifyResult(createThreadInfoTracker);
        createThreadInfoTracker.shutDown();
        AssertionsForClassTypes.assertThat(createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
        AssertionsForClassTypes.assertThat(createThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, 0)).isNotPresent();
        AssertionsForClassTypes.assertThat(createThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
        AssertionsForClassTypes.assertThat(createThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, 0)).isNotPresent();
    }

    private <K> Cache<K, VertexThreadInfoStats> createCache(Duration duration, RemovalListener<K, VertexThreadInfoStats> removalListener) {
        return CacheBuilder.newBuilder().concurrencyLevel(1).expireAfterAccess(duration.toMillis(), TimeUnit.MILLISECONDS).removalListener(removalListener).build();
    }

    private void doInitialJobVertexRequestAndVerifyResult(VertexThreadInfoTracker vertexThreadInfoTracker) throws InterruptedException, ExecutionException {
        AssertionsForClassTypes.assertThat(vertexThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX)).isNotPresent();
        vertexThreadInfoTracker.getResultAvailableFuture().get();
        assertExpectedEqualsReceived(threadInfoStatsDefaultSample, vertexThreadInfoTracker.getJobVertexStats(JOB_ID, EXECUTION_JOB_VERTEX));
        for (int i = 0; i < 10; i++) {
            assertExpectedEqualsReceived(generateThreadInfoStatsForExecutionVertex(threadInfoStatsDefaultSample, ATTEMPT_IDS[i]), vertexThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i));
        }
    }

    private void doInitialExecutionVertexRequestAndVerifyResult(VertexThreadInfoTracker vertexThreadInfoTracker, int i) throws InterruptedException, ExecutionException {
        AssertionsForClassTypes.assertThat(vertexThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i)).isNotPresent();
        vertexThreadInfoTracker.getResultAvailableFuture().get();
        assertExpectedEqualsReceived(generateThreadInfoStatsForExecutionVertex(threadInfoStatsDefaultSample, ATTEMPT_IDS[i]), vertexThreadInfoTracker.getExecutionVertexStats(JOB_ID, EXECUTION_JOB_VERTEX, i));
    }

    private static VertexThreadInfoStats generateThreadInfoStatsForExecutionVertex(VertexThreadInfoStats vertexThreadInfoStats, ExecutionAttemptID executionAttemptID) {
        return generateThreadInfoStatsForExecutionVertices(vertexThreadInfoStats, Collections.singleton(executionAttemptID));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static VertexThreadInfoStats generateThreadInfoStatsForExecutionVertices(VertexThreadInfoStats vertexThreadInfoStats, Set<ExecutionAttemptID> set) {
        return set.equals(vertexThreadInfoStats.getSamplesBySubtask().keySet()) ? vertexThreadInfoStats : new VertexThreadInfoStats(vertexThreadInfoStats.getRequestId(), vertexThreadInfoStats.getStartTime(), vertexThreadInfoStats.getEndTime(), (Map) vertexThreadInfoStats.getSamplesBySubtask().entrySet().stream().filter(entry -> {
            return set.contains(entry.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
    }

    private static void assertExpectedEqualsReceived(VertexThreadInfoStats vertexThreadInfoStats, Optional<VertexThreadInfoStats> optional) {
        AssertionsForClassTypes.assertThat(optional).isPresent();
        VertexThreadInfoStats vertexThreadInfoStats2 = optional.get();
        AssertionsForClassTypes.assertThat(vertexThreadInfoStats.getRequestId()).isEqualTo(vertexThreadInfoStats2.getRequestId());
        AssertionsForClassTypes.assertThat(vertexThreadInfoStats.getEndTime()).isEqualTo(vertexThreadInfoStats2.getEndTime());
        AssertionsForClassTypes.assertThat(vertexThreadInfoStats.getNumberOfSubtasks()).isEqualTo(vertexThreadInfoStats2.getNumberOfSubtasks());
        Iterator it = vertexThreadInfoStats2.getSamplesBySubtask().values().iterator();
        while (it.hasNext()) {
            AssertionsForClassTypes.assertThat(((Collection) it.next()).isEmpty()).isFalse();
        }
    }

    private VertexThreadInfoTracker createThreadInfoTracker() {
        return createThreadInfoTracker(STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample);
    }

    private VertexThreadInfoTracker createThreadInfoTracker(Duration duration, VertexThreadInfoStats... vertexThreadInfoStatsArr) {
        return createThreadInfoTracker(CLEAN_UP_INTERVAL, duration, (Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats>) null, (Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats>) null, vertexThreadInfoStatsArr);
    }

    private VertexThreadInfoTracker createThreadInfoTracker(Duration duration, Duration duration2, @Nullable Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats> cache, @Nullable Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats> cache2, VertexThreadInfoStats... vertexThreadInfoStatsArr) {
        return createThreadInfoTracker(duration, duration2, cache, cache2, new TestingThreadInfoRequestCoordinator((v0) -> {
            v0.run();
        }, REQUEST_TIMEOUT, vertexThreadInfoStatsArr));
    }

    private VertexThreadInfoTracker createThreadInfoTracker(Duration duration, Duration duration2, @Nullable Cache<VertexThreadInfoTracker.JobVertexKey, VertexThreadInfoStats> cache, @Nullable Cache<VertexThreadInfoTracker.ExecutionVertexKey, VertexThreadInfoStats> cache2, ThreadInfoRequestCoordinator threadInfoRequestCoordinator) {
        return VertexThreadInfoTrackerBuilder.newBuilder(VertexThreadInfoTrackerTest::createMockResourceManagerGateway, executor, TestingUtils.TIMEOUT).setCoordinator(threadInfoRequestCoordinator).setCleanUpInterval(duration).setNumSamples(1).setStatsRefreshInterval(duration2).setDelayBetweenSamples(DELAY_BETWEEN_SAMPLES).setMaxThreadInfoDepth(MAX_STACK_TRACE_DEPTH).setJobVertexStatsCache(cache).setExecutionVertexStatsCache(cache2).build();
    }

    private static VertexThreadInfoStats createThreadInfoStats(int i, Duration duration) {
        return createThreadInfoStats(Instant.now(), i, duration);
    }

    private static VertexThreadInfoStats createThreadInfoStats(Instant instant, int i, Duration duration) {
        Instant plus = instant.plus((TemporalAmount) duration);
        HashMap hashMap = new HashMap();
        for (ExecutionVertex executionVertex : EXECUTION_JOB_VERTEX.getTaskVertices()) {
            Optional createThreadInfoSample = JvmUtils.createThreadInfoSample(Thread.currentThread().getId(), MAX_STACK_TRACE_DEPTH);
            Preconditions.checkState(createThreadInfoSample.isPresent(), "The threadInfoSample should be empty.");
            hashMap.put(executionVertex.getCurrentExecutionAttempt().getAttemptId(), Collections.singletonList(createThreadInfoSample.get()));
        }
        return new VertexThreadInfoStats(i, instant.toEpochMilli(), plus.toEpochMilli(), hashMap);
    }

    private static ExecutionJobVertex createExecutionJobVertex() {
        try {
            JobVertex jobVertex = new JobVertex("testVertex");
            jobVertex.setParallelism(10);
            jobVertex.setInvokableClass(AbstractInvokable.class);
            DefaultScheduler createScheduler = SchedulerTestingUtils.createScheduler(JobGraphTestUtils.streamingJobGraph(jobVertex), ComponentMainThreadExecutorServiceAdapter.forMainThread(), new DirectScheduledExecutorService());
            ExecutionGraph executionGraph = createScheduler.getExecutionGraph();
            createScheduler.startScheduling();
            ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
            return createScheduler.getExecutionJobVertex(jobVertex.getID());
        } catch (Exception e) {
            throw new RuntimeException("Failed to create ExecutionJobVertex.");
        }
    }

    private static CompletableFuture<ResourceManagerGateway> createMockResourceManagerGateway() {
        Function<ResourceID, CompletableFuture<TaskExecutorThreadInfoGateway>> function = resourceID -> {
            return CompletableFuture.completedFuture(null);
        };
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestTaskExecutorGatewayFunction(function);
        return CompletableFuture.completedFuture(testingResourceManagerGateway);
    }
}
