package org.apache.flink.streaming.api.operators.async.queue;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.class */
public class StreamElementQueueTest extends TestLogger {
    private static final long timeout = 10000;
    private static ExecutorService executor;
    private final StreamElementQueueType streamElementQueueType;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest$StreamElementQueueType.class */
    public enum StreamElementQueueType {
        OrderedStreamElementQueueType,
        UnorderedStreamElementQueueType
    }

    @BeforeClass
    public static void setup() {
        executor = Executors.newFixedThreadPool(3);
    }

    @AfterClass
    public static void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    @Parameterized.Parameters
    public static Collection<StreamElementQueueType> streamElementQueueTypes() {
        return Arrays.asList(StreamElementQueueType.OrderedStreamElementQueueType, StreamElementQueueType.UnorderedStreamElementQueueType);
    }

    public StreamElementQueueTest(StreamElementQueueType streamElementQueueType) {
        this.streamElementQueueType = (StreamElementQueueType) Preconditions.checkNotNull(streamElementQueueType);
    }

    public StreamElementQueue createStreamElementQueue(int i, OperatorActions operatorActions) {
        switch (this.streamElementQueueType) {
            case OrderedStreamElementQueueType:
                return new OrderedStreamElementQueue(i, executor, operatorActions);
            case UnorderedStreamElementQueueType:
                return new UnorderedStreamElementQueue(i, executor, operatorActions);
            default:
                throw new IllegalStateException("Unknown stream element queue type: " + this.streamElementQueueType);
        }
    }

    @Test
    public void testPut() throws InterruptedException {
        OperatorActions operatorActions = (OperatorActions) Mockito.mock(OperatorActions.class);
        StreamElementQueue createStreamElementQueue = createStreamElementQueue(2, operatorActions);
        Watermark watermark = new Watermark(0L);
        StreamRecord streamRecord = new StreamRecord(42, 1L);
        Watermark watermark2 = new Watermark(2L);
        StreamElementQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(watermark);
        StreamElementQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(streamRecord);
        createStreamElementQueue.put(watermarkQueueEntry);
        createStreamElementQueue.put(streamRecordQueueEntry);
        Assert.assertEquals(2L, createStreamElementQueue.size());
        Assert.assertFalse(createStreamElementQueue.tryPut(new WatermarkQueueEntry(watermark2)));
        Assert.assertEquals(Arrays.asList(watermarkQueueEntry, streamRecordQueueEntry), createStreamElementQueue.values());
        ((OperatorActions) Mockito.verify(operatorActions, Mockito.never())).failOperator((Throwable) Matchers.any(Exception.class));
    }

    @Test
    public void testPoll() throws InterruptedException {
        OperatorActions operatorActions = (OperatorActions) Mockito.mock(OperatorActions.class);
        StreamElementQueue createStreamElementQueue = createStreamElementQueue(2, operatorActions);
        WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(0L));
        StreamRecordQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(new StreamRecord(42, 1L));
        createStreamElementQueue.put(watermarkQueueEntry);
        createStreamElementQueue.put(streamRecordQueueEntry);
        Assert.assertEquals(watermarkQueueEntry, createStreamElementQueue.peekBlockingly());
        Assert.assertEquals(2L, createStreamElementQueue.size());
        Assert.assertEquals(watermarkQueueEntry, createStreamElementQueue.poll());
        Assert.assertEquals(1L, createStreamElementQueue.size());
        streamRecordQueueEntry.collect(Collections.emptyList());
        Assert.assertEquals(streamRecordQueueEntry, createStreamElementQueue.poll());
        Assert.assertEquals(0L, createStreamElementQueue.size());
        Assert.assertTrue(createStreamElementQueue.isEmpty());
        ((OperatorActions) Mockito.verify(operatorActions, Mockito.never())).failOperator((Throwable) Matchers.any(Exception.class));
    }

    @Test
    public void testBlockingPut() throws Exception {
        OperatorActions operatorActions = (OperatorActions) Mockito.mock(OperatorActions.class);
        final StreamElementQueue createStreamElementQueue = createStreamElementQueue(1, operatorActions);
        StreamRecordQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(new StreamRecord(42, 0L));
        final StreamRecordQueueEntry streamRecordQueueEntry2 = new StreamRecordQueueEntry(new StreamRecord(43, 1L));
        createStreamElementQueue.put(streamRecordQueueEntry);
        Assert.assertEquals(1L, createStreamElementQueue.size());
        Future supplyAsync = FlinkFuture.supplyAsync(new Callable<Void>() { // from class: org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                createStreamElementQueue.put(streamRecordQueueEntry2);
                return null;
            }
        }, executor);
        Thread.sleep(10L);
        Assert.assertFalse(supplyAsync.isDone());
        streamRecordQueueEntry.collect(Collections.emptyList());
        Assert.assertEquals(streamRecordQueueEntry, createStreamElementQueue.poll());
        supplyAsync.get();
        ((OperatorActions) Mockito.verify(operatorActions, Mockito.never())).failOperator((Throwable) Matchers.any(Exception.class));
    }

    @Test
    public void testBlockingPoll() throws Exception {
        OperatorActions operatorActions = (OperatorActions) Mockito.mock(OperatorActions.class);
        final StreamElementQueue createStreamElementQueue = createStreamElementQueue(1, operatorActions);
        WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(1L));
        StreamRecordQueueEntry streamRecordQueueEntry = new StreamRecordQueueEntry(new StreamRecord(1, 2L));
        Assert.assertTrue(createStreamElementQueue.isEmpty());
        Future supplyAsync = FlinkFuture.supplyAsync(new Callable<AsyncResult>() { // from class: org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public AsyncResult call() throws Exception {
                return createStreamElementQueue.peekBlockingly();
            }
        }, executor);
        Thread.sleep(10L);
        Assert.assertFalse(supplyAsync.isDone());
        createStreamElementQueue.put(watermarkQueueEntry);
        Assert.assertEquals(watermarkQueueEntry, (AsyncResult) supplyAsync.get());
        Assert.assertEquals(1L, createStreamElementQueue.size());
        Assert.assertEquals(watermarkQueueEntry, createStreamElementQueue.poll());
        Assert.assertTrue(createStreamElementQueue.isEmpty());
        Future supplyAsync2 = FlinkFuture.supplyAsync(new Callable<AsyncResult>() { // from class: org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public AsyncResult call() throws Exception {
                return createStreamElementQueue.poll();
            }
        }, executor);
        Thread.sleep(10L);
        Assert.assertFalse(supplyAsync2.isDone());
        createStreamElementQueue.put(streamRecordQueueEntry);
        Thread.sleep(10L);
        Assert.assertFalse(supplyAsync2.isDone());
        streamRecordQueueEntry.collect(Collections.emptyList());
        Assert.assertEquals(streamRecordQueueEntry, supplyAsync2.get());
        Assert.assertTrue(createStreamElementQueue.isEmpty());
        ((OperatorActions) Mockito.verify(operatorActions, Mockito.never())).failOperator((Throwable) Matchers.any(Exception.class));
    }
}
