package org.apache.flink.runtime.io.network.partition;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.class */
public class PipelinedSubpartitionTest extends SubpartitionTestBase {

    /**
     * Class-level JUnit rule wrapping a cached thread pool; the concurrent produce/consume tests
     * submit their producer and consumer tasks to the executor it exposes.
     */
    @ClassRule
    public static final TestExecutorResource<ExecutorService> EXECUTOR_RESOURCE =
            new TestExecutorResource<>(Executors::newCachedThreadPool);

    /**
     * {@link PipelinedSubpartition} variant whose {@code getFailureCause()} always returns a fresh
     * {@link RuntimeException}. Used by {@code testProducerFailedException} to check what a read
     * view reports when its subpartition has a failure cause.
     */
    private static class FailurePipelinedSubpartition extends PipelinedSubpartition {

        // NOTE(review): parameter names mirror what the superclass constructor presumably calls
        // them; only their types and order are visible from this file - confirm.
        FailurePipelinedSubpartition(
                int index, int receiverExclusiveBuffersPerChannel, ResultPartition parent) {
            super(index, receiverExclusiveBuffersPerChannel, parent);
        }

        /** Always reports a failure, regardless of the state of the partition. */
        Throwable getFailureCause() {
            final String message = "Expected test exception";
            return new RuntimeException(message);
        }
    }

    /**
     * Creates the subpartition under test by delegating to {@link #createPipelinedSubpartition()}.
     *
     * <p>Decompiler note: this method was originally named {@code createSubpartition}; it was
     * renamed when it was merged with its bridge method, and its access modifier was changed from
     * package-private.
     *
     * @return a new {@link PipelinedSubpartition}
     */
    @Override // org.apache.flink.runtime.io.network.partition.SubpartitionTestBase
    public PipelinedSubpartition mo188createSubpartition() throws Exception {
        final PipelinedSubpartition subpartition = createPipelinedSubpartition();
        return subpartition;
    }

    /**
     * Never actually provides a subpartition: the method states an assumption that is always
     * false before it gets to its {@code return} statement.
     *
     * @return {@code null}; present only so that the method compiles
     */
    @Override // org.apache.flink.runtime.io.network.partition.SubpartitionTestBase
    ResultSubpartition createFailingWritesSubpartition() throws Exception {
        // An assumption that can never hold: JUnit treats the calling test as skipped, not failed.
        final boolean failingWritesSupported = false;
        Assume.assumeTrue(failingWritesSupported);
        return null;
    }

    /**
     * Verifies that a first read view can be created and that requesting a second one on the same
     * subpartition is rejected with an {@link IllegalStateException}.
     */
    @Test
    public void testIllegalReadViewRequest() throws Exception {
        final PipelinedSubpartition subpartition = mo188createSubpartition();

        // The first request must succeed.
        final PipelinedSubpartitionView firstView =
                subpartition.createReadView(new NoOpBufferAvailablityListener());
        Assert.assertNotNull(firstView);

        // The second request on the same subpartition must be refused.
        try {
            subpartition.createReadView(new NoOpBufferAvailablityListener());
            Assert.fail("Did not throw expected exception after duplicate notifyNonEmpty view request.");
        } catch (IllegalStateException expected) {
            // Intentionally ignored: this exception is exactly what the test is waiting for.
        }
    }

    /**
     * Verifies that {@code PipelinedSubpartitionView.isReleased()} asks the parent subpartition
     * each time it is called and reflects the parent's answer.
     */
    @Test
    public void testIsReleasedChecksParent() {
        final PipelinedSubpartition parent = Mockito.mock(PipelinedSubpartition.class);
        final BufferAvailabilityListener listener = Mockito.mock(BufferAvailabilityListener.class);
        final PipelinedSubpartitionView view = new PipelinedSubpartitionView(parent, listener);

        // An unstubbed mock answers false; the view must have consulted the parent exactly once.
        Assert.assertFalse(view.isReleased());
        Mockito.verify(parent, Mockito.times(1)).isReleased();

        // Once the parent reports released, the view must report it too (second parent query).
        Mockito.when(parent.isReleased()).thenReturn(true);
        Assert.assertTrue(view.isReleased());
        Mockito.verify(parent, Mockito.times(2)).isReleased();
    }

