package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.PartitionSortedBufferTest;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.class */
public class SortMergeResultPartitionTest extends TestLogger {
    private static final int bufferSize = 1024;
    private static final int totalBuffers = 1000;
    private static final int totalBytes = 33554432;
    private static final int numThreads = 4;
    private FileChannelManager fileChannelManager;
    private NetworkBufferPool globalPool;
    private BatchShuffleReadBufferPool readBufferPool;
    private ExecutorService readIOExecutor;
    private final TestBufferAvailabilityListener listener = new TestBufferAvailabilityListener();

    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    @Rule
    public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest$TestBufferAvailabilityListener.class */
    public static final class TestBufferAvailabilityListener implements BufferAvailabilityListener {
        private int numNotifications;

        private TestBufferAvailabilityListener() {
        }

        public synchronized void notifyDataAvailable() {
            if (this.numNotifications == 0) {
                notifyAll();
            }
            this.numNotifications++;
        }

        public synchronized void waitForData() throws InterruptedException {
            if (this.numNotifications == 0) {
                wait();
            }
            this.numNotifications = 0;
        }
    }

    @Before
    public void setUp() {
        this.fileChannelManager = new FileChannelManagerImpl(new String[]{this.tmpFolder.getRoot().getPath()}, "testing");
        this.globalPool = new NetworkBufferPool(totalBuffers, bufferSize);
        this.readBufferPool = new BatchShuffleReadBufferPool(33554432L, bufferSize);
        this.readIOExecutor = Executors.newFixedThreadPool(4);
    }

    @After
    public void shutdown() throws Exception {
        this.fileChannelManager.close();
        this.globalPool.destroy();
        this.readBufferPool.destroy();
        this.readIOExecutor.shutdown();
    }

