package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.BiConsumerWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.class */
public class ChannelStateWriteRequestExecutorImplTest {
    private static final JobID JOB_ID = new JobID();
    private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
    private static final int SUBTASK_INDEX = 0;

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest$TestRequestDispatcher.class */
    private static class TestRequestDispatcher implements ChannelStateWriteRequestDispatcher {
        private boolean isStopped;

        private TestRequestDispatcher() {
        }

        public void dispatch(ChannelStateWriteRequest channelStateWriteRequest) {
        }

        public void fail(Throwable th) {
            this.isStopped = true;
        }

        public boolean isStopped() {
            return this.isStopped;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest$TestWriteRequest.class */
    public static class TestWriteRequest extends ChannelStateWriteRequest {
        private boolean cancelled;

        @Nullable
        private final CompletableFuture<?> readyFuture;

        public TestWriteRequest(JobVertexID jobVertexID, int i) {
            this(jobVertexID, i, null);
        }

        public TestWriteRequest(JobVertexID jobVertexID, int i, @Nullable CompletableFuture<?> completableFuture) {
            super(jobVertexID, i, 0L, "Test");
            this.cancelled = false;
            this.readyFuture = completableFuture;
        }

        public void cancel(Throwable th) {
            this.cancelled = true;
        }

        public boolean isCancelled() {
            return this.cancelled;
        }

        public CompletableFuture<?> getReadyFuture() {
            return this.readyFuture != null ? this.readyFuture : super.getReadyFuture();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest$WorkerClosingDeque.class */
    public static class WorkerClosingDeque extends ArrayDeque<ChannelStateWriteRequest> {
        private ChannelStateWriteRequestExecutor worker;

        private WorkerClosingDeque() {
        }

        @Override // java.util.ArrayDeque, java.util.AbstractCollection, java.util.Collection, java.util.Deque, java.util.Queue
        public boolean add(@Nonnull ChannelStateWriteRequest channelStateWriteRequest) {
            boolean add = super.add((WorkerClosingDeque) channelStateWriteRequest);
            try {
                this.worker.releaseSubtask(ChannelStateWriteRequestExecutorImplTest.JOB_VERTEX_ID, 0);
            } catch (IOException e) {
                ExceptionUtils.rethrow(e);
            }
            return add;
        }

        @Override // java.util.ArrayDeque, java.util.Deque
        public void addFirst(@Nonnull ChannelStateWriteRequest channelStateWriteRequest) {
            super.addFirst((WorkerClosingDeque) channelStateWriteRequest);
            try {
                this.worker.releaseSubtask(ChannelStateWriteRequestExecutorImplTest.JOB_VERTEX_ID, 0);
            } catch (IOException e) {
                ExceptionUtils.rethrow(e);
            }
        }

        public void setWorker(ChannelStateWriteRequestExecutor channelStateWriteRequestExecutor) {
            this.worker = channelStateWriteRequestExecutor;
        }
    }

    ChannelStateWriteRequestExecutorImplTest() {
    }

    @Test
    void testCloseAfterSubmit() {
        Assertions.assertThatThrownBy(() -> {
            testCloseAfterSubmit((v0, v1) -> {
                v0.submit(v1);
            });
        }).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testCloseAfterSubmitPriority() {
        Assertions.assertThatThrownBy(() -> {
            testCloseAfterSubmit((v0, v1) -> {
                v0.submitPriority(v1);
            });
        }).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testSubmitFailure() throws Exception {
        testSubmitFailure((v0, v1) -> {
            v0.submit(v1);
        });
    }

    @Test
    void testSubmitPriorityFailure() throws Exception {
        testSubmitFailure((v0, v1) -> {
            v0.submitPriority(v1);
        });
    }

    private void testCloseAfterSubmit(BiConsumerWithException<ChannelStateWriteRequestExecutor, ChannelStateWriteRequest, Exception> biConsumerWithException) throws Exception {
        WorkerClosingDeque workerClosingDeque = new WorkerClosingDeque();
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(ChannelStateWriteRequestDispatcher.NO_OP, workerClosingDeque, 5, channelStateWriteRequestExecutor -> {
        });
        workerClosingDeque.setWorker(channelStateWriteRequestExecutorImpl);
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        TestWriteRequest testWriteRequest = new TestWriteRequest(JOB_VERTEX_ID, 0);
        biConsumerWithException.accept(channelStateWriteRequestExecutorImpl, testWriteRequest);
        Assertions.assertThat(workerClosingDeque).isEmpty();
        Assertions.assertThat(testWriteRequest.isCancelled()).isFalse();
    }

    private void testSubmitFailure(BiConsumerWithException<ChannelStateWriteRequestExecutor, ChannelStateWriteRequest, Exception> biConsumerWithException) throws Exception {
        TestWriteRequest testWriteRequest = new TestWriteRequest(JOB_VERTEX_ID, 0);
        ArrayDeque arrayDeque = new ArrayDeque();
        try {
            ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(ChannelStateWriteRequestDispatcher.NO_OP, arrayDeque, 5, channelStateWriteRequestExecutor -> {
            });
            channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
            biConsumerWithException.accept(channelStateWriteRequestExecutorImpl, testWriteRequest);
            Assertions.assertThat(testWriteRequest.cancelled).isTrue();
            Assertions.assertThat(arrayDeque).isEmpty();
            throw new RuntimeException("expected exception not thrown");
        } catch (IllegalStateException e) {
            Assertions.assertThat(testWriteRequest.cancelled).isTrue();
            Assertions.assertThat(arrayDeque).isEmpty();
        } catch (Throwable th) {
            Assertions.assertThat(testWriteRequest.cancelled).isTrue();
            Assertions.assertThat(arrayDeque).isEmpty();
            throw th;
        }
    }

    @Test
    void testCleanup() throws IOException {
        TestWriteRequest testWriteRequest = new TestWriteRequest(JOB_VERTEX_ID, 0);
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.add(testWriteRequest);
        TestRequestDispatcher testRequestDispatcher = new TestRequestDispatcher();
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(testRequestDispatcher, arrayDeque, 5, channelStateWriteRequestExecutor -> {
        });
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 0);
        channelStateWriteRequestExecutorImpl.run();
        Assertions.assertThat(testRequestDispatcher.isStopped()).isTrue();
        Assertions.assertThat(arrayDeque).isEmpty();
        Assertions.assertThat(testWriteRequest.isCancelled()).isTrue();
    }

    @Test
    void testIgnoresInterruptsWhileRunning() throws Exception {
        TestRequestDispatcher testRequestDispatcher = new TestRequestDispatcher();
        ArrayDeque arrayDeque = new ArrayDeque();
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(testRequestDispatcher, arrayDeque, 5, channelStateWriteRequestExecutor -> {
        });
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        try {
            channelStateWriteRequestExecutorImpl.start();
            channelStateWriteRequestExecutorImpl.getThread().interrupt();
            channelStateWriteRequestExecutorImpl.submit(new TestWriteRequest(JOB_VERTEX_ID, 0));
            channelStateWriteRequestExecutorImpl.getThread().interrupt();
            while (!arrayDeque.isEmpty()) {
                Thread.sleep(100L);
            }
        } finally {
            channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 0);
        }
    }

    @Test
    void testCanBeClosed() throws Exception {
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(new ChannelStateWriteRequestDispatcherImpl(new JobManagerCheckpointStorage(), JOB_ID, new ChannelStateSerializerImpl()), 5, channelStateWriteRequestExecutor -> {
        });
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        try {
            channelStateWriteRequestExecutorImpl.start();
            channelStateWriteRequestExecutorImpl.submit(new CheckpointStartRequest(JOB_VERTEX_ID, 0, 1L, new ChannelStateWriter.ChannelStateWriteResult(), CheckpointStorageLocationReference.getDefault()));
            channelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequest.write(JOB_VERTEX_ID, 0, 1L, new ResultSubpartitionInfo(0, 0), new CompletableFuture()));
            channelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequest.write(JOB_VERTEX_ID, 0, 1L, new ResultSubpartitionInfo(0, 0), new CompletableFuture()));
            channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 0);
        } catch (Throwable th) {
            channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 0);
            throw th;
        }
    }

