package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImplTest.class */
public class TaskMailboxImplTest {
    private static final RunnableWithException NO_OP = () -> {
    };
    private static final int DEFAULT_PRIORITY = 0;
    private TaskMailbox taskMailbox;

    @Before
    public void setUp() {
        this.taskMailbox = new TaskMailboxImpl();
    }

    @After
    public void tearDown() {
        this.taskMailbox.close();
    }

    @Test
    public void testPutAsHead() throws InterruptedException {
        Mail mail = new Mail(() -> {
        }, Integer.MAX_VALUE, "mailA", new Object[0]);
        Mail mail2 = new Mail(() -> {
        }, Integer.MAX_VALUE, "mailB", new Object[0]);
        Mail mail3 = new Mail(() -> {
        }, 0, "mailC, DEFAULT_PRIORITY", new Object[0]);
        Mail mail4 = new Mail(() -> {
        }, 0, "mailD, DEFAULT_PRIORITY", new Object[0]);
        this.taskMailbox.put(mail3);
        this.taskMailbox.putFirst(mail2);
        this.taskMailbox.put(mail4);
        this.taskMailbox.putFirst(mail);
        Assert.assertSame(mail, this.taskMailbox.take(0));
        Assert.assertSame(mail2, this.taskMailbox.take(0));
        Assert.assertSame(mail3, this.taskMailbox.take(0));
        Assert.assertSame(mail4, this.taskMailbox.take(0));
        Assert.assertFalse(this.taskMailbox.tryTake(0).isPresent());
    }

    @Test
    public void testContracts() throws InterruptedException {
        LinkedList linkedList = new LinkedList();
        Assert.assertFalse(this.taskMailbox.hasMail());
        for (int i = 0; i < 10; i++) {
            Mail mail = new Mail(NO_OP, 0, "mail, DEFAULT_PRIORITY", new Object[0]);
            linkedList.add(mail);
            this.taskMailbox.put(mail);
            Assert.assertTrue(this.taskMailbox.hasMail());
        }
        while (!linkedList.isEmpty()) {
            Assert.assertEquals(linkedList.remove(), this.taskMailbox.take(0));
            Assert.assertEquals(Boolean.valueOf(!linkedList.isEmpty()), Boolean.valueOf(this.taskMailbox.hasMail()));
        }
    }

    @Test
    public void testConcurrentPutTakeBlocking() throws Exception {
        testPutTake(taskMailbox -> {
            return taskMailbox.take(0);
        });
    }

    @Test
    public void testConcurrentPutTakeNonBlockingAndWait() throws Exception {
        testPutTake(taskMailbox -> {
            Optional tryTake = taskMailbox.tryTake(0);
            while (true) {
                Optional optional = tryTake;
                if (optional.isPresent()) {
                    return (Mail) optional.get();
                }
                tryTake = taskMailbox.tryTake(0);
            }
        });
    }

    @Test
    public void testCloseUnblocks() throws InterruptedException {
        testAllPuttingUnblocksInternal((v0) -> {
            v0.close();
        });
    }

    @Test
    public void testQuiesceUnblocks() throws InterruptedException {
        testAllPuttingUnblocksInternal((v0) -> {
            v0.quiesce();
        });
    }

    @Test
    public void testLifeCycleQuiesce() throws InterruptedException {
        this.taskMailbox.put(new Mail(NO_OP, 0, "NO_OP, DEFAULT_PRIORITY", new Object[0]));
        this.taskMailbox.put(new Mail(NO_OP, 0, "NO_OP, DEFAULT_PRIORITY", new Object[0]));
        this.taskMailbox.quiesce();
        testLifecyclePuttingInternal();
        this.taskMailbox.take(0);
        Assert.assertTrue(this.taskMailbox.tryTake(0).isPresent());
        Assert.assertFalse(this.taskMailbox.tryTake(0).isPresent());
    }

    @Test
    public void testLifeCycleClose() throws InterruptedException {
        this.taskMailbox.close();
        testLifecyclePuttingInternal();
        try {
            this.taskMailbox.take(0);
            Assert.fail();
        } catch (TaskMailbox.MailboxClosedException e) {
        }
        try {
            this.taskMailbox.tryTake(0);
            Assert.fail();
        } catch (TaskMailbox.MailboxClosedException e2) {
        }
    }

