package org.apache.flink.runtime.operators.coordination;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.TestingCoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.apache.flink.runtime.operators.sort.ExternalSortLargeRecordsITCase;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingCheckpointStorageAccessCoordinatorView;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.class */
public class OperatorCoordinatorSchedulerTest extends TestLogger {
    /** Vertex ID used by the helpers below when building the job vertex and when querying execution state. */
    private final JobVertexID testVertexId = new JobVertexID();
    /** Operator ID handed to the coordinator providers and used to look the coordinator up again. */
    private final OperatorID testOperatorId = new OperatorID();
    /** Manually triggered executor; the tests drive it explicitly via {@code triggerAll()} and {@code triggerScheduledTasks()}. */
    private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
    /** Scheduler most recently built by {@code setupTestJobAndScheduler}; closed in {@code shutdownScheduler()} when non-null. */
    private DefaultScheduler createdScheduler;

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest$CoordinatorThatFailsCheckpointing.class */
    /** Coordinator whose {@code checkpointCoordinator} call always throws an {@link Error} wrapping a {@link TestException}. */
    private static final class CoordinatorThatFailsCheckpointing extends TestingOperatorCoordinator {
        public CoordinatorThatFailsCheckpointing(OperatorCoordinator.Context ctx) {
            super(ctx);
        }

        /** Fails synchronously; the supplied future is never touched. */
        @Override // org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
            final TestException cause = new TestException();
            throw new Error(cause);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest$CoordinatorThatFailsInStart.class */
    /** Coordinator whose {@code start()} always throws an {@link Exception} with the message "test failure". */
    private static final class CoordinatorThatFailsInStart extends TestingOperatorCoordinator {
        public CoordinatorThatFailsInStart(OperatorCoordinator.Context ctx) {
            super(ctx);
        }

        /** Never starts; always fails. */
        @Override // org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator
        public void start() throws Exception {
            final String message = "test failure";
            throw new Exception(message);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest$FailingTaskExecutorOperatorEventGateway.class */
    /** Gateway that answers every operator event with a future failed by a {@link TestException}. */
    private static final class FailingTaskExecutorOperatorEventGateway implements TaskExecutorOperatorEventGateway {
        private FailingTaskExecutorOperatorEventGateway() {}

        /** Ignores all arguments and returns an exceptionally completed future. */
        public CompletableFuture<Acknowledge> sendOperatorEventToTask(ExecutionAttemptID task, OperatorID operator, SerializedValue<OperatorEvent> event) {
            final TestException failure = new TestException();
            return FutureUtils.completedExceptionally(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest$ModernStateBackend.class */
    /** State backend stub: both factory methods throw {@link UnsupportedOperationException}. */
    public static class ModernStateBackend implements StateBackend {
        private ModernStateBackend() {}

        /** Not supported by this stub. */
        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobId, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest$TestException.class */
    /** Marker exception used by the tests in this class to inject and recognise failures. */
    private static final class TestException extends Exception {
        private TestException() {}
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest$TestOperatorEvent.class */
    /** Payload-free operator event sent through the subtask gateway in tests. */
    private static final class TestOperatorEvent implements OperatorEvent {
        private TestOperatorEvent() {}
    }

    /** Closes the scheduler created during the test, if there is one. */
    @After
    public void shutdownScheduler() throws Exception {
        if (this.createdScheduler == null) {
            // the test never built a scheduler, nothing to clean up
            return;
        }
        closeScheduler(this.createdScheduler);
    }

    /** After the scheduler has started, the coordinator must report itself as started. */
    @Test
    public void testCoordinatorStartedWhenSchedulerStarts() throws Exception {
        final DefaultScheduler scheduler = createAndStartScheduler();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        Assert.assertTrue(coordinator.isStarted());
    }

    /** After the scheduler has been closed, the coordinator must report itself as closed. */
    @Test
    public void testCoordinatorDisposedWhenSchedulerStops() throws Exception {
        final DefaultScheduler scheduler = createAndStartScheduler();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        closeScheduler(scheduler);

        Assert.assertTrue(coordinator.isClosed());
    }

    /** Building and starting a scheduler whose coordinator fails in {@code start()} must raise an exception. */
    @Test
    public void testFailureToStartPropagatesExceptions() throws Exception {
        try {
            final TestingOperatorCoordinator.Provider failingProvider = new TestingOperatorCoordinator.Provider(this.testOperatorId, CoordinatorThatFailsInStart::new);
            final DefaultScheduler scheduler = createScheduler(failingProvider);
            scheduler.startScheduling();
            // reaching this line means nothing was thrown; the AssertionError is not an Exception and escapes the catch
            Assert.fail("expected an exception");
        } catch (Exception expected) {
            // any exception from the block above satisfies the test
        }
    }

    /** A coordinator that fails in {@code start()} must be closed afterwards. */
    @Test
    public void testFailureToStartClosesCoordinator() throws Exception {
        final TestingOperatorCoordinator.Provider failingProvider = new TestingOperatorCoordinator.Provider(this.testOperatorId, CoordinatorThatFailsInStart::new);
        final DefaultScheduler scheduler = createScheduler(failingProvider);
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        try {
            scheduler.startScheduling();
        } catch (Exception ignored) {
            // the start-up failure itself is not what this test checks
        }

        Assert.assertTrue(coordinator.isClosed());
    }

    /** Failing subtask 1 right after the scheduler has started must report exactly that subtask to the coordinator. */
    @Test
    public void deployingTaskFailureNotifiesCoordinator() throws Exception {
        final DefaultScheduler scheduler = createAndStartScheduler();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getFailedTasks().size());
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.contains(1));
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.not(Matchers.contains(0)));
    }

    /** Failing subtask 1 after all tasks were set to running must report exactly that subtask to the coordinator. */
    @Test
    public void runningTaskFailureNotifiesCoordinator() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getFailedTasks().size());
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.contains(1));
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.not(Matchers.contains(0)));
    }

