package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.PendingCheckpointTest;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.operators.sort.ExternalSortLargeRecordsITCase;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.scheduler.DefaultSchedulerTest;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptive.Created;
import org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraphTest;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.class */
public class AdaptiveSchedulerTest extends TestLogger {
    private static final int PARALLELISM = 4;
    private static final JobVertex JOB_VERTEX = ExecutionGraphTestUtils.createNoOpVertex("v1", 4);

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE = new TestExecutorResource<>(Executors::newSingleThreadScheduledExecutor);
    private final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor = new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
    private final ComponentMainThreadExecutor singleThreadMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor((ScheduledExecutorService) TEST_EXECUTOR_RESOURCE.getExecutor());
    private final ClassLoader classLoader = ClassLoader.getSystemClassLoader();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$common$JobStatus = new int[JobStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.CREATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.FINISHED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest$DummyState.class */
    public static class DummyState implements State {
        private final JobStatus jobStatus;

        /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest$DummyState$Factory.class */
        private static class Factory implements StateFactory<DummyState> {
            private final JobStatus jobStatus;

            public Factory() {
                this(JobStatus.RUNNING);
            }

            public Factory(JobStatus jobStatus) {
                this.jobStatus = jobStatus;
            }

            public Class<DummyState> getStateClass() {
                return DummyState.class;
            }

            /* renamed from: getState, reason: merged with bridge method [inline-methods] */
            public DummyState m523getState() {
                return new DummyState(this.jobStatus);
            }
        }

        public DummyState() {
            this(JobStatus.RUNNING);
        }

        public DummyState(JobStatus jobStatus) {
            this.jobStatus = jobStatus;
        }

        public void cancel() {
        }

        public void suspend(Throwable th) {
        }

        public JobStatus getJobStatus() {
            return this.jobStatus;
        }

        public ArchivedExecutionGraph getJob() {
            return null;
        }

        public void handleGlobalFailure(Throwable th) {
        }

        public Logger getLogger() {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest$LifecycleMethodCapturingState.class */
    public static class LifecycleMethodCapturingState extends DummyState {
        boolean onLeaveCalled;

        @Nullable
        Class<? extends State> onLeaveNewStateArgument;

        private LifecycleMethodCapturingState() {
            this.onLeaveCalled = false;
            this.onLeaveNewStateArgument = null;
        }

        void reset() {
            this.onLeaveCalled = false;
            this.onLeaveNewStateArgument = null;
        }

        public void onLeave(Class<? extends State> cls) {
            this.onLeaveCalled = true;
            this.onLeaveNewStateArgument = cls;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest$StateInstanceFactory.class */
    private static class StateInstanceFactory implements StateFactory<LifecycleMethodCapturingState> {
        private final LifecycleMethodCapturingState instance;

        public StateInstanceFactory(LifecycleMethodCapturingState lifecycleMethodCapturingState) {
            this.instance = lifecycleMethodCapturingState;
        }

        public Class<LifecycleMethodCapturingState> getStateClass() {
            return LifecycleMethodCapturingState.class;
        }

        /* renamed from: getState, reason: merged with bridge method [inline-methods] */
        public LifecycleMethodCapturingState m524getState() {
            return this.instance;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest$SubmissionBufferingTaskManagerGateway.class */
    public static class SubmissionBufferingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
        final BlockingQueue<TaskDeploymentDescriptor> submittedTasks;

        public SubmissionBufferingTaskManagerGateway(int i) {
            this.submittedTasks = new ArrayBlockingQueue(i);
            setSubmitConsumer(taskDeploymentDescriptor -> {
            });
        }

        @Override // org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway
        public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> consumer) {
            BlockingQueue<TaskDeploymentDescriptor> blockingQueue = this.submittedTasks;
            blockingQueue.getClass();
            Consumer consumer2 = (v1) -> {
                r1.offer(v1);
            };
            super.setSubmitConsumer(consumer2.andThen(consumer));
        }

        public List<TaskDeploymentDescriptor> waitForSubmissions(int i, Duration duration) throws InterruptedException {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(this.submittedTasks.poll(duration.toMillis(), TimeUnit.MILLISECONDS));
            }
            return arrayList;
        }
    }

    @Test
    public void testInitialState() throws Exception {
        Assertions.assertThat(new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).getState()).isInstanceOf(Created.class);
    }

    @Test
    public void testArchivedCheckpointingSettingsNotNullIfCheckpointingIsEnabled() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        createJobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), (SerializedValue) null));
        ArchivedExecutionGraphTest.assertContainsCheckpointSettings(new AdaptiveSchedulerBuilder(createJobGraph, this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).getArchivedExecutionGraph(JobStatus.INITIALIZING, (Throwable) null));
    }

    @Test
    public void testIsState() throws Exception {
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        Assertions.assertThat(build.isState(build.getState())).isTrue();
        Assertions.assertThat(build.isState(new DummyState())).isFalse();
    }

    @Test
    public void testRunIfState() throws Exception {
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        build.runIfState(build.getState(), () -> {
            atomicBoolean.set(true);
        });
        Assertions.assertThat(atomicBoolean.get()).isTrue();
    }

    @Test
    public void testRunIfStateWithStateMismatch() throws Exception {
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        build.runIfState(new DummyState(), () -> {
            atomicBoolean.set(true);
        });
        Assertions.assertThat(atomicBoolean.get()).isFalse();
    }

    @Test
    public void testHasEnoughResourcesReturnsFalseIfUnsatisfied() throws Exception {
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        build.startScheduling();
        Assertions.assertThat(build.hasDesiredResources(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1))).isFalse();
    }

    @Test
    public void testHasEnoughResourcesReturnsTrueIfSatisfied() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph, this.mainThreadExecutor).setDeclarativeSlotPool(createDeclarativeSlotPool).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        build.startScheduling();
        ResourceCounter withResource = ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
        SlotPoolTestUtils.offerSlots(createDeclarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(withResource));
        Assertions.assertThat(build.hasDesiredResources(withResource)).isTrue();
    }

    @Test
    public void testHasEnoughResourcesUsesUnmatchedSlotsAsUnknown() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph, this.mainThreadExecutor).setDeclarativeSlotPool(createDeclarativeSlotPool).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        build.startScheduling();
        ResourceCounter withResource = ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
        SlotPoolTestUtils.offerSlots(createDeclarativeSlotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.newBuilder().setCpuCores(1.0d).build(), 1)));
        Assertions.assertThat(build.hasDesiredResources(withResource)).isTrue();
    }

    @Test
    public void testExecutionGraphGenerationWithAvailableResources() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph, this.singleThreadMainThreadExecutor).setDeclarativeSlotPool(createDeclarativeSlotPool).setJobMasterConfiguration(configuration).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        SubmissionBufferingTaskManagerGateway submissionBufferingTaskManagerGateway = new SubmissionBufferingTaskManagerGateway(1);
        this.singleThreadMainThreadExecutor.execute(() -> {
            build.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) createDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        Assertions.assertThat(((ArchivedExecutionGraph) CompletableFuture.supplyAsync(() -> {
            return build.requestJob().getArchivedExecutionGraph();
        }, this.singleThreadMainThreadExecutor).join()).getJobVertex(JOB_VERTEX.getID()).getParallelism()).isEqualTo(1);
    }

    @Test
    public void testExecutionGraphGenerationSetsInitializationTimestamp() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph, this.singleThreadMainThreadExecutor).setInitializationTimestamp(42L).setDeclarativeSlotPool(createDeclarativeSlotPool).setJobMasterConfiguration(configuration).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        SubmissionBufferingTaskManagerGateway submissionBufferingTaskManagerGateway = new SubmissionBufferingTaskManagerGateway(4);
        this.singleThreadMainThreadExecutor.execute(() -> {
            build.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) createDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 4)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        Assertions.assertThat(((ArchivedExecutionGraph) CompletableFuture.supplyAsync(() -> {
            return build.requestJob().getArchivedExecutionGraph();
        }, this.singleThreadMainThreadExecutor).join()).getStatusTimestamp(JobStatus.INITIALIZING)).isEqualTo(42L);
    }

    @Test
    public void testInitializationTimestampForwarding() throws Exception {
        Assertions.assertThat(new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).setInitializationTimestamp(42L).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).requestJob().getArchivedExecutionGraph().getStatusTimestamp(JobStatus.INITIALIZING)).isEqualTo(42L);
    }

    @Test
    public void testFatalErrorsForwardedToFatalErrorHandler() throws Exception {
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).setFatalErrorHandler(testingFatalErrorHandler).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        RuntimeException runtimeException = new RuntimeException();
        build.runIfState(build.getState(), () -> {
            throw runtimeException;
        });
        Assertions.assertThat(testingFatalErrorHandler.getException()).isEqualTo(runtimeException);
    }

    @Test
    public void testResourceTimeout() throws Exception {
        ManuallyTriggeredComponentMainThreadExecutor manuallyTriggeredComponentMainThreadExecutor = new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
        Duration ofMinutes = Duration.ofMinutes(1234L);
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, ofMinutes);
        new AdaptiveSchedulerBuilder(createJobGraph(), manuallyTriggeredComponentMainThreadExecutor).setJobMasterConfiguration(configuration).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).startScheduling();
        Assertions.assertThat(manuallyTriggeredComponentMainThreadExecutor.getActiveNonPeriodicScheduledTask().stream().anyMatch(scheduledFuture -> {
            return scheduledFuture.getDelay(TimeUnit.MINUTES) == ofMinutes.toMinutes();
        })).isTrue();
    }

    @Test
    public void testNumRestartsMetric() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingMetricRegistry build = TestingMetricRegistry.builder().setRegisterConsumer((metric, str, abstractMetricGroup) -> {
            if ("numRestarts".equals(str)) {
                completableFuture.complete((Gauge) metric);
            }
        }).build();
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool defaultDeclarativeSlotPool = new DefaultDeclarativeSlotPool(createJobGraph.getJobID(), new DefaultAllocatedSlotPool(), collection -> {
        }, Time.minutes(10L), Time.minutes(10L));
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
        AdaptiveScheduler build2 = new AdaptiveSchedulerBuilder(createJobGraph, this.singleThreadMainThreadExecutor).setJobMasterConfiguration(configuration).setJobManagerJobMetricGroup(JobManagerMetricGroup.createJobManagerMetricGroup(build, "localhost").addJob(new JobID(), "jobName")).setDeclarativeSlotPool(defaultDeclarativeSlotPool).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        Gauge gauge = (Gauge) completableFuture.get();
        SubmissionBufferingTaskManagerGateway submissionBufferingTaskManagerGateway = new SubmissionBufferingTaskManagerGateway(5);
        submissionBufferingTaskManagerGateway.setCancelConsumer(createCancelConsumer(build2));
        this.singleThreadMainThreadExecutor.execute(() -> {
            build2.startScheduling();
            defaultDeclarativeSlotPool.offerSlots(DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), new LocalTaskManagerLocation(), submissionBufferingTaskManagerGateway, System.currentTimeMillis());
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        Assertions.assertThat((Long) gauge.getValue()).isEqualTo(0L);
        this.singleThreadMainThreadExecutor.execute(() -> {
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) defaultDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 4)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(4, Duration.ofSeconds(5L));
        Assertions.assertThat((Long) gauge.getValue()).isEqualTo(1L);
    }

    @Test
    public void testStatusMetrics() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        CompletableFuture completableFuture3 = new CompletableFuture();
        TestingMetricRegistry build = TestingMetricRegistry.builder().setRegisterConsumer((metric, str, abstractMetricGroup) -> {
            boolean z = -1;
            switch (str.hashCode()) {
                case -838362136:
                    if (str.equals("uptime")) {
                        z = false;
                        break;
                    }
                    break;
                case 785225284:
                    if (str.equals("restartingTimeTotal")) {
                        z = 2;
                        break;
                    }
                    break;
                case 1428051567:
                    if (str.equals("downtime")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case ExternalSortLargeRecordsITCase.SmallOrMediumOrLargeValue.SMALL_SIZE /* 0 */:
                    completableFuture.complete((UpTimeGauge) metric);
                    return;
                case PendingCheckpointTest.PARALLELISM /* 1 */:
                    completableFuture2.complete((DownTimeGauge) metric);
                    return;
                case true:
                    completableFuture3.complete((Gauge) metric);
                    return;
                default:
                    return;
            }
        }).build();
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L));
        configuration.set(MetricOptions.JOB_STATUS_METRICS, Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));
        AdaptiveScheduler build2 = new AdaptiveSchedulerBuilder(createJobGraph, this.singleThreadMainThreadExecutor).setJobMasterConfiguration(configuration).setJobManagerJobMetricGroup(JobManagerMetricGroup.createJobManagerMetricGroup(build, "localhost").addJob(new JobID(), "jobName")).setDeclarativeSlotPool(createDeclarativeSlotPool).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        UpTimeGauge upTimeGauge = (UpTimeGauge) completableFuture.get();
        DownTimeGauge downTimeGauge = (DownTimeGauge) completableFuture2.get();
        Gauge gauge = (Gauge) completableFuture3.get();
        SubmissionBufferingTaskManagerGateway submissionBufferingTaskManagerGateway = new SubmissionBufferingTaskManagerGateway(5);
        submissionBufferingTaskManagerGateway.setCancelConsumer(createCancelConsumer(build2));
        this.singleThreadMainThreadExecutor.execute(() -> {
            build2.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) createDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        Thread.sleep(10L);
        Assertions.assertThat(upTimeGauge.getValue()).isGreaterThan(0L);
        Assertions.assertThat(downTimeGauge.getValue()).isEqualTo(0L);
        Assertions.assertThat((Long) gauge.getValue()).isEqualTo(0L);
        this.singleThreadMainThreadExecutor.execute(() -> {
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) createDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(2, Duration.ofSeconds(5L));
        Thread.sleep(10L);
        Assertions.assertThat(upTimeGauge.getValue()).isGreaterThan(0L);
        Assertions.assertThat(downTimeGauge.getValue()).isEqualTo(0L);
        Assertions.assertThat((Long) gauge.getValue()).isGreaterThanOrEqualTo(0L);
    }

    @Test
    public void testStartSchedulingTransitionsToWaitingForResources() throws Exception {
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        build.startScheduling();
        Assertions.assertThat(build.getState()).isInstanceOf(WaitingForResources.class);
    }

    @Test
    public void testStartSchedulingSetsResourceRequirementsForDefaultMode() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        new AdaptiveSchedulerBuilder(createJobGraph, this.mainThreadExecutor).setDeclarativeSlotPool(createDeclarativeSlotPool).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).startScheduling();
        Assertions.assertThat(createDeclarativeSlotPool.getResourceRequirements()).contains(new ResourceRequirement[]{ResourceRequirement.create(ResourceProfile.UNKNOWN, 4)});
    }

    @Test
    public void testStartSchedulingSetsResourceRequirementsForReactiveMode() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
        new AdaptiveSchedulerBuilder(createJobGraph, this.mainThreadExecutor).setDeclarativeSlotPool(createDeclarativeSlotPool).setJobMasterConfiguration(configuration).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).startScheduling();
        Assertions.assertThat(createDeclarativeSlotPool.getResourceRequirements()).contains(new ResourceRequirement[]{ResourceRequirement.create(ResourceProfile.UNKNOWN, KeyGroupRangeAssignment.computeDefaultMaxParallelism(4))});
    }

    @Test
    public void testResourceAcquisitionTriggersJobExecution() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph, this.singleThreadMainThreadExecutor).setDeclarativeSlotPool(createDeclarativeSlotPool).setJobMasterConfiguration(configuration).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        SubmissionBufferingTaskManagerGateway submissionBufferingTaskManagerGateway = new SubmissionBufferingTaskManagerGateway(4);
        CompletableFuture completableFuture = new CompletableFuture();
        this.singleThreadMainThreadExecutor.execute(() -> {
            build.startScheduling();
            completableFuture.complete(build.getState());
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) createDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 4)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        Assertions.assertThat(completableFuture.get()).isInstanceOf(WaitingForResources.class);
        submissionBufferingTaskManagerGateway.waitForSubmissions(4, Duration.ofSeconds(5L));
        Assertions.assertThat(((ArchivedExecutionGraph) CompletableFuture.supplyAsync(() -> {
            return build.requestJob().getArchivedExecutionGraph();
        }, this.singleThreadMainThreadExecutor).get()).getJobVertex(JOB_VERTEX.getID()).getParallelism()).isEqualTo(4);
    }

    @Test
    public void testGoToFinished() throws Exception {
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        build.goToFinished(new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build());
        Assertions.assertThat(build.getState()).isInstanceOf(Finished.class);
    }

    @Test
    public void testJobStatusListenerOnlyCalledIfJobStatusChanges() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).setJobStatusListener((jobID, jobStatus, j) -> {
            atomicInteger.incrementAndGet();
        }).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        Assertions.assertThat(build.requestJobStatus()).withFailMessage("Assumption about job status for Scheduler@Created is incorrect.", new Object[0]).isEqualTo(JobStatus.INITIALIZING);
        build.transitionToState(new DummyState.Factory(JobStatus.INITIALIZING));
        Assertions.assertThat(atomicInteger.get()).isEqualTo(0);
    }

    @Test
    public void testJobStatusListenerNotifiedOfJobStatusChanges() throws Exception {
        JobGraph createJobGraph = createJobGraph();
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        CompletableFuture completableFuture3 = new CompletableFuture();
        CompletableFuture completableFuture4 = new CompletableFuture();
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph, this.singleThreadMainThreadExecutor).setJobMasterConfiguration(configuration).setJobStatusListener((jobID, jobStatus, j) -> {
            switch (AnonymousClass2.$SwitchMap$org$apache$flink$api$common$JobStatus[jobStatus.ordinal()]) {
                case PendingCheckpointTest.PARALLELISM /* 1 */:
                    completableFuture.complete(null);
                    return;
                case 2:
                    completableFuture2.complete(null);
                    return;
                case 3:
                    completableFuture3.complete(null);
                    return;
                default:
                    completableFuture4.complete(jobStatus);
                    return;
            }
        }).setDeclarativeSlotPool(createDeclarativeSlotPool).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        SubmissionBufferingTaskManagerGateway submissionBufferingTaskManagerGateway = new SubmissionBufferingTaskManagerGateway(5);
        this.singleThreadMainThreadExecutor.execute(() -> {
            build.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) createDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        TaskDeploymentDescriptor take = submissionBufferingTaskManagerGateway.submittedTasks.take();
        this.singleThreadMainThreadExecutor.execute(() -> {
            build.updateTaskExecutionState(new TaskExecutionState(take.getExecutionAttemptId(), ExecutionState.FINISHED));
        });
        completableFuture.get();
        completableFuture2.get();
        completableFuture3.get();
        Assertions.assertThat(completableFuture4.isDone()).isFalse();
    }

    @Test
    public void testCloseShutsDownCheckpointingComponents() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingCompletedCheckpointStore createStoreWithShutdownCheckAndNoCompletedCheckpoints = TestingCompletedCheckpointStore.createStoreWithShutdownCheckAndNoCompletedCheckpoints(completableFuture);
        CompletableFuture completableFuture2 = new CompletableFuture();
        TestingCheckpointIDCounter createStoreWithShutdownCheckAndNoStartAction = TestingCheckpointIDCounter.createStoreWithShutdownCheckAndNoStartAction(completableFuture2);
        JobGraph createJobGraph = createJobGraph();
        createJobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().build(), (SerializedValue) null));
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph, this.singleThreadMainThreadExecutor).setCheckpointRecoveryFactory(new TestingCheckpointRecoveryFactory(createStoreWithShutdownCheckAndNoCompletedCheckpoints, createStoreWithShutdownCheckAndNoStartAction)).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        this.singleThreadMainThreadExecutor.execute(() -> {
            build.startScheduling();
            build.handleGlobalFailure(new FlinkException("Test exception"));
            build.closeAsync();
        });
        Assertions.assertThat((Comparable) completableFuture.get()).isEqualTo(JobStatus.FAILED);
        Assertions.assertThat((Comparable) completableFuture2.get()).isEqualTo(JobStatus.FAILED);
    }

    @Test
    public void testTransitionToStateCallsOnLeave() throws Exception {
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        LifecycleMethodCapturingState lifecycleMethodCapturingState = new LifecycleMethodCapturingState();
        build.transitionToState(new StateInstanceFactory(lifecycleMethodCapturingState));
        lifecycleMethodCapturingState.reset();
        build.transitionToState(new DummyState.Factory());
        Assertions.assertThat(lifecycleMethodCapturingState.onLeaveCalled).isTrue();
        Assertions.assertThat(lifecycleMethodCapturingState.onLeaveNewStateArgument.equals(DummyState.class)).isTrue();
    }

    @Test
    public void testConsistentMaxParallelism() throws Exception {
        int computeDefaultMaxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(240);
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(240);
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(createNoOpVertex);
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(streamingJobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(streamingJobGraph, this.singleThreadMainThreadExecutor).setDeclarativeSlotPool(createDeclarativeSlotPool).setJobMasterConfiguration(configuration).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        SubmissionBufferingTaskManagerGateway submissionBufferingTaskManagerGateway = new SubmissionBufferingTaskManagerGateway(241);
        submissionBufferingTaskManagerGateway.setCancelConsumer(createCancelConsumer(build));
        this.singleThreadMainThreadExecutor.execute(() -> {
            build.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) createDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5L));
        ArchivedExecutionJobVertex jobVertex = getArchivedExecutionGraphForRunningJob(build).get().getJobVertex(createNoOpVertex.getID());
        Assertions.assertThat(jobVertex.getParallelism()).isEqualTo(1);
        Assertions.assertThat(jobVertex.getMaxParallelism()).isEqualTo(computeDefaultMaxParallelism);
        this.singleThreadMainThreadExecutor.execute(() -> {
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) createDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 240)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(240, Duration.ofSeconds(5L));
        ArchivedExecutionJobVertex jobVertex2 = getArchivedExecutionGraphForRunningJob(build).get().getJobVertex(createNoOpVertex.getID());
        Assertions.assertThat(jobVertex2.getParallelism()).isEqualTo(240);
        Assertions.assertThat(jobVertex2.getMaxParallelism()).isEqualTo(computeDefaultMaxParallelism);
    }

    @Test
    public void testHowToHandleFailureRejectedByStrategy() throws Exception {
        Assertions.assertThat(new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).setRestartBackoffTimeStrategy(NoRestartBackoffTimeStrategy.INSTANCE).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).howToHandleFailure(new Exception("test")).canRestart()).isFalse();
    }

    @Test
    public void testHowToHandleFailureAllowedByStrategy() throws Exception {
        TestRestartBackoffTimeStrategy testRestartBackoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, 1234L);
        FailureResult howToHandleFailure = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).howToHandleFailure(new Exception("test"));
        Assertions.assertThat(howToHandleFailure.canRestart()).isTrue();
        Assertions.assertThat(howToHandleFailure.getBackoffTime().toMillis()).isEqualTo(testRestartBackoffTimeStrategy.getBackoffTime());
    }

    @Test
    public void testHowToHandleFailureUnrecoverableFailure() throws Exception {
        Assertions.assertThat(new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).howToHandleFailure(new SuppressRestartsException(new Exception("test"))).canRestart()).isFalse();
    }

    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> biConsumer) throws Exception {
        return runExceptionHistoryTests(biConsumer, adaptiveSchedulerBuilder -> {
        }, jobGraph -> {
        });
    }

    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> biConsumer, Consumer<AdaptiveSchedulerBuilder> consumer) throws Exception {
        return runExceptionHistoryTests(biConsumer, consumer, jobGraph -> {
        });
    }

    private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests(BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> biConsumer, Consumer<AdaptiveSchedulerBuilder> consumer, Consumer<JobGraph> consumer2) throws Exception {
        JobGraph createJobGraph = createJobGraph();
        consumer2.accept(createJobGraph);
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(standaloneCompletedCheckpointStore, standaloneCheckpointIDCounter);
        DeclarativeSlotPool createDeclarativeSlotPool = createDeclarativeSlotPool(createJobGraph.getJobID());
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
        AdaptiveSchedulerBuilder checkpointCleaner = new AdaptiveSchedulerBuilder(createJobGraph, this.singleThreadMainThreadExecutor).setJobMasterConfiguration(configuration).setDeclarativeSlotPool(createDeclarativeSlotPool).setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory).setCheckpointCleaner(checkpointsCleaner);
        consumer.accept(checkpointCleaner);
        AdaptiveScheduler build = checkpointCleaner.build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        SubmissionBufferingTaskManagerGateway submissionBufferingTaskManagerGateway = new SubmissionBufferingTaskManagerGateway(4);
        submissionBufferingTaskManagerGateway.setCancelConsumer(executionAttemptID -> {
            this.singleThreadMainThreadExecutor.execute(() -> {
                build.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(executionAttemptID, ExecutionState.CANCELED, (Throwable) null)));
            });
        });
        this.singleThreadMainThreadExecutor.execute(() -> {
            build.startScheduling();
            SlotPoolTestUtils.offerSlots((DeclarativeSlotPool) createDeclarativeSlotPool, (Collection<? extends SlotOffer>) DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource(ResourceProfile.UNKNOWN, 4)), (TaskManagerGateway) submissionBufferingTaskManagerGateway);
        });
        submissionBufferingTaskManagerGateway.waitForSubmissions(4, TestingUtils.infiniteDuration());
        CompletableFuture completableFuture = new CompletableFuture();
        this.singleThreadMainThreadExecutor.execute(() -> {
            completableFuture.complete(build.requestJob().getArchivedExecutionGraph().getAllExecutionVertices());
        });
        List list = (List) IterableUtils.toStream((Iterable) completableFuture.get()).map((v0) -> {
            return v0.getCurrentExecutionAttempt();
        }).map((v0) -> {
            return v0.getAttemptId();
        }).collect(Collectors.toList());
        CompletableFuture.runAsync(() -> {
            biConsumer.accept(build, list);
        }, this.singleThreadMainThreadExecutor).get();
        ComponentMainThreadExecutor componentMainThreadExecutor = this.singleThreadMainThreadExecutor;
        build.getClass();
        componentMainThreadExecutor.execute(build::cancel);
        build.getJobTerminationFuture().get();
        return build.requestJob().getExceptionHistory();
    }

    @Test
    public void testExceptionHistoryWithGlobalFailure() throws Exception {
        Exception exc = new Exception("Expected Global Exception");
        Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests = runExceptionHistoryTests((adaptiveScheduler, list) -> {
            adaptiveScheduler.handleGlobalFailure(exc);
        });
        Assertions.assertThat(runExceptionHistoryTests).hasSize(1);
        RootExceptionHistoryEntry next = runExceptionHistoryTests.iterator().next();
        Assertions.assertThat(next.getTaskManagerLocation()).isNull();
        Assertions.assertThat(next.getFailingTaskName()).isNull();
        Assertions.assertThat(next.getException().deserializeError(this.classLoader)).isEqualTo(exc);
    }

    @Test
    public void testExceptionHistoryWithTaskFailure() throws Exception {
        Exception exc = new Exception("Expected Local Exception");
        Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests = runExceptionHistoryTests((adaptiveScheduler, list) -> {
            adaptiveScheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState((ExecutionAttemptID) list.get(1), ExecutionState.FAILED, exc)));
        });
        Assertions.assertThat(runExceptionHistoryTests).hasSize(1);
        Assertions.assertThat(runExceptionHistoryTests.iterator().next().getException().deserializeError(this.classLoader)).isEqualTo(exc);
    }

    @Test
    public void testExceptionHistoryWithTaskFailureWithRestart() throws Exception {
        Exception exc = new Exception("Expected Local Exception");
        Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests = runExceptionHistoryTests((adaptiveScheduler, list) -> {
            adaptiveScheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState((ExecutionAttemptID) list.get(1), ExecutionState.FAILED, exc)));
        }, adaptiveSchedulerBuilder -> {
            adaptiveSchedulerBuilder.setRestartBackoffTimeStrategy(new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(1, 100L).create());
        });
        Assertions.assertThat(runExceptionHistoryTests).hasSize(1);
        Assertions.assertThat(runExceptionHistoryTests.iterator().next().getException().deserializeError(this.classLoader)).isEqualTo(exc);
    }

    @Test
    public void testExceptionHistoryWithTaskFailureFromStopWithSavepoint() throws Exception {
        Exception exc = new Exception("Expected Local Exception");
        Consumer<JobGraph> consumer = jobGraph -> {
            jobGraph.setSnapshotSettings(new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(Long.MAX_VALUE).build(), (SerializedValue) null));
        };
        StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
        StandaloneCheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
        CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
        TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(standaloneCompletedCheckpointStore, standaloneCheckpointIDCounter);
        Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests = runExceptionHistoryTests((adaptiveScheduler, list) -> {
            ExecutionAttemptID executionAttemptID = (ExecutionAttemptID) list.get(1);
            adaptiveScheduler.stopWithSavepoint("file:///tmp/target", true, SavepointFormatType.CANONICAL);
            adaptiveScheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(executionAttemptID, ExecutionState.FAILED, exc)));
            Iterator it = list.iterator();
            while (it.hasNext()) {
                adaptiveScheduler.declineCheckpoint(new DeclineCheckpoint(adaptiveScheduler.requestJob().getJobId(), (ExecutionAttemptID) it.next(), standaloneCheckpointIDCounter.get() - 1, new CheckpointException(CheckpointFailureReason.IO_EXCEPTION)));
            }
        }, adaptiveSchedulerBuilder -> {
            adaptiveSchedulerBuilder.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory).setCheckpointCleaner(checkpointsCleaner);
        }, consumer);
        Assertions.assertThat(runExceptionHistoryTests).hasSize(1);
        Assertions.assertThat(runExceptionHistoryTests.iterator().next().getException().deserializeError(this.classLoader)).isEqualTo(exc);
    }

    @Test
    public void testExceptionHistoryWithTaskConcurrentGlobalFailure() throws Exception {
        Exception exc = new Exception("Expected Global Exception 1");
        Exception exc2 = new Exception("Expected Global Exception 2");
        Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests = runExceptionHistoryTests((adaptiveScheduler, list) -> {
            adaptiveScheduler.handleGlobalFailure(exc);
            adaptiveScheduler.handleGlobalFailure(exc2);
        });
        Assertions.assertThat(runExceptionHistoryTests).hasSize(1);
        RootExceptionHistoryEntry next = runExceptionHistoryTests.iterator().next();
        Assertions.assertThat(next.getException().deserializeError(this.classLoader)).isEqualTo(exc);
        Assertions.assertThat((List) IterableUtils.toStream(next.getConcurrentExceptions()).map((v0) -> {
            return v0.getException();
        }).map(serializedThrowable -> {
            return serializedThrowable.deserializeError(this.classLoader);
        }).collect(Collectors.toList())).containsExactly(new Throwable[]{exc2});
    }

    @Test
    public void testExceptionHistoryWithTaskConcurrentFailure() throws Exception {
        Exception exc = new Exception("Expected Local Exception 1");
        Exception exc2 = new Exception("Expected Local Exception 2");
        Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests = runExceptionHistoryTests((adaptiveScheduler, list) -> {
            ExecutionAttemptID executionAttemptID = (ExecutionAttemptID) list.remove(0);
            ExecutionAttemptID executionAttemptID2 = (ExecutionAttemptID) list.remove(0);
            adaptiveScheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(executionAttemptID, ExecutionState.FAILED, exc)));
            adaptiveScheduler.updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(executionAttemptID2, ExecutionState.FAILED, exc2)));
        });
        Assertions.assertThat(runExceptionHistoryTests).hasSize(1);
        RootExceptionHistoryEntry next = runExceptionHistoryTests.iterator().next();
        Assertions.assertThat(next.getException().deserializeError(this.classLoader)).isEqualTo(exc);
        Assertions.assertThat((List) IterableUtils.toStream(next.getConcurrentExceptions()).map((v0) -> {
            return v0.getException();
        }).map(serializedThrowable -> {
            return serializedThrowable.deserializeError(this.classLoader);
        }).collect(Collectors.toList())).isEmpty();
    }

    @Test(expected = IllegalStateException.class)
    public void testRepeatedTransitionIntoCurrentStateFails() throws Exception {
        AdaptiveScheduler build = new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        Assertions.assertThat(build.getState()).isInstanceOf(Created.class);
        build.transitionToState(new Created.Factory(build, this.log));
    }

    @Test
    public void testTriggerSavepointFailsInIllegalState() throws Exception {
        Assertions.assertThat(new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).triggerSavepoint("some directory", false, SavepointFormatType.CANONICAL)).failsWithin(1L, TimeUnit.MILLISECONDS).withThrowableOfType(ExecutionException.class).withCauseInstanceOf(CheckpointException.class);
    }

    @Test
    public void testStopWithSavepointFailsInIllegalState() throws Exception {
        Assertions.assertThat(new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).triggerSavepoint("some directory", false, SavepointFormatType.CANONICAL)).failsWithin(1L, TimeUnit.MILLISECONDS).withThrowableOfType(ExecutionException.class).withCauseInstanceOf(CheckpointException.class);
    }

    @Test(expected = TaskNotRunningException.class)
    public void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception {
        new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).deliverOperatorEventToCoordinator(ExecutionGraphTestUtils.createExecutionAttemptId(), new OperatorID(), new TestOperatorEvent());
    }

    @Test
    public void testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws Exception {
        Assertions.assertThat(new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).deliverCoordinationRequestToCoordinator(new OperatorID(), new CoordinationRequest() { // from class: org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest.1
        })).failsWithin(1L, TimeUnit.MILLISECONDS).withThrowableOfType(ExecutionException.class).withCauseInstanceOf(FlinkException.class);
    }

    @Test
    public void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Exception {
        Assertions.assertThat(new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).updateTaskExecutionState(new TaskExecutionStateTransition(new TaskExecutionState(ExecutionGraphTestUtils.createExecutionAttemptId(), ExecutionState.FAILED)))).isFalse();
    }

    @Test(expected = IOException.class)
    public void testRequestNextInputSplitFailsInIllegalState() throws Exception {
        new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).requestNextInputSplit(JOB_VERTEX.getID(), ExecutionGraphTestUtils.createExecutionAttemptId());
    }

    @Test(expected = PartitionProducerDisposedException.class)
    public void testRequestPartitionStateFailsInIllegalState() throws Exception {
        new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).requestPartitionState(new IntermediateDataSetID(), new ResultPartitionID());
    }

    @Test
    public void testTryToAssignSlotsReturnsNotPossibleIfExpectedResourcesAreNotAvailable() throws Exception {
        Assertions.assertThat(new AdaptiveSchedulerBuilder(createJobGraph(), this.mainThreadExecutor).setSlotAllocator(TestingSlotAllocator.newBuilder().setTryReserveResourcesFunction(vertexParallelism -> {
            return Optional.empty();
        }).build()).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor()).tryToAssignSlots(CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(new StateTrackingMockExecutionGraph(), new CreatingExecutionGraphTest.TestingVertexParallelism())).isSuccess()).isFalse();
    }

    @Test
    public void testComputeVertexParallelismStoreForExecutionInReactiveMode() {
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(ExecutionGraphTestUtils.createNoOpVertex("v1", 1, 50), ExecutionGraphTestUtils.createNoOpVertex("v2", 50, 50));
        VertexParallelismStore computeVertexParallelismStoreForExecution = AdaptiveScheduler.computeVertexParallelismStoreForExecution(streamingJobGraph, SchedulerExecutionMode.REACTIVE, SchedulerBase::getDefaultMaxParallelism);
        for (JobVertex jobVertex : streamingJobGraph.getVertices()) {
            VertexParallelismInformation parallelismInfo = computeVertexParallelismStoreForExecution.getParallelismInfo(jobVertex.getID());
            Assertions.assertThat(parallelismInfo.getParallelism()).isEqualTo(jobVertex.getParallelism());
            Assertions.assertThat(parallelismInfo.getMaxParallelism()).isEqualTo(jobVertex.getMaxParallelism());
        }
    }

    @Test
    public void testComputeVertexParallelismStoreForExecutionInDefaultMode() {
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(ExecutionGraphTestUtils.createNoOpVertex("v1", 1, 50), ExecutionGraphTestUtils.createNoOpVertex("v2", 50, 50));
        VertexParallelismStore computeVertexParallelismStoreForExecution = AdaptiveScheduler.computeVertexParallelismStoreForExecution(streamingJobGraph, (SchedulerExecutionMode) null, SchedulerBase::getDefaultMaxParallelism);
        for (JobVertex jobVertex : streamingJobGraph.getVertices()) {
            VertexParallelismInformation parallelismInfo = computeVertexParallelismStoreForExecution.getParallelismInfo(jobVertex.getID());
            Assertions.assertThat(parallelismInfo.getParallelism()).isEqualTo(jobVertex.getParallelism());
            Assertions.assertThat(parallelismInfo.getMaxParallelism()).isEqualTo(jobVertex.getMaxParallelism());
        }
    }

    @Test
    public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            DefaultSchedulerTest.doTestCheckpointCleanerIsClosedAfterCheckpointServices((checkpointRecoveryFactory, checkpointsCleaner) -> {
                JobGraph createJobGraph = createJobGraph();
                SchedulerTestingUtils.enableCheckpointing(createJobGraph);
                try {
                    return new AdaptiveSchedulerBuilder(createJobGraph, ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(newSingleThreadScheduledExecutor)).setCheckpointRecoveryFactory(checkpointRecoveryFactory).setCheckpointCleaner(checkpointsCleaner).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, newSingleThreadScheduledExecutor, this.log);
        } finally {
            newSingleThreadScheduledExecutor.shutdownNow();
        }
    }

    private CompletableFuture<ArchivedExecutionGraph> getArchivedExecutionGraphForRunningJob(SchedulerNG schedulerNG) {
        return CompletableFuture.supplyAsync(() -> {
            ArchivedExecutionGraph archivedExecutionGraph = null;
            while (true) {
                ArchivedExecutionGraph archivedExecutionGraph2 = archivedExecutionGraph;
                if (archivedExecutionGraph2 != null && archivedExecutionGraph2.getState() == JobStatus.RUNNING) {
                    return archivedExecutionGraph2;
                }
                archivedExecutionGraph = schedulerNG.requestJob().getArchivedExecutionGraph();
            }
        }, this.singleThreadMainThreadExecutor);
    }

    private Consumer<ExecutionAttemptID> createCancelConsumer(SchedulerNG schedulerNG) {
        return executionAttemptID -> {
            this.singleThreadMainThreadExecutor.execute(() -> {
                schedulerNG.updateTaskExecutionState(new TaskExecutionState(executionAttemptID, ExecutionState.CANCELED));
            });
        };
    }

    @Nonnull
    private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobID jobID) {
        return new DefaultDeclarativeSlotPool(jobID, new DefaultAllocatedSlotPool(), collection -> {
        }, Time.minutes(10L), Time.minutes(10L));
    }

    private static JobGraph createJobGraph() {
        return JobGraphTestUtils.streamingJobGraph(JOB_VERTEX);
    }
}