    /** Runs the concurrent produce/consume scenario with neither side flagged as slow. */
    @Test
    public void testConcurrentFastProduceAndFastConsume() throws Exception {
        final boolean slowProducer = false;
        final boolean slowConsumer = false;
        testProduceConsume(slowProducer, slowConsumer);
    }

    /** Runs the concurrent produce/consume scenario with only the consumer flagged as slow. */
    @Test
    public void testConcurrentFastProduceAndSlowConsume() throws Exception {
        final boolean slowProducer = false;
        final boolean slowConsumer = true;
        testProduceConsume(slowProducer, slowConsumer);
    }

    /** Runs the concurrent produce/consume scenario with only the producer flagged as slow. */
    @Test
    public void testConcurrentSlowProduceAndFastConsume() throws Exception {
        final boolean slowProducer = true;
        final boolean slowConsumer = false;
        testProduceConsume(slowProducer, slowConsumer);
    }

    /** Runs the concurrent produce/consume scenario with both sides flagged as slow. */
    @Test
    public void testConcurrentSlowProduceAndSlowConsume() throws Exception {
        final boolean slowProducer = true;
        final boolean slowConsumer = true;
        testProduceConsume(slowProducer, slowConsumer);
    }

    /**
     * Runs a producer and a consumer concurrently against one pipelined subpartition and checks
     * that every buffer arrives complete and in order.
     *
     * <p>The producer emits a fixed number of buffers filled with consecutive {@code int} values
     * (the sequence continues across buffers). The consumer asserts that each received buffer is
     * completely filled and carries exactly the next stretch of that sequence, then recycles it.
     *
     * @param isSlowProducer flag handed to the {@link TestSubpartitionProducer}
     * @param isSlowConsumer flag handed to the {@link TestSubpartitionConsumer}
     * @throws Exception if either task fails or both do not complete within 60 seconds
     */
    private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
        // Single source of truth for the data layout written by the producer.
        final int numBuffersToProduce = 128;
        final int bufferSize = 32768;
        final int intsPerBuffer = bufferSize / Integer.BYTES;

        final TestProducerSource producerSource = new TestProducerSource() {
            private int numberOfBuffers;

            @Override
            public TestProducerSource.BufferAndChannel getNextBuffer() throws Exception {
                // Returning null tells the producer that the source is exhausted.
                if (numberOfBuffers == numBuffersToProduce) {
                    return null;
                }

                final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
                // Continue the global int sequence where the previous buffer stopped.
                int nextValue = numberOfBuffers * intsPerBuffer;
                for (int offset = 0; offset < bufferSize; offset += Integer.BYTES) {
                    segment.putInt(offset, nextValue);
                    nextValue++;
                }

                numberOfBuffers++;
                return new TestProducerSource.BufferAndChannel(segment.getArray(), 0);
            }
        };

        final TestConsumerCallback consumerCallback = new TestConsumerCallback() {
            private int numberOfBuffers;

            @Override
            public void onBuffer(Buffer buffer) {
                final MemorySegment segment = buffer.getMemorySegment();
                // Every buffer is expected to be filled up to the size of its memory segment.
                Assert.assertEquals(segment.size(), buffer.getSize());

                int expectedValue = numberOfBuffers * (segment.size() / Integer.BYTES);
                for (int offset = 0; offset < segment.size(); offset += Integer.BYTES) {
                    Assert.assertEquals(expectedValue, segment.getInt(offset));
                    expectedValue++;
                }

                numberOfBuffers++;
                buffer.recycleBuffer();
            }

            @Override
            public void onEvent(AbstractEvent abstractEvent) {
                // Events carry no payload to verify in this test.
            }
        };