    /** With the restart-all failover strategy, failing subtask 1 must report both subtasks as failed. */
    @Test
    public void cancellationAsPartOfFailoverNotifiesCoordinator() throws Exception {
        final DefaultScheduler scheduler = createSchedulerWithAllRestartOnFailureAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failTask(scheduler, 1);

        Assert.assertEquals(2L, coordinator.getFailedTasks().size());
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.containsInAnyOrder(0, 1));
    }

    /** Failing and restarting subtask 0 twice must notify the coordinator twice for that subtask. */
    @Test
    public void taskRepeatedFailureNotifyCoordinator() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failAndRestartTask(scheduler, 0);
        failAndRestartTask(scheduler, 0);

        Assert.assertEquals(2L, coordinator.getFailedTasks().size());
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.contains(0, 0));
    }

    /** While tasks are only deploying, the coordinator has no gateway for subtask 0. */
    @Test
    public void taskGatewayNotSetBeforeTasksRunning() throws Exception {
        final DefaultScheduler scheduler = createAndStartScheduler();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        Assert.assertNull(coordinator.getSubtaskGateway(0));
    }

    /** Once tasks are running, the coordinator has a gateway for subtask 0. */
    @Test
    public void taskGatewayAvailableWhenTasksRunning() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        Assert.assertNotNull(coordinator.getSubtaskGateway(0));
    }

    /** A failure from the task executor gateway must surface in the future returned by {@code sendEvent}. */
    @Test
    public void taskTaskManagerFailuresAreReportedBack() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks(new FailingTaskExecutorOperatorEventGateway());
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        final CompletableFuture result = coordinator.getSubtaskGateway(0).sendEvent(new TestOperatorEvent());
        // run whatever the send queued on the manually triggered executor
        this.executor.triggerAll();

        Assert.assertThat(result, FlinkMatchers.futureFailedWith(TestException.class));
    }

    /** Cancelling subtask 1 right after the scheduler has started should report exactly that subtask (currently ignored). */
    @Test
    @Ignore
    public void deployingTaskCancellationNotifiesCoordinator() throws Exception {
        final DefaultScheduler scheduler = createAndStartScheduler();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        cancelTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getFailedTasks().size());
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.contains(1));
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.not(Matchers.contains(0)));
    }

    /** Cancelling subtask 0 after all tasks were set to running should report exactly that subtask (currently ignored). */
    @Test
    @Ignore
    public void runningTaskCancellationNotifiesCoordinator() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        cancelTask(scheduler, 0);

        Assert.assertEquals(1L, coordinator.getFailedTasks().size());
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.contains(0));
        Assert.assertThat(coordinator.getFailedTasks(), Matchers.not(Matchers.contains(1)));
    }

    /** The bytes the coordinator completes its checkpoint with must end up as coordinator state in the completed checkpoint. */
    @Test
    public void testTakeCheckpoint() throws Exception {
        final byte[] checkpointData = new byte[656];
        new Random().nextBytes(checkpointData);

        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        final CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
        coordinator.getLastTriggeredCheckpoint().complete(checkpointData);
        acknowledgeCurrentCheckpoint(scheduler);

        final CompletedCheckpoint completed = checkpointFuture.get();
        final OperatorState operatorState = (OperatorState) completed.getOperatorStates().get(this.testOperatorId);
        Assert.assertArrayEquals(checkpointData, getStateHandleContents(operatorState.getCoordinatorState()));
    }

    /** A coordinator that throws from {@code checkpointCoordinator} must fail the triggered checkpoint. */
    @Test
    public void testSnapshotSyncFailureFailsCheckpoint() throws Exception {
        final TestingOperatorCoordinator.Provider failingProvider = new TestingOperatorCoordinator.Provider(this.testOperatorId, CoordinatorThatFailsCheckpointing::new);
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks(failingProvider);

        final CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);

        Assert.assertThat(checkpointFuture, futureWillCompleteWithTestException());
    }

    /** Completing the coordinator's checkpoint future exceptionally must fail the triggered checkpoint. */
    @Test
    public void testSnapshotAsyncFailureFailsCheckpoint() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        final CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
        final TestException failure = new TestException();
        coordinator.getLastTriggeredCheckpoint().completeExceptionally(failure);
        waitForCompletionToPropagate(checkpointFuture);

        Assert.assertThat(checkpointFuture, futureWillCompleteWithTestException());
    }

    /** Coordinator state stored in a savepoint must be handed back to the coordinator on restore. */
    @Test
    public void testSavepointRestoresCoordinator() throws Exception {
        final byte[] savepointData = new byte[123];
        new Random().nextBytes(savepointData);

        final DefaultScheduler scheduler = createSchedulerWithRestoredSavepoint(savepointData);
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        final byte[] restored = coordinator.getLastRestoredCheckpointState();
        Assert.assertArrayEquals(savepointData, restored);
    }

    /** After a global failure, the coordinator must be reset to the state of the completed checkpoint. */
    @Test
    public void testGlobalFailureResetsToCheckpoint() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        final byte[] coordinatorState = new byte[] {7, 11, 3, 5};
        takeCompleteCheckpoint(scheduler, coordinator, coordinatorState);
        failGlobalAndRestart(scheduler, new TestException());

        final byte[] restored = coordinator.getLastRestoredCheckpointState();
        Assert.assertArrayEquals("coordinator should have a restored checkpoint", coordinatorState, restored);
    }

    /** A global failure without any checkpoint must reset the coordinator with the null-restore marker and checkpoint ID -1. */
    @Test
    public void testGlobalFailureBeforeCheckpointResetsToEmptyState() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failGlobalAndRestart(scheduler, new TestException());

        final byte[] restored = coordinator.getLastRestoredCheckpointState();
        Assert.assertSame("coordinator should have null restored state", TestingOperatorCoordinator.NULL_RESTORE_VALUE, restored);
        Assert.assertEquals(-1L, coordinator.getLastRestoredCheckpointId());
    }

    /** A global failover must not produce any per-subtask restore notifications. */
    @Test
    public void testGlobalFailoverDoesNotNotifyLocalRestore() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        final byte[] emptyState = new byte[0];
        takeCompleteCheckpoint(scheduler, coordinator, emptyState);
        failGlobalAndRestart(scheduler, new TestException());

        Assert.assertThat(coordinator.getRestoredTasks(), Matchers.empty());
    }

    /** A local failover of subtask 1 must notify the coordinator of that subtask together with the completed checkpoint ID. */
    @Test
    public void testLocalFailoverResetsTask() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        final long checkpointId = takeCompleteCheckpoint(scheduler, coordinator, new byte[0]);
        failAndRestartTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getRestoredTasks().size());
        final TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask = coordinator.getRestoredTasks().get(0);
        Assert.assertEquals(1L, restoredTask.subtaskIndex);
        Assert.assertEquals(checkpointId, restoredTask.checkpointId);
    }

    /** A local failover of subtask 1 without any checkpoint must notify the coordinator with checkpoint ID -1. */
    @Test
    public void testLocalFailoverBeforeCheckpointResetsTask() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failAndRestartTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getRestoredTasks().size());
        final TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask = coordinator.getRestoredTasks().get(0);
        Assert.assertEquals(1L, restoredTask.subtaskIndex);
        Assert.assertEquals(-1L, restoredTask.checkpointId);
    }

    /** A local failover must leave the coordinator without any restored checkpoint state. */
    @Test
    public void testLocalFailoverDoesNotResetToCheckpoint() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        final byte[] coordinatorState = new byte[] {37, 11, 83, 4};
        takeCompleteCheckpoint(scheduler, coordinator, coordinatorState);
        failAndRestartTask(scheduler, 0);

        Assert.assertNull("coordinator should not have a restored checkpoint", coordinator.getLastRestoredCheckpointState());
    }

    /** The coordinator must be told the ID of the checkpoint that completed. */
    @Test
    public void testConfirmCheckpointComplete() throws Exception {
        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        final byte[] coordinatorState = new byte[] {37, 11, 83, 4};
        final long checkpointId = takeCompleteCheckpoint(scheduler, coordinator, coordinatorState);

        Assert.assertEquals("coordinator should be notified of completed checkpoint", checkpointId, coordinator.getLastCheckpointComplete());
    }

    /** Without checkpointing, a global failure must reset the coordinator with the null-restore marker and checkpoint ID -1. */
    @Test
    public void testBatchGlobalFailureResetsToEmptyState() throws Exception {
        final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failGlobalAndRestart(scheduler, new TestException());

        final byte[] restored = coordinator.getLastRestoredCheckpointState();
        Assert.assertSame("coordinator should have null restored state", TestingOperatorCoordinator.NULL_RESTORE_VALUE, restored);
        Assert.assertEquals(-1L, coordinator.getLastRestoredCheckpointId());
    }

    /** Without checkpointing, a global failover must not produce any per-subtask restore notifications. */
    @Test
    public void testBatchGlobalFailoverDoesNotNotifyLocalRestore() throws Exception {
        final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failGlobalAndRestart(scheduler, new TestException());

        Assert.assertThat(coordinator.getRestoredTasks(), Matchers.empty());
    }

    /** Without checkpointing, a local failover of subtask 1 must notify the coordinator with checkpoint ID -1. */
    @Test
    public void testBatchLocalFailoverResetsTask() throws Exception {
        final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failAndRestartTask(scheduler, 1);

        Assert.assertEquals(1L, coordinator.getRestoredTasks().size());
        final TestingOperatorCoordinator.SubtaskAndCheckpoint restoredTask = coordinator.getRestoredTasks().get(0);
        Assert.assertEquals(1L, restoredTask.subtaskIndex);
        Assert.assertEquals(-1L, restoredTask.checkpointId);
    }

    /** Without checkpointing, a local failover must leave the coordinator without any restored checkpoint state. */
    @Test
    public void testBatchLocalFailoverDoesNotResetToCheckpoint() throws Exception {
        final DefaultScheduler scheduler = createSchedulerWithoutCheckpointingAndDeployTasks();
        final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);

        failAndRestartTask(scheduler, 0);

        Assert.assertNull("coordinator should not have a restored checkpoint", coordinator.getLastRestoredCheckpointState());
    }

    /** A coordination request sent to a request-handling coordinator must come back with the same payload. */
    @Test
    public void testDeliveringClientRequestToRequestHandler() throws Exception {
        final TestingCoordinationRequestHandler.Provider handlerProvider = new TestingCoordinationRequestHandler.Provider(this.testOperatorId);
        final DefaultScheduler scheduler = createScheduler(handlerProvider);

        final TestingCoordinationRequestHandler.Request request = new TestingCoordinationRequestHandler.Request("testing payload");
        final TestingCoordinationRequestHandler.Response response = (TestingCoordinationRequestHandler.Response) scheduler.deliverCoordinationRequestToCoordinator(this.testOperatorId, request).get();

        Assert.assertEquals("testing payload", response.getPayload());
    }

    /** Sending a coordination request to a coordinator that is not a request handler must fail with a {@link FlinkException}. */
    @Test
    public void testDeliveringClientRequestToNonRequestHandler() throws Exception {
        final TestingOperatorCoordinator.Provider plainProvider = new TestingOperatorCoordinator.Provider(this.testOperatorId);
        final DefaultScheduler scheduler = createScheduler(plainProvider);
        final TestingCoordinationRequestHandler.Request payloadRequest = new TestingCoordinationRequestHandler.Request("testing payload");

        CommonTestUtils.assertThrows("cannot handle client event", FlinkException.class, () -> {
            return scheduler.deliverCoordinationRequestToCoordinator(this.testOperatorId, payloadRequest);
        });
    }

    /** Sending a coordination request to an operator ID without a coordinator must fail with a {@link FlinkException}. */
    @Test
    public void testDeliveringClientRequestToNonExistingCoordinator() throws Exception {
        final TestingOperatorCoordinator.Provider plainProvider = new TestingOperatorCoordinator.Provider(this.testOperatorId);
        final DefaultScheduler scheduler = createScheduler(plainProvider);
        final TestingCoordinationRequestHandler.Request payloadRequest = new TestingCoordinationRequestHandler.Request("testing payload");

        CommonTestUtils.assertThrows("does not exist", FlinkException.class, () -> {
            // a fresh operator ID is created on invocation and is not the one registered with the job vertex
            return scheduler.deliverCoordinationRequestToCoordinator(new OperatorID(), payloadRequest);
        });
    }

    /** Builds a scheduler for the test job with the given coordinator provider; scheduling is not started. */
    private DefaultScheduler createScheduler(OperatorCoordinator.Provider provider) throws Exception {
        // same arguments the single-argument setupTestJobAndScheduler overload passes on
        return setupTestJobAndScheduler(provider, null, null, false);
    }

    /** Builds a scheduler with the default testing coordinator, starts scheduling and checks that subtask 0 is DEPLOYING. */
    private DefaultScheduler createAndStartScheduler() throws Exception {
        final TestingOperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(this.testOperatorId);
        final DefaultScheduler scheduler = setupTestJobAndScheduler(provider);

        scheduler.startScheduling();
        this.executor.triggerAll();

        final ExecutionState firstSubtaskState = SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, 0);
        Assert.assertEquals(ExecutionState.DEPLOYING, firstSubtaskState);
        return scheduler;
    }

    /** Builds a scheduler with the default testing coordinator and brings all tasks to running. */
    private DefaultScheduler createSchedulerAndDeployTasks() throws Exception {
        final TestingOperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(this.testOperatorId);
        return createSchedulerAndDeployTasks(provider);
    }

    /** Builds a scheduler that uses the restart-all failover strategy and brings all tasks to running. */
    private DefaultScheduler createSchedulerWithAllRestartOnFailureAndDeployTasks() throws Exception {
        final TestingOperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(this.testOperatorId);
        final DefaultScheduler scheduler = setupTestJobAndScheduler(provider, null, null, true);
        scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    /** Builds a scheduler for a job whose snapshot settings were cleared, checks there is no checkpoint coordinator, and brings all tasks to running. */
    private DefaultScheduler createSchedulerWithoutCheckpointingAndDeployTasks() throws Exception {
        final TestingOperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(this.testOperatorId);
        final Consumer<JobGraph> clearSnapshotSettings = jobGraph -> jobGraph.setSnapshotSettings((JobCheckpointingSettings) null);

        final DefaultScheduler scheduler = setupTestJobAndScheduler(provider, null, clearSnapshotSettings, false);
        Assert.assertNull(scheduler.getExecutionGraph().getCheckpointCoordinator());

        scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    /** Builds a scheduler with the given coordinator provider and brings all tasks to running. */
    private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws Exception {
        final DefaultScheduler scheduler = setupTestJobAndScheduler(provider);
        scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    /** Builds a scheduler with the default testing coordinator and the given task executor gateway, and brings all tasks to running. */
    private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway) throws Exception {
        final TestingOperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(this.testOperatorId);
        final DefaultScheduler scheduler = setupTestJobAndScheduler(provider, gateway, null, false);
        scheduleAllTasksToRunning(scheduler);
        return scheduler;
    }

    /**
     * Registers a savepoint holding the given coordinator state, configures the job to restore
     * from it, and starts scheduling.
     */
    private DefaultScheduler createSchedulerWithRestoredSavepoint(byte[] coordinatorState) throws Exception {
        final String savepointPointer = "testingSavepointPointer";
        final byte[] savepointMetadata = serializeAsCheckpointMetadata(this.testOperatorId, coordinatorState);

        final TestingCheckpointStorageAccessCoordinatorView storage = new TestingCheckpointStorageAccessCoordinatorView();
        storage.registerSavepoint(savepointPointer, savepointMetadata);

        final Consumer<JobGraph> restoreFromSavepoint = jobGraph -> {
            SchedulerTestingUtils.enableCheckpointing(jobGraph, new ModernStateBackend(), storage.asCheckpointStorage());
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPointer));
        };
        final TestingOperatorCoordinator.Provider provider = new TestingOperatorCoordinator.Provider(this.testOperatorId);

        final DefaultScheduler scheduler = setupTestJobAndScheduler(provider, null, restoreFromSavepoint, false);
        scheduler.startScheduling();
        return scheduler;
    }

    /** Convenience overload: no custom gateway, no job graph customization, no restart-all failover strategy. */
    private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider) throws Exception {
        final TaskExecutorOperatorEventGateway noCustomGateway = null;
        final Consumer<JobGraph> noJobGraphCustomization = null;
        return setupTestJobAndScheduler(provider, noCustomGateway, noJobGraphCustomization, false);
    }

    /**
     * Builds the test job (one vertex, parallelism 2, carrying the given coordinator provider) and
     * a {@link DefaultScheduler} for it. The scheduler is remembered in {@code createdScheduler}.
     *
     * @param provider coordinator provider attached to the vertex
     * @param taskExecutorOperatorEventGateway gateway passed to the scheduler builder, or {@code null} to use the builder variant without one
     * @param jobGraphPreProcessing optional hook applied to the job graph after checkpointing was enabled
     * @param restartAllOnFailover whether to install the {@link RestartAllFailoverStrategy}
     * @return the newly built scheduler; scheduling has not been started
     */
    private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider, @Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway, @Nullable Consumer<JobGraph> jobGraphPreProcessing, boolean restartAllOnFailover) throws Exception {
        final OperatorIDPair operatorIds = OperatorIDPair.of(new OperatorID(), provider.getOperatorId());
        final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", this.testVertexId, Collections.singletonList(operatorIds));
        vertex.setInvokableClass(NoOpInvokable.class);
        // diamond instead of the raw type so the serialized value stays typed
        vertex.addOperatorCoordinator(new SerializedValue<>(provider));
        vertex.setParallelism(2);

        final JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(vertex).build();
        SchedulerTestingUtils.enableCheckpointing(jobGraph);
        if (jobGraphPreProcessing != null) {
            // applied after the default checkpointing setup so the hook can override it
            jobGraphPreProcessing.accept(jobGraph);
        }

        final ComponentMainThreadExecutorServiceAdapter mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter((ScheduledExecutorService) this.executor, Thread.currentThread());

        final SchedulerTestingUtils.DefaultSchedulerBuilder schedulerBuilder;
        if (taskExecutorOperatorEventGateway == null) {
            schedulerBuilder = SchedulerTestingUtils.createSchedulerBuilder(jobGraph, mainThreadExecutor);
        } else {
            schedulerBuilder = SchedulerTestingUtils.createSchedulerBuilder(jobGraph, mainThreadExecutor, taskExecutorOperatorEventGateway);
        }
        if (restartAllOnFailover) {
            schedulerBuilder.setFailoverStrategyFactory(new RestartAllFailoverStrategy.Factory());
        }

        final DefaultScheduler scheduler = schedulerBuilder.setFutureExecutor(this.executor).setDelayExecutor(this.executor).build();
        this.createdScheduler = scheduler;
        return scheduler;
    }

    /** Starts scheduling, drains the executor, sets all executions to running and checks that subtask 0 is RUNNING. */
    private void scheduleAllTasksToRunning(DefaultScheduler scheduler) {
        scheduler.startScheduling();

        // run both the immediately queued and the scheduled tasks
        this.executor.triggerAll();
        this.executor.triggerScheduledTasks();

        SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
        final ExecutionState firstSubtaskState = SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, 0);
        Assert.assertEquals(ExecutionState.RUNNING, firstSubtaskState);

        // run whatever the state transition queued
        this.executor.triggerAll();
    }

    /**
     * Looks up the coordinator registered under {@code testOperatorId} on the test vertex.
     *
     * <p>Fails the test (via assertions) if the vertex is missing, if it holds no coordinator
     * for that operator ID, or if the coordinator is not a {@link TestingOperatorCoordinator}.
     *
     * @param defaultScheduler scheduler whose job vertex is inspected
     * @return the testing coordinator of the test vertex
     */
    private TestingOperatorCoordinator getCoordinator(DefaultScheduler defaultScheduler) {
        final ExecutionJobVertex vertexWithCoordinator = getJobVertex(defaultScheduler, this.testVertexId);
        Assert.assertNotNull("vertex for coordinator not found", vertexWithCoordinator);

        // typed Optional instead of the raw type, so no downcast of the holder is needed
        final Optional<OperatorCoordinatorHolder> holder = vertexWithCoordinator.getOperatorCoordinators().stream()
                .filter(candidate -> candidate.operatorId().equals(this.testOperatorId))
                .findFirst();
        Assert.assertTrue("vertex does not contain coordinator", holder.isPresent());

        final OperatorCoordinator coordinator = holder.get().coordinator();
        Assert.assertThat(coordinator, Matchers.instanceOf(TestingOperatorCoordinator.class));
        return (TestingOperatorCoordinator) coordinator;
    }

    /** Fails the given subtask of the test vertex, drains the executor and checks that the subtask is FAILED. */
    private void failTask(DefaultScheduler scheduler, int subtaskIndex) {
        SchedulerTestingUtils.failExecution(scheduler, this.testVertexId, subtaskIndex);
        this.executor.triggerAll();

        final ExecutionState stateAfterFailure = SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, subtaskIndex);
        Assert.assertEquals(ExecutionState.FAILED, stateAfterFailure);
    }

    /** Fails the given subtask, drives the executor, and checks that the subtask is DEPLOYING again. */
    private void failAndRedeployTask(DefaultScheduler scheduler, int subtaskIndex) {
        failTask(scheduler, subtaskIndex);

        // immediate tasks, then scheduled tasks, then whatever those queued
        this.executor.triggerAll();
        this.executor.triggerScheduledTasks();
        this.executor.triggerAll();

        final ExecutionState stateAfterRedeploy = SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, subtaskIndex);
        Assert.assertEquals(ExecutionState.DEPLOYING, stateAfterRedeploy);
    }

    /** Fails and redeploys the given subtask, then moves it through INITIALIZING to RUNNING and checks the final state. */
    private void failAndRestartTask(DefaultScheduler scheduler, int subtaskIndex) {
        failAndRedeployTask(scheduler, subtaskIndex);

        SchedulerTestingUtils.setExecutionToState(ExecutionState.INITIALIZING, scheduler, this.testVertexId, subtaskIndex);
        SchedulerTestingUtils.setExecutionToState(ExecutionState.RUNNING, scheduler, this.testVertexId, subtaskIndex);

        final ExecutionState stateAfterRestart = SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, subtaskIndex);
        Assert.assertEquals(ExecutionState.RUNNING, stateAfterRestart);
    }

    /** Triggers a global failure with the given cause, drives the restart, and checks that subtask 0 is RUNNING again. */
    private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable cause) {
        scheduler.handleGlobalFailure(cause);
        SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);

        // immediate tasks, then scheduled tasks, then whatever those queued
        this.executor.triggerAll();
        this.executor.triggerScheduledTasks();
        this.executor.triggerAll();

        SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
        this.executor.triggerAll();

        final ExecutionState firstSubtaskState = SchedulerTestingUtils.getExecutionState(scheduler, this.testVertexId, 0);
        Assert.assertEquals(ExecutionState.RUNNING, firstSubtaskState);
    }

    /**
     * Marks one execution of the test vertex as canceled through
     * {@link SchedulerTestingUtils#canceledExecution}, runs all pending executor actions and
     * asserts the execution is reported as {@link ExecutionState#CANCELED}.
     *
     * @param defaultScheduler scheduler under test
     * @param i subtask index passed through to the scheduler testing utilities
     */
    private void cancelTask(DefaultScheduler defaultScheduler, int i) {
        SchedulerTestingUtils.canceledExecution(defaultScheduler, testVertexId, i);
        executor.triggerAll();

        final ExecutionState stateAfterCancel =
                SchedulerTestingUtils.getExecutionState(defaultScheduler, testVertexId, i);
        Assert.assertEquals(ExecutionState.CANCELED, stateAfterCancel);
    }

    /**
     * Triggers a checkpoint on the scheduler and keeps running pending executor actions until
     * either the test coordinator reports a triggered checkpoint or the checkpoint future is
     * already done.
     *
     * @param defaultScheduler scheduler under test
     * @return the future returned by {@link SchedulerTestingUtils#triggerCheckpoint}
     * @throws Exception if obtaining the coordinator fails or the wait is interrupted
     */
    private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler defaultScheduler) throws Exception {
        final CompletableFuture<CompletedCheckpoint> checkpointFuture =
                SchedulerTestingUtils.triggerCheckpoint(defaultScheduler);
        final TestingOperatorCoordinator testCoordinator = getCoordinator(defaultScheduler);

        while (true) {
            // Stop as soon as the coordinator saw the trigger or the checkpoint has finished.
            if (testCoordinator.hasTriggeredCheckpoint() || checkpointFuture.isDone()) {
                return checkpointFuture;
            }
            executor.triggerAll();
            Thread.sleep(1L);
        }
    }

    /**
     * Repeatedly runs all pending executor actions, sleeping 1 ms between rounds, until the
     * given future is done.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored before the
     * {@link InterruptedException} is wrapped in an {@link Error}, so code further up the stack
     * can still observe the interruption.
     *
     * @param completableFuture the future to wait for
     * @throws Error wrapping the {@link InterruptedException} if the thread is interrupted
     */
    private void waitForCompletionToPropagate(CompletableFuture<?> completableFuture) {
        while (!completableFuture.isDone()) {
            this.executor.triggerAll();
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; set it again before bailing out.
                Thread.currentThread().interrupt();
                throw new Error(e);
            }
        }
    }

    /**
     * Acknowledges the current checkpoint through
     * {@link SchedulerTestingUtils#acknowledgeCurrentCheckpoint}, running all pending executor
     * actions both before and after the acknowledgement.
     *
     * @param defaultScheduler scheduler under test
     */
    private void acknowledgeCurrentCheckpoint(DefaultScheduler defaultScheduler) {
        // Flush pending actions first, acknowledge, then flush what the acknowledgement enqueued.
        executor.triggerAll();
        SchedulerTestingUtils.acknowledgeCurrentCheckpoint(defaultScheduler);
        executor.triggerAll();
    }

    /**
     * Takes one complete checkpoint: triggers it, completes the coordinator's last triggered
     * checkpoint with the given bytes, acknowledges it, and then runs pending executor actions
     * until the coordinator reports a complete checkpoint.
     *
     * @param defaultScheduler scheduler under test
     * @param testingOperatorCoordinator coordinator whose triggered checkpoint is completed
     * @param bArr value used to complete the coordinator's checkpoint future
     * @return the ID of the completed checkpoint
     * @throws Exception if triggering or waiting for the checkpoint fails
     */
    private long takeCompleteCheckpoint(DefaultScheduler defaultScheduler, TestingOperatorCoordinator testingOperatorCoordinator, byte[] bArr) throws Exception {
        final CompletableFuture<CompletedCheckpoint> pendingCheckpoint = triggerCheckpoint(defaultScheduler);
        testingOperatorCoordinator.getLastTriggeredCheckpoint().complete(bArr);
        acknowledgeCurrentCheckpoint(defaultScheduler);

        final CompletedCheckpoint completed = pendingCheckpoint.get();
        final long completedId = completed.getCheckpointID();

        // Keep running pending actions until the coordinator reports a complete checkpoint.
        for (; !testingOperatorCoordinator.hasCompleteCheckpoint(); Thread.sleep(1L)) {
            executor.triggerAll();
        }
        return completedId;
    }

    /**
     * Starts the asynchronous shutdown of the scheduler, runs all pending executor actions and
     * then blocks until the returned future has completed.
     *
     * @param defaultScheduler scheduler to close
     * @throws Exception if waiting for the close future fails or the future completed
     *     exceptionally
     */
    private void closeScheduler(DefaultScheduler defaultScheduler) throws Exception {
        // Wildcard instead of a raw type: only completion matters here, not the result value.
        final CompletableFuture<?> closeAsync = defaultScheduler.closeAsync();
        this.executor.triggerAll();
        closeAsync.get();
    }

    /**
     * Looks up the {@link ExecutionJobVertex} for a job vertex by resolving the execution vertex
     * with index 0 and asking it for its job vertex.
     *
     * @param defaultScheduler scheduler to query
     * @param jobVertexID ID of the job vertex
     * @return the job vertex that the execution vertex with index 0 belongs to
     */
    private static ExecutionJobVertex getJobVertex(DefaultScheduler defaultScheduler, JobVertexID jobVertexID) {
        final ExecutionVertexID firstVertexId = new ExecutionVertexID(jobVertexID, 0);
        return defaultScheduler.getExecutionVertex(firstVertexId).getJobVertex();
    }

    /**
     * Builds an {@link OperatorState} for the given operator (constructed with the fixed
     * arguments 10 and 16384) whose coordinator state is a {@link ByteStreamStateHandle} named
     * {@code "name"} wrapping the given bytes.
     *
     * @param operatorID ID of the operator the state belongs to
     * @param bArr bytes to store as coordinator state
     * @return the populated operator state
     */
    private static OperatorState createOperatorState(OperatorID operatorID, byte[] bArr) {
        final OperatorState state = new OperatorState(operatorID, 10, 16384);
        final ByteStreamStateHandle coordinatorStateHandle = new ByteStreamStateHandle("name", bArr);
        state.setCoordinatorState(coordinatorStateHandle);
        return state;
    }

    /**
     * Wraps the given coordinator bytes into a {@link CheckpointMetadata} with ID 1337 that
     * contains a single operator state and an empty second collection, and serializes it via
     * {@link Checkpoints#storeCheckpointMetadata}.
     *
     * @param operatorID ID of the operator the state belongs to
     * @param bArr coordinator state bytes
     * @return the serialized checkpoint metadata
     * @throws IOException if writing the metadata fails
     */
    private static byte[] serializeAsCheckpointMetadata(OperatorID operatorID, byte[] bArr) throws IOException {
        final Collection<OperatorState> operatorStates =
                Collections.singletonList(createOperatorState(operatorID, bArr));
        final CheckpointMetadata metadata =
                new CheckpointMetadata(1337L, operatorStates, Collections.emptyList());

        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        Checkpoints.storeCheckpointMetadata(metadata, sink);
        return sink.toByteArray();
    }

    /**
     * Creates a matcher that expects a future to complete exceptionally within 10 seconds with a
     * {@code TestException} somewhere in the failure, as found by
     * {@code ExceptionUtils.findThrowableSerializedAware} using this test class's class loader.
     *
     * @param <T> result type of the matched future
     * @return the matcher, described as "A TestException in the cause chain"
     */
    private static <T> Matcher<CompletableFuture<T>> futureWillCompleteWithTestException() {
        return FlinkMatchers.futureWillCompleteExceptionally(
                th ->
                        ExceptionUtils.findThrowableSerializedAware(
                                        th,
                                        TestException.class,
                                        OperatorCoordinatorSchedulerTest.class.getClassLoader())
                                .isPresent(),
                Duration.ofSeconds(10L),
                "A TestException in the cause chain");
    }

    /**
     * Extracts the raw bytes from a state handle. Only {@link ByteStreamStateHandle} is
     * supported; any other handle type fails the test.
     *
     * @param streamStateHandle the handle to read
     * @return the data of the byte stream state handle
     */
    private static byte[] getStateHandleContents(StreamStateHandle streamStateHandle) {
        if (!(streamStateHandle instanceof ByteStreamStateHandle)) {
            Assert.fail("other state handles not implemented");
            // Only here to satisfy the compiler; the fail() call above is expected to throw.
            return null;
        }
        final ByteStreamStateHandle byteHandle = (ByteStreamStateHandle) streamStateHandle;
        return byteHandle.getData();
    }

    /**
     * Lambda deserialization hook, marked {@code synthetic} in its signature.
     *
     * <p>Reads the metadata of the given {@link SerializedLambda}. When the implementation
     * method name is {@code <init>} and the remaining descriptors match one of the expected
     * combinations, it returns a constructor reference to {@code CoordinatorThatFailsInStart} or
     * {@code CoordinatorThatFailsCheckpointing}.
     *
     * <p>NOTE(review): this looks like decompiler output of a compiler-generated method rather
     * than hand-written code. Several constructs below are not valid Java as written (see the
     * inline notes) - confirm whether this method should be present in source form at all.
     *
     * @param serializedLambda the serialized form of the lambda to re-create
     * @return a constructor reference matching the serialized lambda
     * @throws IllegalArgumentException if the metadata matches none of the expected combinations
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // NOTE(review): a boolean cannot be assigned -1 nor used as a switch selector;
        // presumably this was an int selector before decompilation - confirm.
        boolean z = -1;
        // First stage: dispatch on the name's hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: compare the remaining lambda metadata for the selected name.
        switch (z) {
            // NOTE(review): the label refers to a constant of a class unrelated to this code;
            // the trailing comment suggests its value is 0 - confirm.
            case ExternalSortLargeRecordsITCase.SmallOrMediumOrLargeValue.SMALL_SIZE /* 0 */:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/runtime/util/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest$CoordinatorThatFailsInStart") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/runtime/operators/coordination/OperatorCoordinator$Context;)V")) {
                    return CoordinatorThatFailsInStart::new;
                }
                // NOTE(review): same condition as the branch above, so this branch cannot be
                // reached with a different outcome.
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/runtime/util/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest$CoordinatorThatFailsInStart") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/runtime/operators/coordination/OperatorCoordinator$Context;)V")) {
                    return CoordinatorThatFailsInStart::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/runtime/util/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest$CoordinatorThatFailsCheckpointing") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/runtime/operators/coordination/OperatorCoordinator$Context;)V")) {
                    return CoordinatorThatFailsCheckpointing::new;
                }
                break;
        }
        // Nothing matched: the serialized lambda is not one this class knows how to rebuild.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
