package org.apache.flink.runtime.io.network.partition.queue;

import com.google.common.base.Optional;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.MockConsumer;
import org.apache.flink.runtime.io.network.partition.MockNotificationListener;
import org.apache.flink.runtime.io.network.partition.MockProducer;
import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.class */
public class PipelinedPartitionQueueTest {
    private static final int NUM_BUFFERS = 1024;
    private static final int BUFFER_SIZE = 32768;
    private static final NetworkBufferPool networkBuffers = new NetworkBufferPool(NUM_BUFFERS, BUFFER_SIZE);
    private PipelinedPartitionQueue queue;

    @Before
    public void setup() {
        this.queue = new PipelinedPartitionQueue();
    }

    @Test(expected = IllegalQueueIteratorRequestException.class)
    public void testExceptionWhenMultipleConsumers() throws IOException {
        this.queue.getQueueIterator(Optional.absent());
        this.queue.getQueueIterator(Optional.absent());
    }

    @Test(expected = IntermediateResultPartitionQueueIterator.AlreadySubscribedException.class)
    public void testExceptionWhenMultipleSubscribers() throws IOException {
        IntermediateResultPartitionQueueIterator queueIterator = this.queue.getQueueIterator(Optional.absent());
        NotificationListener notificationListener = (NotificationListener) Mockito.mock(NotificationListener.class);
        Assert.assertTrue(queueIterator.subscribe(notificationListener));
        queueIterator.subscribe(notificationListener);
    }

    @Test
    public void testProduceConsume() throws Exception {
        Buffer buffer = (Buffer) Mockito.mock(Buffer.class);
        MockNotificationListener mockNotificationListener = new MockNotificationListener();
        IntermediateResultPartitionQueueIterator queueIterator = this.queue.getQueueIterator(Optional.absent());
        Assert.assertNull(queueIterator.getNextBuffer());
        Assert.assertFalse(queueIterator.isConsumed());
        Assert.assertTrue(queueIterator.subscribe(mockNotificationListener));
        Assert.assertEquals(0L, mockNotificationListener.getNumberOfNotifications());
        this.queue.add(buffer);
        Assert.assertEquals(1L, mockNotificationListener.getNumberOfNotifications());
        Assert.assertNotNull(queueIterator.getNextBuffer());
        Assert.assertNull(queueIterator.getNextBuffer());
        Assert.assertFalse(queueIterator.isConsumed());
        this.queue.add(buffer);
        Assert.assertFalse(queueIterator.subscribe(mockNotificationListener));
        Assert.assertEquals(1L, mockNotificationListener.getNumberOfNotifications());
    }

    @Test
    public void testDiscardingProduceWhileSubscribedConsumer() throws IOException {
        IntermediateResultPartitionQueueIterator queueIterator = this.queue.getQueueIterator(Optional.absent());
        NotificationListener notificationListener = (NotificationListener) Mockito.mock(NotificationListener.class);
        Assert.assertTrue(queueIterator.subscribe(notificationListener));
        this.queue.discard();
        ((NotificationListener) Mockito.verify(notificationListener, Mockito.times(1))).onNotification();
        Assert.assertTrue(queueIterator.isConsumed());
    }

    @Test
    public void testConcurrentProduceConsume() throws Exception {
        doTestConcurrentProduceConsume(false, false);
    }

    @Test
    public void testConcurrentSlowProduceConsume() throws Exception {
        doTestConcurrentProduceConsume(true, false);
    }

    @Test
    public void testConcurrentProduceSlowConsume() throws Exception {
        doTestConcurrentProduceConsume(true, false);
    }

    @Test
    public void testConcurrentDiscardingProduceConsume() throws Exception {
        doTestConcurrentProduceConsume(false, false, true);
    }

    @Test
    public void testConcurrentDiscardingSlowProduceConsume() throws Exception {
        doTestConcurrentProduceConsume(true, false, true);
    }

    @Test
    public void testConcurrentDiscardingProduceSlowConsume() throws Exception {
        doTestConcurrentProduceConsume(false, true, true);
    }

    private void doTestConcurrentProduceConsume(boolean z, boolean z2) throws Exception {
        doTestConcurrentProduceConsume(z, z2, false);
    }

    private void doTestConcurrentProduceConsume(boolean z, boolean z2, boolean z3) throws Exception {
        BufferPool createBufferPool = networkBuffers.createBufferPool(8, true);
        MockProducer mockProducer = new MockProducer(this.queue, createBufferPool, 64, z);
        if (z3) {
            mockProducer.discardAfter(new Random().nextInt(64));
        }
        MockConsumer mockConsumer = new MockConsumer(this.queue.getQueueIterator(Optional.absent()), z2);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        try {
            boolean z4 = false;
            try {
                z4 = ((Boolean) newCachedThreadPool.submit(mockProducer).get(5L, TimeUnit.SECONDS)).booleanValue() & ((Boolean) newCachedThreadPool.submit(mockConsumer).get(5L, TimeUnit.SECONDS)).booleanValue();
            } catch (Throwable th) {
                th.printStackTrace();
                if (mockProducer.getError() != null) {
                    System.err.println("Producer error:");
                    mockProducer.getError().printStackTrace();
                }
                if (mockConsumer.getError() != null) {
                    System.err.println("Consumer error:");
                    mockConsumer.getError().printStackTrace();
                }
                Assert.fail("Unexpected failure during test: " + th.getMessage() + ". Producer error: " + mockProducer.getError() + ", consumer error: " + mockConsumer.getError());
            }
            createBufferPool.destroy();
            Assert.assertTrue(z4);
            newCachedThreadPool.shutdownNow();
        } catch (Throwable th2) {
            newCachedThreadPool.shutdownNow();
            throw th2;
        }
    }
}