        final PipelinedSubpartition subpartition = mo188createSubpartition();
        final TestSubpartitionProducer producer =
                new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource);
        final TestSubpartitionConsumer consumer =
                new TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
        // The consumer doubles as the availability listener of its own read view.
        consumer.setSubpartitionView(subpartition.createReadView(consumer));

        final CompletableFuture<?> producerResult =
                CompletableFuture.supplyAsync(
                        CheckedSupplier.unchecked(producer::call), EXECUTOR_RESOURCE.getExecutor());
        final CompletableFuture<?> consumerResult =
                CompletableFuture.supplyAsync(
                        CheckedSupplier.unchecked(consumer::call), EXECUTOR_RESOURCE.getExecutor());

        // Bounded wait so that a deadlock fails the test instead of hanging the build.
        FutureUtils.waitForAll(Arrays.asList(producerResult, consumerResult))
                .get(60_000L, TimeUnit.MILLISECONDS);
    }

    /** Checks the cleanup performed by {@code release()} when no read view was ever created. */
    @Test
    public void testCleanupReleasedPartitionNoView() throws Exception {
        final boolean createView = false;
        testCleanupReleasedPartition(createView);
    }

    /** Checks the cleanup performed by {@code release()} when a read view exists. */
    @Test
    public void testCleanupReleasedPartitionWithView() throws Exception {
        final boolean createView = true;
        testCleanupReleasedPartition(createView);
    }

    /**
     * Adds two finished buffers to a fresh subpartition, releases it and verifies the cleanup:
     * the queue is empty, the subpartition (and the view, if one was created) reports released,
     * and both buffers have been recycled.
     *
     * <p>Whatever happens in the test body, buffers that were not recycled are closed in the
     * {@code finally} block so that a failing assertion does not leave them behind.
     *
     * @param createView whether a read view is created before the subpartition is released
     * @throws Exception if interacting with the subpartition fails
     */
    private void testCleanupReleasedPartition(boolean createView) throws Exception {
        final PipelinedSubpartition partition = mo188createSubpartition();
        final BufferConsumer buffer1 = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);
        final BufferConsumer buffer2 = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096);

        boolean buffer1Recycled;
        boolean buffer2Recycled;
        try {
            partition.add(buffer1);
            partition.add(buffer2);
            Assert.assertEquals(2L, partition.getNumberOfQueuedBuffers());

            // The view, if requested, has to exist before the release.
            PipelinedSubpartitionView view = null;
            if (createView) {
                view = partition.createReadView(new NoOpBufferAvailablityListener());
            }

            partition.release();

            Assert.assertEquals(0L, partition.getNumberOfQueuedBuffers());
            Assert.assertTrue(partition.isReleased());
            if (createView) {
                Assert.assertTrue(view.isReleased());
            }
            Assert.assertTrue(buffer1.isRecycled());
        } finally {
            // Record the recycling state first, then clean up whatever release() left behind.
            buffer1Recycled = buffer1.isRecycled();
            if (!buffer1Recycled) {
                buffer1.close();
            }
            buffer2Recycled = buffer2.isRecycled();
            if (!buffer2Recycled) {
                buffer2.close();
            }
        }

        if (!buffer1Recycled) {
            Assert.fail("buffer 1 not recycled");
        }
        if (!buffer2Recycled) {
            Assert.fail("buffer 2 not recycled");
        }

        // Both adds are still counted after the release, while the byte counter stays at zero
        // (presumably because no buffer was ever consumed - confirm against PipelinedSubpartition).
        Assert.assertEquals(2L, partition.getTotalNumberOfBuffersUnsafe());
        Assert.assertEquals(0L, partition.getTotalNumberOfBytesUnsafe());
    }

    /** Verifies that releasing a subpartition also marks its read view as released. */
    @Test
    public void testReleaseParent() throws Exception {
        final PipelinedSubpartition subpartition = mo188createSubpartition();
        verifyViewReleasedAfterParentRelease(subpartition);
    }

    /**
     * Verifies that the queued-buffer count grows by one per added buffer and shrinks by one when
     * a buffer is taken out via {@code getNextBuffer()}.
     */
    @Test
    public void testNumberOfQueueBuffers() throws Exception {
        final PipelinedSubpartition subpartition = mo188createSubpartition();

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        Assert.assertEquals(1L, subpartition.getNumberOfQueuedBuffers());

        subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4096));
        Assert.assertEquals(2L, subpartition.getNumberOfQueuedBuffers());

        // Taking one buffer out must leave exactly one in the queue.
        subpartition.getNextBuffer();
        Assert.assertEquals(1L, subpartition.getNumberOfQueuedBuffers());
    }

    /**
     * Verifies that {@code add} returns {@link Integer#MAX_VALUE} before any buffer size was set
     * and returns the configured value after {@code bufferSize(int)} was called.
     */
    @Test
    public void testNewBufferSize() throws Exception {
        final PipelinedSubpartition subpartition = mo188createSubpartition();
        final int newBufferSize = 42;

        Assert.assertEquals(
                Integer.MAX_VALUE,
                subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4)));

        subpartition.bufferSize(newBufferSize);

        Assert.assertEquals(
                newBufferSize,
                subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4)));
    }

    /**
     * Verifies that setting a negative buffer size is rejected with an
     * {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testNegativeNewBufferSize() throws Exception {
        final PipelinedSubpartition subpartition = mo188createSubpartition();

        Assert.assertEquals(
                Integer.MAX_VALUE,
                subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4)));

        // This call is the one expected to throw.
        final int negativeSize = -1;
        subpartition.bufferSize(negativeSize);
    }

    /**
     * Verifies that {@code add} returns {@code -1} once the subpartition has been finished, while
     * an add before {@code finish()} returns {@link Integer#MAX_VALUE}.
     */
    @Test
    public void testNegativeBufferSizeAsSignOfAddingFail() throws Exception {
        final PipelinedSubpartition subpartition = mo188createSubpartition();

        Assert.assertEquals(
                Integer.MAX_VALUE,
                subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4)));

        subpartition.finish();

        // Adding after finish() is signalled through a negative return value.
        Assert.assertEquals(
                -1,
                subpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(4)));
    }

    /**
     * Verifies that a read view created on a subpartition with a failure cause reports a non-null
     * failure cause of type {@link CancelTaskException}.
     */
    @Test
    public void testProducerFailedException() {
        final FailurePipelinedSubpartition failingSubpartition =
                new FailurePipelinedSubpartition(0, 2, PartitionTestUtils.createPartition());
        final PipelinedSubpartitionView view =
                failingSubpartition.createReadView(new NoOpBufferAvailablityListener());

        Assert.assertNotNull(view.getFailureCause());
        Assert.assertTrue(view.getFailureCause() instanceof CancelTaskException);
    }

    /**
     * Fills and finishes the given subpartition, reads from it through a view, and then checks
     * that the view only reports released after the subpartition itself has been released.
     *
     * @param resultSubpartition the subpartition to exercise; it is released by this method
     * @throws Exception if interacting with the subpartition or its view fails
     */
    private void verifyViewReleasedAfterParentRelease(ResultSubpartition resultSubpartition) throws Exception {
        final BufferConsumer data = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768);
        resultSubpartition.add(data);
        resultSubpartition.finish();

        final BufferAvailabilityListener listener = Mockito.mock(BufferAvailabilityListener.class);
        final ResultSubpartitionView view = resultSubpartition.createReadView(listener);

        // Two reads must succeed (presumably the data buffer and whatever finish() queued - confirm).
        Assert.assertNotNull(view.getNextBuffer());
        Assert.assertNotNull(view.getNextBuffer());

        // Reading everything alone does not release the view ...
        Assert.assertFalse(view.isReleased());

        // ... but releasing the parent does.
        resultSubpartition.release();
        Assert.assertTrue(view.isReleased());
    }

    /**
     * Creates a {@link PipelinedSubpartition} that belongs to a fresh partition obtained from
     * {@code PartitionTestUtils.createPartition()}.
     *
     * @return the new subpartition
     */
    public static PipelinedSubpartition createPipelinedSubpartition() {
        final ResultPartition parent = PartitionTestUtils.createPartition();
        return createPipelinedSubpartition(parent);
    }

    /**
     * Creates a {@link PipelinedSubpartition} for the given partition, passing {@code 0} and
     * {@code 2} as the two leading {@code int} constructor arguments.
     *
     * @param resultPartition the partition handed to the subpartition's constructor
     * @return the new subpartition
     */
    public static PipelinedSubpartition createPipelinedSubpartition(ResultPartition resultPartition) {
        final PipelinedSubpartition subpartition = new PipelinedSubpartition(0, 2, resultPartition);
        return subpartition;
    }
}
