package org.apache.flink.runtime.operators.coordination;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.class */
public class OperatorCoordinatorHolderTest extends TestLogger {
    // Handler handed to every holder built by createCoordinatorHolder(...); it simply remembers
    // the throwable it receives so tests and the @After hook can inspect it.
    private final GlobalFailureHandler globalFailureHandler = th -> {
        this.globalFailure = th;
    };
    // Last throwable passed to globalFailureHandler; null as long as the handler was never invoked
    // (or after a test explicitly resets it).
    private Throwable globalFailure;

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest$CheckpointEventOrderTestBaseCoordinator.class */
    /**
     * Base class for test coordinators that run a dedicated thread which calls {@link #step()}
     * in a loop.
     *
     * <p>The thread is started as soon as every slot of {@link #subtaskGateways} holds a gateway
     * and is stopped by {@link #close()}.
     */
    private abstract static class CheckpointEventOrderTestBaseCoordinator implements OperatorCoordinator, Runnable {
        protected final OperatorCoordinator.Context context;
        protected final OperatorCoordinator.SubtaskGateway[] subtaskGateways;
        private final Thread coordinatorThread;
        /** Set by {@link #close()}; read by the coordinator thread to leave its loop. */
        private volatile boolean closed;

        CheckpointEventOrderTestBaseCoordinator(OperatorCoordinator.Context context) {
            // The thread is only created here; it is started from executionAttemptReady(...).
            this.coordinatorThread = new Thread(this);
            this.context = context;
            this.subtaskGateways = new OperatorCoordinator.SubtaskGateway[context.currentParallelism()];
        }

        /** No-op. */
        public void start() throws Exception {
        }

        /** Marks the coordinator closed, interrupts the coordinator thread and waits for it to end. */
        public void close() throws Exception {
            this.closed = true;
            this.coordinatorThread.interrupt();
            this.coordinatorThread.join();
        }

        /** No-op. */
        public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        }