    private void testLifecyclePuttingInternal() {
        try {
            this.taskMailbox.put(new Mail(NO_OP, 0, "NO_OP, DEFAULT_PRIORITY", new Object[0]));
            Assert.fail();
        } catch (TaskMailbox.MailboxClosedException e) {
        }
        try {
            this.taskMailbox.putFirst(new Mail(NO_OP, Integer.MAX_VALUE, "NO_OP", new Object[0]));
            Assert.fail();
        } catch (TaskMailbox.MailboxClosedException e2) {
        }
    }

    private void testAllPuttingUnblocksInternal(Consumer<TaskMailbox> consumer) throws InterruptedException {
        testUnblocksInternal(() -> {
            this.taskMailbox.put(new Mail(NO_OP, 0, "NO_OP, DEFAULT_PRIORITY", new Object[0]));
        }, consumer);
        setUp();
        testUnblocksInternal(() -> {
            this.taskMailbox.putFirst(new Mail(NO_OP, Integer.MAX_VALUE, "NO_OP", new Object[0]));
        }, consumer);
    }

    private void testUnblocksInternal(RunnableWithException runnableWithException, Consumer<TaskMailbox> consumer) throws InterruptedException {
        Thread[] threadArr = new Thread[8];
        Exception[] excArr = new Exception[threadArr.length];
        CountDownLatch countDownLatch = new CountDownLatch(threadArr.length);
        for (int i = 0; i < threadArr.length; i++) {
            int i2 = i;
            Thread thread = new Thread(() -> {
                try {
                    countDownLatch.countDown();
                    while (true) {
                        runnableWithException.run();
                    }
                } catch (Exception e) {
                    excArr[i2] = e;
                }
            });
            threadArr[i] = thread;
            thread.start();
        }
        countDownLatch.await();
        consumer.accept(this.taskMailbox);
        for (Thread thread2 : threadArr) {
            thread2.join();
        }
        for (Exception exc : excArr) {
            Assert.assertEquals(TaskMailbox.MailboxClosedException.class, exc.getClass());
        }
    }