    @Test
    public void testWriteAndRead() throws Exception {
        Random random = new Random();
        SortMergeResultPartition createSortMergedPartition = createSortMergedPartition(10, this.globalPool.createBufferPool(100, 100));
        Queue<PartitionSortedBufferTest.DataAndType>[] queueArr = new Queue[10];
        Queue[] queueArr2 = new Queue[10];
        for (int i = 0; i < 10; i++) {
            queueArr[i] = new ArrayDeque();
            queueArr2[i] = new ArrayDeque();
        }
        int[] iArr = new int[10];
        int[] iArr2 = new int[10];
        Arrays.fill(iArr, 0);
        Arrays.fill(iArr2, 0);
        for (int i2 = 0; i2 < totalBuffers; i2++) {
            ByteBuffer generateRandomData = generateRandomData(random.nextInt(2048) + 1, random);
            if (random.nextBoolean()) {
                createSortMergedPartition.broadcastRecord(generateRandomData);
                for (int i3 = 0; i3 < 10; i3++) {
                    recordDataWritten(generateRandomData, queueArr, i3, iArr, Buffer.DataType.DATA_BUFFER);
                }
            } else {
                int nextInt = random.nextInt(10);
                createSortMergedPartition.emitRecord(generateRandomData, nextInt);
                recordDataWritten(generateRandomData, queueArr, nextInt, iArr, Buffer.DataType.DATA_BUFFER);
            }
        }
        createSortMergedPartition.finish();
        createSortMergedPartition.close();
        for (int i4 = 0; i4 < 10; i4++) {
            recordDataWritten(EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE), queueArr, i4, iArr, Buffer.DataType.EVENT_BUFFER);
        }
        readData(createSubpartitionViews(createSortMergedPartition, 10), bufferWithChannel -> {
            Buffer buffer = bufferWithChannel.getBuffer();
            int channelIndex = bufferWithChannel.getChannelIndex();
            int readableBytes = buffer.readableBytes();
            iArr2[channelIndex] = iArr2[channelIndex] + readableBytes;
            MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(readableBytes);
            allocateUnpooledSegment.put(0, buffer.getNioBufferReadable(), readableBytes);
            queueArr2[channelIndex].add(new NetworkBuffer(allocateUnpooledSegment, memorySegment -> {
            }, buffer.getDataType(), readableBytes));
        });
        PartitionSortedBufferTest.checkWriteReadResult(10, iArr, iArr2, queueArr, queueArr2);
    }

    private void recordDataWritten(ByteBuffer byteBuffer, Queue<PartitionSortedBufferTest.DataAndType>[] queueArr, int i, int[] iArr, Buffer.DataType dataType) {
        byteBuffer.rewind();
        queueArr[i].add(new PartitionSortedBufferTest.DataAndType(byteBuffer, dataType));
        iArr[i] = iArr[i] + byteBuffer.remaining();
    }

    private ByteBuffer generateRandomData(int i, Random random) {
        byte[] bArr = new byte[i];
        random.nextBytes(bArr);
        return ByteBuffer.wrap(bArr);
    }

    private long readData(ResultSubpartitionView[] resultSubpartitionViewArr, Consumer<BufferWithChannel> consumer) throws Exception {
        int i = 0;
        int i2 = 0;
        while (i2 < resultSubpartitionViewArr.length) {
            this.listener.waitForData();
            for (int i3 = 0; i3 < resultSubpartitionViewArr.length; i3++) {
                ResultSubpartitionView resultSubpartitionView = resultSubpartitionViewArr[i3];
                ResultSubpartition.BufferAndBacklog nextBuffer = resultSubpartitionView.getNextBuffer();
                while (true) {
                    ResultSubpartition.BufferAndBacklog bufferAndBacklog = nextBuffer;
                    if (bufferAndBacklog != null) {
                        Buffer buffer = bufferAndBacklog.buffer();
                        consumer.accept(new BufferWithChannel(buffer, i3));
                        i += buffer.readableBytes();
                        buffer.recycleBuffer();
                        if (!buffer.isBuffer()) {
                            i2++;
                            Assert.assertFalse(resultSubpartitionView.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
                            resultSubpartitionView.releaseAllResources();
                        }
                        nextBuffer = resultSubpartitionView.getNextBuffer();
                    }
                }
            }
        }
        return i;
    }

    private ResultSubpartitionView[] createSubpartitionViews(SortMergeResultPartition sortMergeResultPartition, int i) throws Exception {
        ResultSubpartitionView[] resultSubpartitionViewArr = new ResultSubpartitionView[i];
        for (int i2 = 0; i2 < i; i2++) {
            resultSubpartitionViewArr[i2] = sortMergeResultPartition.createSubpartitionView(i2, this.listener);
        }
        return resultSubpartitionViewArr;
    }

    @Test
    public void testWriteLargeRecord() throws Exception {
        SortMergeResultPartition createSortMergedPartition = createSortMergedPartition(10, this.globalPool.createBufferPool(100, 100));
        ByteBuffer generateRandomData = generateRandomData(bufferSize * 100, new Random());
        createSortMergedPartition.emitRecord(generateRandomData, 0);
        Assert.assertEquals(100 / 2, r0.bestEffortGetNumOfUsedBuffers());
        createSortMergedPartition.finish();
        createSortMergedPartition.close();
        ResultSubpartitionView createSubpartitionView = createSortMergedPartition.createSubpartitionView(0, this.listener);
        ByteBuffer allocate = ByteBuffer.allocate(bufferSize * 100);
        readData(new ResultSubpartitionView[]{createSubpartitionView}, bufferWithChannel -> {
            Buffer buffer = bufferWithChannel.getBuffer();
            if (buffer.isBuffer()) {
                allocate.put(buffer.getNioBufferReadable());
            }
        });
        generateRandomData.rewind();
        allocate.flip();
        Assert.assertEquals(generateRandomData, allocate);
    }

    @Test
    public void testDataBroadcast() throws Exception {
        SortMergeResultPartition createSortMergedPartition = createSortMergedPartition(10, this.globalPool.createBufferPool(100, 100));
        for (int i = 0; i < 10000; i++) {
            createSortMergedPartition.broadcastRecord(generateRandomData(bufferSize, new Random()));
        }
        createSortMergedPartition.finish();
        createSortMergedPartition.close();
        long remaining = (10 * 10000 * bufferSize) + (10 * EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE).remaining());
        Assert.assertNotNull(createSortMergedPartition.getResultFile());
        Assert.assertEquals(2L, ((String[]) Preconditions.checkNotNull(this.fileChannelManager.getPaths()[0].list())).length);
        for (File file : (File[]) Preconditions.checkNotNull(this.fileChannelManager.getPaths()[0].listFiles())) {
            if (file.getName().endsWith(".shuffle.data")) {
                Assert.assertTrue(file.length() < ((long) ((10 * 10000) * bufferSize)));
            }
        }
        Assert.assertEquals(remaining, readData(createSubpartitionViews(createSortMergedPartition, 10), bufferWithChannel -> {
        }));
    }

    @Test
    public void testFlush() throws Exception {
        int i = 10 / 2;
        SortMergeResultPartition createSortMergedPartition = createSortMergedPartition(10, this.globalPool.createBufferPool(10, 10));
        Assert.assertEquals(i, r0.bestEffortGetNumOfUsedBuffers());
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize), 1);
        Assert.assertEquals(3 + i, r0.bestEffortGetNumOfUsedBuffers());
        createSortMergedPartition.flush(0);
        Assert.assertEquals(i, r0.bestEffortGetNumOfUsedBuffers());
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize), 2);
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize), 3);
        Assert.assertEquals(3 + i, r0.bestEffortGetNumOfUsedBuffers());
        createSortMergedPartition.flushAll();
        Assert.assertEquals(i, r0.bestEffortGetNumOfUsedBuffers());
        Assert.assertNull(createSortMergedPartition.getResultFile());
        createSortMergedPartition.finish();
        Assert.assertEquals(3L, createSortMergedPartition.getResultFile().getNumRegions());
        createSortMergedPartition.close();
    }

    @Test(expected = IllegalStateException.class)
    public void testReleaseWhileWriting() throws Exception {
        int i = 10 / 2;
        int i2 = 10 - i;
        SortMergeResultPartition createSortMergedPartition = createSortMergedPartition(10, this.globalPool.createBufferPool(10, 10));
        Assert.assertEquals(i, r0.bestEffortGetNumOfUsedBuffers());
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize * (i2 - 1)), 0);
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize * (i2 - 1)), 1);
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize), 2);
        Assert.assertNull(createSortMergedPartition.getResultFile());
        Assert.assertEquals(2L, this.fileChannelManager.getPaths()[0].list().length);
        createSortMergedPartition.release();
        try {
            createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize * 10), 2);
        } catch (IllegalStateException e) {
            Assert.assertEquals(0L, this.fileChannelManager.getPaths()[0].list().length);
            throw e;
        }
    }

    @Test
    public void testRelease() throws Exception {
        int i = 10 / 2;
        int i2 = 10 - i;
        SortMergeResultPartition createSortMergedPartition = createSortMergedPartition(10, this.globalPool.createBufferPool(10, 10));
        Assert.assertEquals(i, r0.bestEffortGetNumOfUsedBuffers());
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize * (i2 - 1)), 0);
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize * (i2 - 1)), 1);
        createSortMergedPartition.finish();
        createSortMergedPartition.close();
        Assert.assertEquals(3L, createSortMergedPartition.getResultFile().getNumRegions());
        Assert.assertEquals(2L, ((String[]) Preconditions.checkNotNull(this.fileChannelManager.getPaths()[0].list())).length);
        ResultSubpartitionView createSubpartitionView = createSortMergedPartition.createSubpartitionView(0, this.listener);
        createSortMergedPartition.release();
        while (!createSubpartitionView.isReleased()) {
            ResultSubpartition.BufferAndBacklog nextBuffer = createSubpartitionView.getNextBuffer();
            if (nextBuffer != null) {
                nextBuffer.buffer().recycleBuffer();
            }
        }
        while (createSortMergedPartition.getResultFile() != null) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(0L, ((String[]) Preconditions.checkNotNull(this.fileChannelManager.getPaths()[0].list())).length);
    }

    @Test
    public void testCloseReleasesAllBuffers() throws Exception {
        int i = 100 / 2;
        int i2 = 100 - i;
        BufferPool createBufferPool = this.globalPool.createBufferPool(100, 100);
        SortMergeResultPartition createSortMergedPartition = createSortMergedPartition(10, createBufferPool);
        Assert.assertEquals(i, createBufferPool.bestEffortGetNumOfUsedBuffers());
        createSortMergedPartition.emitRecord(ByteBuffer.allocate(bufferSize * (i2 - 1)), 5);
        Assert.assertEquals(100, createBufferPool.bestEffortGetNumOfUsedBuffers());
        createSortMergedPartition.close();
        Assert.assertTrue(createBufferPool.isDestroyed());
        Assert.assertEquals(1000L, this.globalPool.getNumberOfAvailableMemorySegments());
    }

    @Test(expected = IllegalStateException.class)
    public void testReadUnfinishedPartition() throws Exception {
        BufferPool createBufferPool = this.globalPool.createBufferPool(10, 10);
        try {
            createSortMergedPartition(10, createBufferPool).createSubpartitionView(0, this.listener);
        } finally {
            createBufferPool.lazyDestroy();
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testReadReleasedPartition() throws Exception {
        BufferPool createBufferPool = this.globalPool.createBufferPool(10, 10);
        try {
            SortMergeResultPartition createSortMergedPartition = createSortMergedPartition(10, createBufferPool);
            createSortMergedPartition.finish();
            createSortMergedPartition.release();
            createSortMergedPartition.createSubpartitionView(0, this.listener);
        } finally {
            createBufferPool.lazyDestroy();
        }
    }

    private SortMergeResultPartition createSortMergedPartition(int i, BufferPool bufferPool) throws IOException {
        return createSortMergedPartition(i, bufferPool, this.readBufferPool);
    }

    private SortMergeResultPartition createSortMergedPartition(int i, BufferPool bufferPool, BatchShuffleReadBufferPool batchShuffleReadBufferPool) throws IOException {
        SortMergeResultPartition sortMergeResultPartition = new SortMergeResultPartition("SortMergedResultPartitionTest", 0, new ResultPartitionID(), ResultPartitionType.BLOCKING, i, i, batchShuffleReadBufferPool, this.readIOExecutor, new ResultPartitionManager(), this.fileChannelManager.createChannel().getPath(), (BufferCompressor) null, () -> {
            return bufferPool;
        });
        sortMergeResultPartition.setup();
        return sortMergeResultPartition;
    }
}