        /** No-op. */
        public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
        }

        /** No-op. */
        public void subtaskReset(int subtask, long checkpointId) {
        }

        /**
         * Stores the gateway at index {@code subtask} and starts the coordinator thread once no
         * slot is left empty.
         */
        public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
            this.subtaskGateways[subtask] = gateway;
            if (allGatewaysRegistered()) {
                this.coordinatorThread.start();
            }
        }

        /** Returns {@code true} when every slot of {@link #subtaskGateways} is non-null. */
        private boolean allGatewaysRegistered() {
            for (int slot = 0; slot < this.subtaskGateways.length; slot++) {
                if (this.subtaskGateways[slot] == null) {
                    return false;
                }
            }
            return true;
        }

        public abstract void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception;

        /** No-op. */
        public void notifyCheckpointComplete(long checkpointId) {
        }

        /** No-op. */
        public void resetToCheckpoint(long checkpointId, byte[] checkpointData) throws Exception {
        }

        /**
         * Body of the coordinator thread: calls {@link #step()} until {@link #closed} is set.
         *
         * <p>A throwable raised after closing ends the loop silently; any other throwable is
         * printed and terminates the JVM with exit code -1.
         */
        @Override
        public void run() {
            try {
                while (!this.closed) {
                    step();
                }
            } catch (Throwable failure) {
                if (!this.closed) {
                    failure.printStackTrace();
                    System.exit(-1);
                }
            }
        }

        /** One iteration of the coordinator thread's loop. */
        protected abstract void step() throws Exception;
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest$FutureCompletedAfterSendingEventsCoordinator.class */
    /**
     * Coordinator whose thread keeps sending numbered events to subtasks 0, 1 and 2 and completes
     * a pending checkpoint future only after one such round of events has been sent.
     */
    private static final class FutureCompletedAfterSendingEventsCoordinator extends CheckpointEventOrderTestBaseCoordinator {
        /** Triggered once the first checkpoint future has been completed by {@link #step()}. */
        private final OneShotLatch checkpointCompleted = new OneShotLatch();

        /** Future handed in by {@link #checkpointCoordinator}; cleared again after completion. */
        @Nullable
        private volatile CompletableFuture<byte[]> checkpoint;
        /** Value of the next event to send; only touched from {@link #step()}. */
        private int num;

        FutureCompletedAfterSendingEventsCoordinator(OperatorCoordinator.Context context) {
            super(context);
        }

        /** Only remembers the future; it gets completed later from {@link #step()}. */
        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
            this.checkpoint = resultFuture;
        }

        /**
         * Sleeps 2 ms, sends one event to each of the gateways 0..2 and then, if a checkpoint is
         * pending, completes it with the current counter encoded as bytes.
         */
        @Override
        protected void step() throws Exception {
            Thread.sleep(2L);
            for (int subtask = 0; subtask < 3; subtask++) {
                this.subtaskGateways[subtask].sendEvent(new TestOperatorEvent(this.num++));
            }
            final CompletableFuture<byte[]> pending = this.checkpoint;
            if (pending == null) {
                return;
            }
            pending.complete(OperatorCoordinatorHolderTest.intToBytes(this.num));
            this.checkpointCompleted.trigger();
            this.checkpoint = null;
        }

        /** Waits until a checkpoint future has been completed before shutting the thread down. */
        @Override
        public void close() throws Exception {
            this.checkpointCompleted.await();
            super.close();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest$FutureCompletedInstantlyTestCoordinator.class */
    /**
     * Coordinator whose {@link #checkpointCoordinator} call blocks on a condition until the
     * coordinator thread has completed the checkpoint future and sent a further event.
     */
    private static final class FutureCompletedInstantlyTestCoordinator extends CheckpointEventOrderTestBaseCoordinator {
        /** Fair lock shared by the triggering thread and the coordinator thread. */
        private final ReentrantLock lock = new ReentrantLock(true);
        /** Signalled by {@link #step()} at the end of every iteration. */
        private final Condition condition = this.lock.newCondition();

        @GuardedBy("lock")
        @Nullable
        private CompletableFuture<byte[]> checkpoint;
        /** Value of the next event to send; only touched from {@link #step()}. */
        private int num;

        FutureCompletedInstantlyTestCoordinator(OperatorCoordinator.Context context) {
            super(context);
        }

        /**
         * Publishes the future under the lock and then waits on the condition, so the method
         * returns only after being signalled by {@link #step()} (or on interruption).
         */
        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception {
            this.lock.lock();
            try {
                this.checkpoint = resultFuture;
                this.condition.await();
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Under the lock: completes a pending checkpoint with the current counter, sends the next
         * event to gateway 0, wakes up all waiters and sleeps 2 ms before releasing the lock.
         */
        @Override
        protected void step() throws Exception {
            this.lock.lock();
            try {
                final CompletableFuture<byte[]> pending = this.checkpoint;
                if (pending != null) {
                    pending.complete(OperatorCoordinatorHolderTest.intToBytes(this.num));
                    this.checkpoint = null;
                }
                this.subtaskGateways[0].sendEvent(new TestOperatorEvent(this.num++));
                this.condition.signalAll();
                Thread.sleep(2L);
            } finally {
                this.lock.unlock();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest$ReorderableManualExecutorService.class */
    /**
     * Manually triggered executor that can additionally hold back submitted runnables.
     *
     * <p>While delaying is switched on, runnables passed to {@link #execute(Runnable)} are parked
     * in a separate queue and only reach the parent executor when
     * {@link #executeAllDelayedRunnables()} is called.
     */
    private static class ReorderableManualExecutorService extends ManuallyTriggeredScheduledExecutorService {
        /** Runnables submitted while {@link #delayNewRunnables} was {@code true}, in order. */
        private final Queue<Runnable> delayedRunnables = new ArrayDeque<>();
        private boolean delayNewRunnables;

        private ReorderableManualExecutorService() {
        }

        /** Switches parking of newly submitted runnables on or off. */
        public void setDelayNewRunnables(boolean z) {
            this.delayNewRunnables = z;
        }

        /** Parks the runnable while delaying is on; otherwise forwards it to the parent executor. */
        @Override
        public void execute(@Nonnull Runnable runnable) {
            if (this.delayNewRunnables) {
                this.delayedRunnables.add(runnable);
            } else {
                super.execute(runnable);
            }
        }

        /** Hands all parked runnables to the parent executor in submission order. */
        public void executeAllDelayedRunnables() {
            while (!this.delayedRunnables.isEmpty()) {
                final Runnable parked = this.delayedRunnables.poll();
                super.execute(parked);
            }
        }
    }

    /** After each test, rethrows the throwable recorded in {@code globalFailure}, if there is one. */
    @After
    public void checkNoGlobalFailure() throws Exception {
        final Throwable recorded = this.globalFailure;
        if (recorded == null) {
            return;
        }
        ExceptionUtils.rethrowException(recorded);
    }

    /** The future passed to the holder is still pending right after triggering a checkpoint. */
    @Test
    public void checkpointFutureInitiallyNotDone() throws Exception {
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(EventReceivingTasks.createForRunningTasks(), TestingOperatorCoordinator::new);

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(1L, checkpointFuture);

        Assertions.assertThat(checkpointFuture).isNotDone();
    }

    /**
     * Completing the coordinator's checkpoint future completes the future passed to the holder
     * with the same bytes.
     */
    @Test
    public void completedCheckpointFuture() throws Exception {
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(EventReceivingTasks.createForRunningTasks(), TestingOperatorCoordinator::new);
        final byte[] testData = {11, 22, 33, 44};

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(9L, checkpointFuture);
        getCoordinator(holder).getLastTriggeredCheckpoint().complete(testData);

        Assertions.assertThat(checkpointFuture).isDone();
        Assertions.assertThat(checkpointFuture.get()).containsExactly(testData);
    }

    /**
     * An event sent to subtask 1 while the coordinator's checkpoint future is still pending shows
     * up in the events recorded for that subtask.
     */
    @Test
    public void eventsBeforeCheckpointFutureCompletionPassThrough() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        // Trigger only; the coordinator's own future is never completed in this test.
        holder.checkpointCoordinator(1L, new CompletableFuture<>());
        getCoordinator(holder).getSubtaskGateway(1).sendEvent(new TestOperatorEvent(1));
        holder.handleEventFromOperator(1, 0, new AcknowledgeCheckpointEvent(1L));

        Assertions.assertThat(tasks.getSentEventsForSubtask(1)).containsExactly(new TestOperatorEvent(1));
    }

    /**
     * After a checkpoint has been triggered and completed by the coordinator, an event sent to
     * subtask 0 is not recorded as sent.
     */
    @Test
    public void eventsAreBlockedAfterCheckpointFutureCompletes() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 10L);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1337));

        Assertions.assertThat(tasks.getNumberOfSentEvents()).isEqualTo(0);
    }

    /**
     * An event sent after a completed checkpoint is recorded for subtask 0 once
     * {@code abortCurrentTriggering()} has been called on the holder.
     */
    @Test
    public void abortedCheckpointReleasesBlockedEvents() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 123L);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1337));
        holder.abortCurrentTriggering();

        Assertions.assertThat(tasks.getSentEventsForSubtask(0)).containsExactly(new TestOperatorEvent(1337));
    }

    /**
     * An event sent after a completed checkpoint is recorded for subtask 0 once that subtask's
     * {@code AcknowledgeCheckpointEvent} for the same checkpoint id has been handled.
     */
    @Test
    public void acknowledgeCheckpointEventReleasesBlockedEvents() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 1111L);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1337));
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1111L));

        Assertions.assertThat(tasks.getSentEventsForSubtask(0)).containsExactly(new TestOperatorEvent(1337));
    }

    /**
     * After a completed checkpoint followed by {@code resetToCheckpoint}, an event sent to
     * subtask 1 is recorded as sent.
     */
    @Test
    public void restoreOpensGatewayEvents() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 1000L);
        holder.resetToCheckpoint(1L, new byte[0]);
        getCoordinator(holder).getSubtaskGateway(1).sendEvent(new TestOperatorEvent(999));

        Assertions.assertThat(tasks.getSentEventsForSubtask(1)).containsExactly(new TestOperatorEvent(999));
    }

    /**
     * Completing the coordinator future of an already aborted checkpoint, after a later checkpoint
     * was completed and acknowledged by subtask 0, does not stop a subsequent event to subtask 0
     * from being recorded.
     */
    @Test
    public void lateCompleteCheckpointFutureDoesNotBlockEvents() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        // First checkpoint: triggered, then aborted while its coordinator future is still open.
        holder.checkpointCoordinator(1000L, new CompletableFuture<>());
        final CompletableFuture<byte[]> abortedCheckpointFuture = getCoordinator(holder).getLastTriggeredCheckpoint();
        holder.abortCurrentTriggering();

        // Second checkpoint: completed by the coordinator and acknowledged by subtask 0.
        triggerAndCompleteCheckpoint(holder, 1010L);
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1010L));

        // Only now does the first checkpoint's future complete.
        abortedCheckpointFuture.complete(new byte[0]);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(123));

        Assertions.assertThat(tasks.events)
                .containsExactly(new EventReceivingTasks.EventWithSubtask(new TestOperatorEvent(123), 0));
    }

    /**
     * With two completed but unacknowledged checkpoints, events sent to subtask 0 after each of
     * them are recorded one at a time as the matching acknowledge events are handled.
     */
    @Test
    public void triggerConcurrentCheckpoints() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 1111L);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1337));
        triggerAndCompleteCheckpoint(holder, 1112L);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1338));
        Assertions.assertThat(tasks.getSentEventsForSubtask(0)).isEmpty();

        // Acknowledging the first checkpoint lets only the first event through.
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1111L));
        Assertions.assertThat(tasks.getSentEventsForSubtask(0)).containsExactly(new TestOperatorEvent(1337));

        // Acknowledging the second checkpoint lets the second event follow.
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1112L));
        Assertions.assertThat(tasks.getSentEventsForSubtask(0))
                .containsExactly(new TestOperatorEvent(1337), new TestOperatorEvent(1338));
    }

    /**
     * Runs two checkpoints in sequence, each acknowledged by subtasks 0, 1 and 2, and checks that
     * the four events sent to subtask 0 around them are recorded in sending order.
     */
    @Test
    public void takeCheckpointAfterSuccessfulCheckpoint() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));

        triggerAndCompleteCheckpoint(holder, 22L);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1));
        for (int subtask = 0; subtask < 3; subtask++) {
            holder.handleEventFromOperator(subtask, 0, new AcknowledgeCheckpointEvent(22L));
        }

        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(2));

        triggerAndCompleteCheckpoint(holder, 23L);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(3));
        for (int subtask = 0; subtask < 3; subtask++) {
            holder.handleEventFromOperator(subtask, 0, new AcknowledgeCheckpointEvent(23L));
        }

        Assertions.assertThat(tasks.getSentEventsForSubtask(0))
                .containsExactly(
                        new TestOperatorEvent(0),
                        new TestOperatorEvent(1),
                        new TestOperatorEvent(2),
                        new TestOperatorEvent(3));
    }

    /**
     * Aborts a first checkpoint, runs a second one that subtask 0 acknowledges, and checks that
     * the four events sent to subtask 0 around them are recorded in sending order.
     */
    @Test
    public void takeCheckpointAfterAbortedCheckpoint() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));

        triggerAndCompleteCheckpoint(holder, 22L);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(1));
        holder.abortCurrentTriggering();

        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(2));

        triggerAndCompleteCheckpoint(holder, 23L);
        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(3));
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(23L));

        Assertions.assertThat(tasks.getSentEventsForSubtask(0))
                .containsExactly(
                        new TestOperatorEvent(0),
                        new TestOperatorEvent(1),
                        new TestOperatorEvent(2),
                        new TestOperatorEvent(3));
    }

    /**
     * Uses a coordinator that calls {@code context.failJob(...)} on every operator event and
     * checks that a second failure request leaves the recorded global failure unchanged, while a
     * failure requested after {@code resetToCheckpoint} replaces it.
     */
    @Test
    public void testFailingJobMultipleTimesNotCauseCascadingJobFailure() throws Exception {
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(
                EventReceivingTasks.createForRunningTasks(),
                context -> new TestingOperatorCoordinator(context) {
                    @Override
                    public void handleEventFromOperator(int i, int i2, OperatorEvent operatorEvent) {
                        context.failJob(new RuntimeException("Artificial Exception"));
                    }
                });

        holder.handleEventFromOperator(0, 0, new TestOperatorEvent());
        Assertions.assertThat(this.globalFailure).isNotNull();
        final Throwable firstGlobalFailure = this.globalFailure;

        holder.handleEventFromOperator(1, 0, new TestOperatorEvent());
        Assertions.assertThat(firstGlobalFailure)
                .as("The global failure should be the same instance because the context "
                        + "should only take the first request from the coordinator to fail the job.")
                .isEqualTo(this.globalFailure);

        holder.resetToCheckpoint(0L, new byte[0]);
        holder.handleEventFromOperator(1, 1, new TestOperatorEvent());
        Assertions.assertThat(firstGlobalFailure)
                .as("The new failures should be propagated after the coordinator is reset.")
                .isNotEqualTo(this.globalFailure);

        // The failure was provoked on purpose; clear it so the @After check does not rethrow it.
        this.globalFailure = null;
    }

    /**
     * With tasks whose RPC result is a still-open future, the holder's checkpoint future stays
     * pending after the coordinator completed its part and becomes done once the RPC result
     * future is completed.
     */
    @Test
    public void checkpointCompletionWaitsForEventFutures() throws Exception {
        final CompletableFuture<Acknowledge> rpcResult = new CompletableFuture<>();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(
                EventReceivingTasks.createForRunningTasksWithRpcResult(rpcResult), TestingOperatorCoordinator::new);

        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
        final CompletableFuture<byte[]> checkpointFuture = triggerAndCompleteCheckpoint(holder, 22L);
        Assertions.assertThat(checkpointFuture).isNotDone();

        rpcResult.complete(Acknowledge.get());
        Assertions.assertThat(checkpointFuture).isDone();
    }

    /** Runs the event/checkpoint consistency check with {@code FutureCompletedInstantlyTestCoordinator}. */
    @Test
    public void verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() throws Exception {
        final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorFactory =
                FutureCompletedInstantlyTestCoordinator::new;
        checkpointEventValueAtomicity(coordinatorFactory);
    }

    /** Runs the event/checkpoint consistency check with {@code FutureCompletedAfterSendingEventsCoordinator}. */
    @Test
    public void verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception {
        final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorFactory =
                FutureCompletedAfterSendingEventsCoordinator::new;
        checkpointEventValueAtomicity(coordinatorFactory);
    }

    /**
     * Shared body of the event-order tests.
     *
     * <p>Creates a holder around the coordinator produced by {@code function}, triggers checkpoint
     * 0 after a short random pause, closes the holder after another pause, and then checks that
     * the number stored in the checkpoint equals the number of events recorded as sent and that
     * the i-th recorded event carries the value i.
     *
     * @param function factory for the coordinator under test
     */
    private void checkpointEventValueAtomicity(Function<OperatorCoordinator.Context, OperatorCoordinator> function) throws Exception {
        final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        final ComponentMainThreadExecutor mainThreadExecutor =
                new ComponentMainThreadExecutorServiceAdapter((ScheduledExecutorService) executor, Thread.currentThread());
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, function, mainThreadExecutor);

        // Random pause of 0-9 ms before running whatever was queued on the manual executor.
        Thread.sleep(new Random().nextInt(10));
        executor.triggerAll();

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(0L, checkpointFuture);
        executor.triggerAll();

        // Second random pause, then shut the coordinator down and drain the executor once more.
        Thread.sleep(new Random().nextInt(10));
        holder.close();
        executor.triggerAll();

        Assertions.assertThat(checkpointFuture).isDone();
        final int checkpointedNumber = bytesToInt(checkpointFuture.get());
        Assertions.assertThat(tasks.getNumberOfSentEvents()).isEqualTo(checkpointedNumber);
        for (int i = 0; i < checkpointedNumber; i++) {
            final TestOperatorEvent sent = (TestOperatorEvent) tasks.getAllSentEvents().get(i).event;
            Assertions.assertThat(sent.getValue()).isEqualTo(i);
        }
    }

    /**
     * When the RPC result future of a sent event fails after the coordinator has completed its
     * checkpoint future, the holder's checkpoint future is completed exceptionally.
     */
    @Test
    public void testCheckpointFailsIfSendingEventFailedAfterTrigger() throws Exception {
        CompletableFuture rpcResult = new CompletableFuture();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(
                EventReceivingTasks.createForRunningTasksWithRpcResult(rpcResult), TestingOperatorCoordinator::new);

        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(1L, checkpointFuture);
        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);

        rpcResult.completeExceptionally(new RuntimeException("Artificial"));

        Assertions.assertThat(checkpointFuture).isCompletedExceptionally();
    }

    /**
     * Fails the RPC result future of a sent event while the executor parks new runnables, then
     * triggers and completes a checkpoint. The holder's checkpoint future must still be pending
     * until the parked runnables are executed, after which it is completed exceptionally.
     */
    @Test
    public void testCheckpointFailsIfSendingEventFailedBeforeTrigger() throws Exception {
        final ReorderableManualExecutorService executor = new ReorderableManualExecutorService();
        final ComponentMainThreadExecutorServiceAdapter mainThreadExecutor =
                new ComponentMainThreadExecutorServiceAdapter((ScheduledExecutorService) executor, Thread.currentThread());
        CompletableFuture rpcResult = new CompletableFuture();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(
                EventReceivingTasks.createForRunningTasksWithRpcResult(rpcResult),
                TestingOperatorCoordinator::new,
                mainThreadExecutor);

        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
        executor.triggerAll();

        // Anything submitted to the executor while the RPC result fails is parked for later.
        executor.setDelayNewRunnables(true);
        rpcResult.completeExceptionally(new RuntimeException("Artificial"));
        executor.setDelayNewRunnables(false);

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(1L, checkpointFuture);
        executor.triggerAll();
        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
        executor.triggerAll();
        Assertions.assertThat(checkpointFuture).isNotDone();

        // Release the parked runnables and run them.
        executor.executeAllDelayedRunnables();
        executor.triggerAll();
        Assertions.assertThat(checkpointFuture).isCompletedExceptionally();
    }

    /**
     * After a completed checkpoint, events sent to subtasks 0 and 1 are recorded per subtask only
     * once that subtask's own acknowledge event has been handled.
     */
    @Test
    public void testControlGatewayAtSubtaskGranularity() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        holder.checkpointCoordinator(1L, new CompletableFuture<>());
        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);

        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
        getCoordinator(holder).getSubtaskGateway(1).sendEvent(new TestOperatorEvent(1));

        // Only subtask 1 acknowledges: its event appears, subtask 0's does not.
        holder.handleEventFromOperator(1, 0, new AcknowledgeCheckpointEvent(1L));
        Assertions.assertThat(tasks.getSentEventsForSubtask(0)).isEmpty();
        Assertions.assertThat(tasks.getSentEventsForSubtask(1)).containsExactly(new TestOperatorEvent(1));

        // Subtask 0 acknowledges as well: both events are now recorded.
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1L));
        Assertions.assertThat(tasks.getSentEventsForSubtask(0)).containsExactly(new TestOperatorEvent(0));
        Assertions.assertThat(tasks.getSentEventsForSubtask(1)).containsExactly(new TestOperatorEvent(1));
    }

    /**
     * Triggers a checkpoint on the holder and immediately completes the coordinator's last
     * triggered checkpoint future with an empty byte array.
     *
     * @param operatorCoordinatorHolder holder wrapping a {@code TestingOperatorCoordinator}
     * @param j id of the checkpoint to trigger
     * @return the future that was passed to the holder for this checkpoint
     */
    private CompletableFuture<byte[]> triggerAndCompleteCheckpoint(OperatorCoordinatorHolder operatorCoordinatorHolder, long j) throws Exception {
        final CompletableFuture<byte[]> holderResult = new CompletableFuture<>();
        operatorCoordinatorHolder.checkpointCoordinator(j, holderResult);

        final CompletableFuture<byte[]> coordinatorResult =
                getCoordinator(operatorCoordinatorHolder).getLastTriggeredCheckpoint();
        coordinatorResult.complete(new byte[0]);
        return holderResult;
    }

    /**
     * Encodes an int as four bytes using {@link ByteBuffer}'s default (big-endian) byte order.
     *
     * @param i value to encode
     * @return a new four-element array holding the encoded value
     */
    static byte[] intToBytes(int i) {
        final ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(i);
        return buffer.array();
    }

    /**
     * Decodes the first four bytes of the array as an int using {@link ByteBuffer}'s default
     * (big-endian) byte order.
     *
     * @param bArr array with at least four bytes
     * @return the decoded value
     */
    static int bytesToInt(byte[] bArr) {
        final ByteBuffer buffer = ByteBuffer.wrap(bArr);
        return buffer.getInt();
    }

    /** Returns the holder's coordinator, cast to {@code TestingOperatorCoordinator}. */
    private static TestingOperatorCoordinator getCoordinator(OperatorCoordinatorHolder operatorCoordinatorHolder) {
        final OperatorCoordinator wrapped = operatorCoordinatorHolder.coordinator();
        return (TestingOperatorCoordinator) wrapped;
    }

    /**
     * Creates and starts a holder that uses the executor returned by
     * {@code ComponentMainThreadExecutorServiceAdapter.forMainThread()}.
     *
     * @param subtaskAccessFactory access to the (test) tasks that receive events
     * @param function factory for the coordinator wrapped by the holder
     * @return the started holder
     */
    private OperatorCoordinatorHolder createCoordinatorHolder(SubtaskAccess.SubtaskAccessFactory subtaskAccessFactory, Function<OperatorCoordinator.Context, OperatorCoordinator> function) throws Exception {
        final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        return createCoordinatorHolder(subtaskAccessFactory, function, mainThreadExecutor);
    }

    /**
     * Creates a holder for a fresh operator id, initializes it with this test's global failure
     * handler and the given executor, and starts it.
     *
     * @param subtaskAccessFactory access to the (test) tasks that receive events
     * @param function factory for the coordinator wrapped by the holder
     * @param componentMainThreadExecutor executor handed to the holder's lazy initialization
     * @return the started holder
     */
    private OperatorCoordinatorHolder createCoordinatorHolder(SubtaskAccess.SubtaskAccessFactory subtaskAccessFactory, final Function<OperatorCoordinator.Context, OperatorCoordinator> function, ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
        final OperatorID operatorId = new OperatorID();
        final OperatorCoordinator.Provider provider = new OperatorCoordinator.Provider() {
            public OperatorID getOperatorId() {
                return operatorId;
            }

            public OperatorCoordinator create(OperatorCoordinator.Context context) {
                return function.apply(context);
            }
        };

        // NOTE(review): 3 and 1775 are presumably parallelism and max parallelism, and the trailing
        // false a concurrent-attempts flag; confirm against OperatorCoordinatorHolder.create.
        final OperatorCoordinatorHolder holder = OperatorCoordinatorHolder.create(
                operatorId,
                provider,
                new CoordinatorStoreImpl(),
                "test-coordinator-name",
                getClass().getClassLoader(),
                3,
                1775,
                subtaskAccessFactory,
                false);
        holder.lazyInitialize(this.globalFailureHandler, componentMainThreadExecutor);
        holder.start();
        return holder;
    }
}