    @Test
    void testSkipUnreadyDataFuture() throws Exception {
        final int i = 0;
        final int i2 = 1;
        LinkedList linkedList = new LinkedList();
        LinkedList linkedList2 = new LinkedList();
        CompletableFuture completableFuture = new CompletableFuture();
        final int i3 = 3;
        final int i4 = 4;
        final int i5 = 4;
        linkedList.add(new TestWriteRequest(JOB_VERTEX_ID, 1));
        linkedList.add(new TestWriteRequest(JOB_VERTEX_ID, 0));
        linkedList.add(new TestWriteRequest(JOB_VERTEX_ID, 0, completableFuture));
        linkedList.add(new TestWriteRequest(JOB_VERTEX_ID, 0));
        linkedList.add(new TestWriteRequest(JOB_VERTEX_ID, 1));
        linkedList.add(new TestWriteRequest(JOB_VERTEX_ID, 1));
        linkedList2.add(new TestWriteRequest(JOB_VERTEX_ID, 0));
        linkedList2.add(new TestWriteRequest(JOB_VERTEX_ID, 1));
        final CompletableFuture completableFuture2 = new CompletableFuture();
        final CompletableFuture completableFuture3 = new CompletableFuture();
        final AtomicInteger atomicInteger = new AtomicInteger(-1);
        final AtomicInteger atomicInteger2 = new AtomicInteger(-1);
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(new TestRequestDispatcher() { // from class: org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImplTest.1
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImplTest.TestRequestDispatcher
            public void dispatch(ChannelStateWriteRequest channelStateWriteRequest) {
                if (channelStateWriteRequest.getSubtaskIndex() == i) {
                    atomicInteger.incrementAndGet();
                } else {
                    if (channelStateWriteRequest.getSubtaskIndex() != i2) {
                        throw new IllegalStateException(String.format("Unknown subtask index %s.", Integer.valueOf(channelStateWriteRequest.getSubtaskIndex())));
                    }
                    if (atomicInteger2.incrementAndGet() == i3) {
                        completableFuture2.complete(null);
                    }
                }
                if (atomicInteger.get() == i4 && atomicInteger2.get() == i5) {
                    completableFuture3.complete(null);
                }
            }
        }, 5, channelStateWriteRequestExecutor -> {
        });
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 1);
        try {
            channelStateWriteRequestExecutorImpl.start();
            Iterator it = linkedList.iterator();
            while (it.hasNext()) {
                channelStateWriteRequestExecutorImpl.submit((ChannelStateWriteRequest) it.next());
            }
            completableFuture2.get();
            Assertions.assertThat(atomicInteger.get()).isOne();
            Assertions.assertThat(atomicInteger2.get()).isEqualTo(3);
            Iterator it2 = linkedList2.iterator();
            while (it2.hasNext()) {
                channelStateWriteRequestExecutorImpl.submit((ChannelStateWriteRequest) it2.next());
            }
            completableFuture.complete(Collections.emptyList());
            completableFuture3.get();
            Assertions.assertThat(atomicInteger.get()).isEqualTo(4);
            Assertions.assertThat(atomicInteger2.get()).isEqualTo(4);
            channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 0);
            channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 1);
        } catch (Throwable th) {
            channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 0);
            channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 1);
            throw th;
        }
    }

    @Test
    void testRecordsException() throws IOException {
        final TestException testException = new TestException();
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(new TestRequestDispatcher() { // from class: org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImplTest.2
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImplTest.TestRequestDispatcher
            public void dispatch(ChannelStateWriteRequest channelStateWriteRequest) {
                throw testException;
            }
        }, new ArrayDeque(Collections.singletonList(new TestWriteRequest(JOB_VERTEX_ID, 0))), 5, channelStateWriteRequestExecutor -> {
        });
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        channelStateWriteRequestExecutorImpl.run();
        try {
            channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 0);
            Assertions.fail("exception not thrown");
        } catch (IOException e) {
            if (!ExceptionUtils.findThrowable(e, TestException.class).filter(testException2 -> {
                return testException2 == testException;
            }).isPresent()) {
                throw e;
            }
        }
    }

    @Test
    void testSubmitRequestOfUnregisteredSubtask() throws Exception {
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(ChannelStateWriteRequestDispatcher.NO_OP, 5, channelStateWriteRequestExecutor -> {
        });
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        channelStateWriteRequestExecutorImpl.start();
        channelStateWriteRequestExecutorImpl.submit(new TestWriteRequest(JOB_VERTEX_ID, 0));
        Assertions.assertThatThrownBy(() -> {
            channelStateWriteRequestExecutorImpl.submit(new TestWriteRequest(new JobVertexID(), 0));
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("is not yet registered.");
        Assertions.assertThatThrownBy(() -> {
            channelStateWriteRequestExecutorImpl.submitPriority(new TestWriteRequest(new JobVertexID(), 0));
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("is not yet registered.");
    }

    @Test
    void testSubmitPriorityUnreadyRequest() throws Exception {
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(ChannelStateWriteRequestDispatcher.NO_OP, 5, channelStateWriteRequestExecutor -> {
        });
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        channelStateWriteRequestExecutorImpl.start();
        channelStateWriteRequestExecutorImpl.submitPriority(new TestWriteRequest(JOB_VERTEX_ID, 0));
        Assertions.assertThatThrownBy(() -> {
            channelStateWriteRequestExecutorImpl.submitPriority(new TestWriteRequest(JOB_VERTEX_ID, 0, new CompletableFuture()));
        }).isInstanceOf(IllegalStateException.class).hasMessage("The priority request must be ready.");
    }

    @Test
    void testRegisterSubtaskAfterRegisterCompleted() throws Exception {
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(ChannelStateWriteRequestDispatcher.NO_OP, 5, channelStateWriteRequestExecutor -> {
        });
        for (int i = 0; i < 5; i++) {
            Assertions.assertThat(channelStateWriteRequestExecutorImpl.isRegistering()).isTrue();
            channelStateWriteRequestExecutorImpl.registerSubtask(new JobVertexID(), 0);
        }
        Assertions.assertThat(channelStateWriteRequestExecutorImpl.isRegistering()).isFalse();
        Assertions.assertThatThrownBy(() -> {
            channelStateWriteRequestExecutorImpl.registerSubtask(new JobVertexID(), 0);
        }).isInstanceOf(IllegalStateException.class).hasMessage("This executor has been registered.");
    }

    @Test
    void testSubmitStartRequestBeforeRegisterCompleted() throws Exception {
        final CompletableFuture completableFuture = new CompletableFuture();
        TestRequestDispatcher testRequestDispatcher = new TestRequestDispatcher() { // from class: org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImplTest.3
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImplTest.TestRequestDispatcher
            public void dispatch(ChannelStateWriteRequest channelStateWriteRequest) {
                if (channelStateWriteRequest instanceof CheckpointStartRequest) {
                    completableFuture.complete(null);
                }
            }
        };
        CompletableFuture completableFuture2 = new CompletableFuture();
        completableFuture2.getClass();
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(testRequestDispatcher, 5, (v1) -> {
            r4.complete(v1);
        });
        channelStateWriteRequestExecutorImpl.start();
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        Assertions.assertThat(channelStateWriteRequestExecutorImpl.isRegistering()).isTrue();
        channelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequest.start(JOB_VERTEX_ID, 0, 1L, new ChannelStateWriter.ChannelStateWriteResult(), CheckpointStorageLocationReference.getDefault()));
        completableFuture.get();
        Assertions.assertThat(channelStateWriteRequestExecutorImpl.isRegistering()).isFalse();
        Assertions.assertThat(completableFuture2).isCompletedWithValue(channelStateWriteRequestExecutorImpl);
        Assertions.assertThatThrownBy(() -> {
            channelStateWriteRequestExecutorImpl.registerSubtask(new JobVertexID(), 0);
        }).isInstanceOf(IllegalStateException.class).hasMessage("This executor has been registered.");
    }

    @Test
    void testReleaseSubtaskBeforeRegisterCompleted() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestRequestDispatcher testRequestDispatcher = new TestRequestDispatcher();
        completableFuture.getClass();
        ChannelStateWriteRequestExecutorImpl channelStateWriteRequestExecutorImpl = new ChannelStateWriteRequestExecutorImpl(testRequestDispatcher, 5, (v1) -> {
            r4.complete(v1);
        });
        channelStateWriteRequestExecutorImpl.start();
        channelStateWriteRequestExecutorImpl.registerSubtask(JOB_VERTEX_ID, 0);
        Assertions.assertThat(channelStateWriteRequestExecutorImpl.isRegistering()).isTrue();
        channelStateWriteRequestExecutorImpl.releaseSubtask(JOB_VERTEX_ID, 0);
        Assertions.assertThat(channelStateWriteRequestExecutorImpl.isRegistering()).isFalse();
        Assertions.assertThat(completableFuture).isCompletedWithValue(channelStateWriteRequestExecutorImpl);
        Assertions.assertThatThrownBy(() -> {
            channelStateWriteRequestExecutorImpl.registerSubtask(new JobVertexID(), 0);
        }).isInstanceOf(IllegalStateException.class).hasMessage("This executor has been registered.");
    }
}
