package org.apache.flink.runtime.iterative.concurrent;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.class */
/**
 * Drives a {@link BlockingBackChannel} from two threads, an iteration head and an iteration
 * tail, and verifies that their actions interleave in strict lock-step over
 * {@link #NUM_ITERATIONS} supersteps.
 *
 * <p>Both threads append to a shared action log. The log itself is a plain, unsynchronized
 * list: every append is separated from the next one by a hand-off through either the data
 * channel or the back channel, so the test relies on those hand-offs for ordering.
 */
public class BlockingBackChannelTest {
    /** Number of supersteps the head and the tail run through. */
    private static final int NUM_ITERATIONS = 3;

    /** Token the head places on the data channel to tell the tail that input is available. */
    private static final Integer INPUT_COMPLETELY_PROCESSED_MESSAGE = 1;

    /**
     * Head side of the iteration: sends a token over the data channel, then waits on the back
     * channel for the tail's result before starting the next superstep.
     */
    class IterationHead implements Runnable {
        private final BlockingBackChannel backChannel;
        private final BlockingQueue<Integer> dataChannel;
        private final Random random = new Random();
        private final List<String> actionLog;

        /**
         * @param backChannel channel the head reads the tail's output from
         * @param dataChannel queue used to signal the tail that input is ready
         * @param actionLog shared log that records the order of all actions
         */
        IterationHead(BlockingBackChannel backChannel, BlockingQueue<Integer> dataChannel, List<String> actionLog) {
            this.backChannel = backChannel;
            this.dataChannel = dataChannel;
            this.actionLog = actionLog;
        }

        /**
         * Sends the initial token, then for every superstep waits for the read end of the back
         * channel, logs the read, pauses for a random time of up to 100 ms and, unless this was
         * the last superstep, sends the next token.
         *
         * @throws RuntimeException wrapping an {@link InterruptedException} if the thread is
         *     interrupted while sleeping
         */
        @Override
        public void run() {
            processInputAndSendMessageThroughDataChannel();
            for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
                try {
                    // Expected to block until the tail has signalled the end of its superstep.
                    backChannel.getReadEndAfterSuperstepEnded();
                    actionLog.add("head reads in iteration " + iteration);
                    Thread.sleep(random.nextInt(100));
                    // No further input is produced after the final superstep.
                    if (iteration < NUM_ITERATIONS - 1) {
                        processInputAndSendMessageThroughDataChannel();
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * Logs the send and places the "input processed" token on the data channel.
         *
         * @throws IllegalStateException if the data channel has no free capacity, which means
         *     the lock-step protocol between head and tail was violated
         */
        void processInputAndSendMessageThroughDataChannel() {
            actionLog.add("head sends data");
            // add() fails loudly where offer() would silently drop the token and leave the
            // tail blocked forever.
            dataChannel.add(INPUT_COMPLETELY_PROCESSED_MESSAGE);
        }
    }

    /**
     * Tail side of the iteration: waits for the head's token on the data channel, writes to the
     * back channel and signals the end of the superstep.
     */
    class IterationTail implements Runnable {
        private final BlockingBackChannel backChannel;
        private final BlockingQueue<Integer> dataChannel;
        private final Random random = new Random();
        private final List<String> actionLog;

        /**
         * @param backChannel channel the tail writes its output to
         * @param dataChannel queue the head uses to signal that input is ready
         * @param actionLog shared log that records the order of all actions
         */
        IterationTail(BlockingBackChannel backChannel, BlockingQueue<Integer> dataChannel, List<String> actionLog) {
            this.backChannel = backChannel;
            this.dataChannel = dataChannel;
            this.actionLog = actionLog;
        }

        /**
         * For every superstep: obtains the write end, waits for the head's token, pauses for a
         * random time of up to 10 ms, logs and performs a one-byte write from a mocked input
         * view, then notifies the back channel that the superstep has ended.
         *
         * @throws RuntimeException wrapping any exception raised during a superstep
         */
        @Override
        public void run() {
            for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
                try {
                    DataOutputView writeEnd = backChannel.getWriteEnd();
                    readInputFromDataChannel();
                    Thread.sleep(random.nextInt(10));
                    DataInputView source = Mockito.mock(DataInputView.class);
                    actionLog.add("tail writes in iteration " + iteration);
                    writeEnd.write(source, 1);
                    backChannel.notifyOfEndOfSuperstep();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * Blocks until the head's token arrives on the data channel, then logs the receipt.
         *
         * @throws InterruptedException if interrupted while waiting for the token
         */
        void readInputFromDataChannel() throws InterruptedException {
            dataChannel.take();
            actionLog.add("tail receives data");
        }
    }

    /**
     * Runs head and tail concurrently and checks that the action log contains, for each
     * superstep in order: head sends, tail receives, tail writes, head reads.
     *
     * @throws InterruptedException if the test thread is interrupted while joining the workers
     */
    @Test
    public void multiThreaded() throws InterruptedException {
        BlockingQueue<Integer> dataChannel = new ArrayBlockingQueue<Integer>(1);
        List<String> actionLog = Lists.newArrayList();
        SerializedUpdateBuffer buffer = Mockito.mock(SerializedUpdateBuffer.class);
        BlockingBackChannel channel = new BlockingBackChannel(buffer);

        Thread head = new Thread(new IterationHead(channel, dataChannel, actionLog));
        Thread tail = new Thread(new IterationTail(channel, dataChannel, actionLog));
        tail.start();
        head.start();
        head.join();
        tail.join();

        // Four actions per superstep, in this exact order.
        List<String> expectedLog = new ArrayList<String>(4 * NUM_ITERATIONS);
        for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
            expectedLog.add("head sends data");
            expectedLog.add("tail receives data");
            expectedLog.add("tail writes in iteration " + iteration);
            expectedLog.add("head reads in iteration " + iteration);
        }
        Assert.assertEquals(expectedLog, actionLog);
    }
}