    private void testPutTake(FunctionWithException<TaskMailbox, Mail, InterruptedException> functionWithException) throws Exception {
        int[] iArr = new int[10];
        Thread[] threadArr = new Thread[10];
        for (int i = 0; i < threadArr.length; i++) {
            int i2 = i;
            threadArr[i] = new Thread(ThrowingRunnable.unchecked(() -> {
                for (int i3 = 0; i3 < 1000; i3++) {
                    this.taskMailbox.put(new Mail(() -> {
                        iArr[i2] = iArr[i2] + 1;
                    }, 0, "result " + i3, new Object[0]));
                }
            }));
        }
        for (Thread thread : threadArr) {
            thread.start();
        }
        for (Thread thread2 : threadArr) {
            thread2.join();
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        this.taskMailbox.put(new Mail(() -> {
            atomicBoolean.set(false);
        }, 0, "POISON_MAIL, DEFAULT_PRIORITY", new Object[0]));
        while (atomicBoolean.get()) {
            ((Mail) functionWithException.apply(this.taskMailbox)).run();
        }
        for (int i3 : iArr) {
            Assert.assertEquals(1000L, i3);
        }
    }

    @Test
    public void testPutAsHeadWithPriority() throws InterruptedException {
        Mail mail = new Mail(() -> {
        }, 2, "mailA", new Object[0]);
        Mail mail2 = new Mail(() -> {
        }, 2, "mailB", new Object[0]);
        Mail mail3 = new Mail(() -> {
        }, 1, "mailC", new Object[0]);
        Mail mail4 = new Mail(() -> {
        }, 1, "mailD", new Object[0]);
        this.taskMailbox.put(mail3);
        this.taskMailbox.put(mail2);
        this.taskMailbox.put(mail4);
        this.taskMailbox.putFirst(mail);
        Assert.assertSame(mail, this.taskMailbox.take(2));
        Assert.assertSame(mail2, this.taskMailbox.take(2));
        Assert.assertFalse(this.taskMailbox.tryTake(2).isPresent());
        Assert.assertSame(mail3, this.taskMailbox.take(1));
        Assert.assertSame(mail4, this.taskMailbox.take(1));
        Assert.assertFalse(this.taskMailbox.tryTake(1).isPresent());
    }

    @Test
    public void testPutWithPriorityAndReadingFromMainMailbox() throws InterruptedException {
        Mail mail = new Mail(() -> {
        }, 2, "mailA", new Object[0]);
        Mail mail2 = new Mail(() -> {
        }, 2, "mailB", new Object[0]);
        Mail mail3 = new Mail(() -> {
        }, 1, "mailC", new Object[0]);
        Mail mail4 = new Mail(() -> {
        }, 1, "mailD", new Object[0]);
        this.taskMailbox.put(mail3);
        this.taskMailbox.put(mail2);
        this.taskMailbox.put(mail4);
        this.taskMailbox.putFirst(mail);
        Assert.assertSame(mail, this.taskMailbox.take(-1));
        Assert.assertSame(mail3, this.taskMailbox.take(-1));
        Assert.assertSame(mail2, this.taskMailbox.take(-1));
        Assert.assertSame(mail4, this.taskMailbox.take(-1));
    }

    @Test
    public void testBatchAndNonBatchTake() throws InterruptedException {
        List list = (List) IntStream.range(0, 6).mapToObj(i -> {
            return new Mail(NO_OP, 0, String.valueOf(i), new Object[0]);
        }).collect(Collectors.toList());
        List subList = list.subList(0, 3);
        TaskMailbox taskMailbox = this.taskMailbox;
        taskMailbox.getClass();
        subList.forEach(taskMailbox::put);
        Assert.assertTrue(this.taskMailbox.createBatch());
        List subList2 = list.subList(3, 6);
        TaskMailbox taskMailbox2 = this.taskMailbox;
        taskMailbox2.getClass();
        subList2.forEach(taskMailbox2::put);
        Assert.assertEquals(Optional.ofNullable(list.get(0)), this.taskMailbox.tryTakeFromBatch());
        Assert.assertEquals(Optional.ofNullable(list.get(1)), this.taskMailbox.tryTake(0));
        Assert.assertEquals(list.get(2), this.taskMailbox.take(0));
        Assert.assertEquals(Optional.empty(), this.taskMailbox.tryTakeFromBatch());
        Assert.assertEquals(Optional.ofNullable(list.get(3)), this.taskMailbox.tryTake(0));
        Assert.assertEquals(list.get(4), this.taskMailbox.take(0));
        Assert.assertEquals(Collections.singletonList(list.get(5)), this.taskMailbox.close());
    }

    @Test
    public void testBatchDrain() throws Exception {
        Mail mail = new Mail(() -> {
        }, Integer.MAX_VALUE, "mailA", new Object[0]);
        Mail mail2 = new Mail(() -> {
        }, Integer.MAX_VALUE, "mailB", new Object[0]);
        this.taskMailbox.put(mail);
        Assert.assertTrue(this.taskMailbox.createBatch());
        this.taskMailbox.put(mail2);
        Assert.assertEquals(Arrays.asList(mail, mail2), this.taskMailbox.drain());
    }

    @Test
    public void testBatchPriority() throws Exception {
        Mail mail = new Mail(() -> {
        }, 1, "mailA", new Object[0]);
        Mail mail2 = new Mail(() -> {
        }, 2, "mailB", new Object[0]);
        this.taskMailbox.put(mail);
        Assert.assertTrue(this.taskMailbox.createBatch());
        this.taskMailbox.put(mail2);
        Assert.assertEquals(mail2, this.taskMailbox.take(2));
        Assert.assertEquals(Optional.of(mail), this.taskMailbox.tryTakeFromBatch());
    }

    @Test
    public void testRunExclusively() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        new Thread(() -> {
            this.taskMailbox.runExclusively(() -> {
                countDownLatch.countDown();
                for (int i = 0; i < 10; i++) {
                    try {
                        this.taskMailbox.put(new Mail(() -> {
                        }, 1, "mailD", new Object[0]));
                        Thread.sleep(1L);
                    } catch (Exception e) {
                    }
                }
            });
        }).start();
        countDownLatch.await();
        Assert.assertEquals(10L, this.taskMailbox.close().size());
    }
}
