package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/ResultPartitionTest.class */
class ResultPartitionTest {
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    private static FileChannelManager fileChannelManager;
    private final int bufferSize = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;

    ResultPartitionTest() {
    }

    @BeforeAll
    static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[]{tempDir}, "testing");
    }

    @AfterAll
    static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    @Test
    void testResultSubpartitionInfo() {
        for (int i = 0; i < 2; i++) {
            ResultSubpartition[] allPartitions = new ResultPartitionBuilder().setResultPartitionIndex(i).setNumberOfSubpartitions(3).build().getAllPartitions();
            for (int i2 = 0; i2 < allPartitions.length; i2++) {
                ResultSubpartitionInfo subpartitionInfo = allPartitions[i2].getSubpartitionInfo();
                Assertions.assertThat(subpartitionInfo.getPartitionIdx()).isEqualTo(i);
                Assertions.assertThat(subpartitionInfo.getSubPartitionIdx()).isEqualTo(i2);
            }
        }
    }

    @Test
    void testAddOnFinishedPipelinedPartition() throws Exception {
        testAddOnFinishedPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    void testAddOnFinishedBlockingPartition() throws Exception {
        testAddOnFinishedPartition(ResultPartitionType.BLOCKING);
    }

    @Test
    void testBlockingPartitionIsConsumableMultipleTimesIfNotReleasedOnConsumption() throws IOException {
        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        ResultPartition build = new ResultPartitionBuilder().setResultPartitionManager(resultPartitionManager).setResultPartitionType(ResultPartitionType.BLOCKING).setFileChannelManager(fileChannelManager).build();
        resultPartitionManager.registerResultPartition(build);
        build.finish();
        Assertions.assertThat(resultPartitionManager.getUnreleasedPartitions()).contains(new ResultPartitionID[]{build.getPartitionId()});
        for (int i = 0; i < 2; i++) {
            build.createSubpartitionView(0, () -> {
            }).releaseAllResources();
            Assertions.assertThat(resultPartitionManager.getUnreleasedPartitions()).contains(new ResultPartitionID[]{build.getPartitionId()});
            Assertions.assertThat(build.isReleased()).isFalse();
        }
    }

    private void testAddOnFinishedPartition(ResultPartitionType resultPartitionType) throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(resultPartitionType);
        Assertions.assertThatThrownBy(() -> {
            createResultPartition.finish();
            createResultPartition.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 0);
        }).isInstanceOf(IllegalStateException.class);
        Assertions.assertThat(createResultPartition.numBuffersOut.getCount()).isZero();
        Assertions.assertThat(createResultPartition.numBytesOut.getCount()).isZero();
        Assertions.assertThat(createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isZero();
    }

    @Test
    void testAddOnReleasedPipelinedPartition() throws Exception {
        testAddOnReleasedPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    void testAddOnReleasedBlockingPartition() throws Exception {
        testAddOnReleasedPartition(ResultPartitionType.BLOCKING);
    }

    private void testAddOnReleasedPartition(ResultPartitionType resultPartitionType) throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(resultPartitionType);
        try {
            createResultPartition.release((Throwable) null);
            createResultPartition.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 0);
        } finally {
            Assertions.assertThat(createResultPartition.numBuffersOut.getCount()).isEqualTo(1L);
            Assertions.assertThat(createResultPartition.numBytesOut.getCount()).isEqualTo(1024L);
            Assertions.assertThat(createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isZero();
        }
    }

    @Test
    void testAddOnPipelinedPartition() throws Exception {
        testAddOnPartition(ResultPartitionType.PIPELINED);
    }

    @Test
    void testAddOnBlockingPartition() throws Exception {
        testAddOnPartition(ResultPartitionType.BLOCKING);
    }

    @Test
    void testCreateSubpartitionOnFailingPartition() throws Exception {
        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        ResultPartition build = new ResultPartitionBuilder().setResultPartitionManager(resultPartitionManager).build();
        resultPartitionManager.registerResultPartition(build);
        build.fail((Throwable) null);
        PartitionTestUtils.verifyCreateSubpartitionViewThrowsException(resultPartitionManager, build.getPartitionId());
    }

    private void testAddOnPartition(ResultPartitionType resultPartitionType) throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(resultPartitionType);
        try {
            createResultPartition.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 0);
        } finally {
            Assertions.assertThat(createResultPartition.numBuffersOut.getCount()).isEqualTo(1L);
            Assertions.assertThat(createResultPartition.numBytesOut.getCount()).isEqualTo(1024L);
            Assertions.assertThat(createResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isEqualTo(1);
        }
    }

    @Test
    void testReleaseMemoryOnPipelinedPartition() throws Exception {
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(10).setBufferSize(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE).build();
        ResultPartition createPartition = PartitionTestUtils.createPartition(build, ResultPartitionType.PIPELINED, 1);
        try {
            createPartition.setup();
            for (int i = 0; i < 10; i++) {
                createPartition.emitRecord(ByteBuffer.allocate(1023), 0);
            }
            Assertions.assertThat(createPartition.getBufferPool().getNumberOfAvailableMemorySegments()).isZero();
            createPartition.close();
            Assertions.assertThat(createPartition.getBufferPool().isDestroyed()).isTrue();
            Assertions.assertThat(build.getNetworkBufferPool().getNumberOfUsedMemorySegments()).isEqualTo(10);
            createPartition.release();
            Assertions.assertThat(build.getNetworkBufferPool().getNumberOfUsedMemorySegments()).isZero();
            build.close();
        } catch (Throwable th) {
            build.close();
            throw th;
        }
    }

    @Test
    void testIsAvailableOrNot() throws IOException {
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(10).setBufferSize(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE).build();
        ResultPartition createPartition = PartitionTestUtils.createPartition(build, ResultPartitionType.PIPELINED, 1);
        try {
            createPartition.setup();
            createPartition.getBufferPool().setNumBuffers(2);
            Assertions.assertThat(createPartition.getAvailableFuture()).isDone();
            createPartition.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 0);
            createPartition.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 0);
            Assertions.assertThat(createPartition.getAvailableFuture()).isNotDone();
            createPartition.release();
            build.close();
        } catch (Throwable th) {
            createPartition.release();
            build.close();
            throw th;
        }
    }

    @Test
    void testPipelinedPartitionBufferPool() throws Exception {
        testPartitionBufferPool(ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    void testBlockingPartitionBufferPool() throws Exception {
        testPartitionBufferPool(ResultPartitionType.BLOCKING);
    }

    private void testPartitionBufferPool(ResultPartitionType resultPartitionType) throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(20, 1);
        ResultPartition build = new ResultPartitionBuilder().setResultPartitionType(resultPartitionType).setFileChannelManager(fileChannelManager).setNetworkBuffersPerChannel(2).setFloatingNetworkBuffersPerGate(8).setNetworkBufferPool(networkBufferPool).build();
        try {
            build.setup();
            BufferPool bufferPool = build.getBufferPool();
            Assertions.assertThat(bufferPool.getNumberOfRequiredMemorySegments()).isEqualTo(build.getNumberOfSubpartitions() + 1);
            if (resultPartitionType.isBounded()) {
                Assertions.assertThat(bufferPool.getMaxNumberOfMemorySegments()).isEqualTo((2 * build.getNumberOfSubpartitions()) + 8);
            } else {
                Assertions.assertThat(bufferPool.getMaxNumberOfMemorySegments()).isEqualTo(Integer.MAX_VALUE);
            }
        } finally {
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    private BufferWritingResultPartition createResultPartition(ResultPartitionType resultPartitionType) throws IOException {
        BufferWritingResultPartition createPartition = PartitionTestUtils.createPartition(new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(10).setBufferSize(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE).build(), fileChannelManager, resultPartitionType, 2);
        createPartition.setup();
        return createPartition;
    }

    @Test
    void testIdleAndBackPressuredTime() throws IOException, InterruptedException {
        int i = HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE;
        BufferPool createBufferPool = new NetworkBufferPool(10, HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE).createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
        BufferWritingResultPartition build = new ResultPartitionBuilder().setBufferPoolFactory(() -> {
            return createBufferPool;
        }).build();
        build.setup();
        build.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 0);
        ResultSubpartitionView createSubpartitionView = build.createSubpartitionView(0, new NoOpBufferAvailablityListener());
        Buffer buffer = createSubpartitionView.getNextBuffer().buffer();
        Assertions.assertThat(buffer).isNotNull();
        Assertions.assertThat(build.getHardBackPressuredTimeMsPerSecond().getValue()).isZero();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            try {
                countDownLatch.countDown();
                build.emitRecord(ByteBuffer.allocate(i), 0);
            } catch (Exception e) {
            }
        });
        thread.start();
        countDownLatch.await();
        while (!LocalBufferPoolDestroyTest.isInBlockingBufferRequest(thread.getStackTrace())) {
            Thread.sleep(50L);
        }
        Thread.sleep(5L);
        buffer.recycleBuffer();
        thread.join();
        Assertions.assertThat(build.getHardBackPressuredTimeMsPerSecond().getCount()).isGreaterThan(0L);
        Assertions.assertThat(createSubpartitionView.getNextBuffer().buffer()).isNotNull();
    }

    @Test
    void testFlushBoundedBlockingResultPartition() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.BLOCKING);
        ByteBuffer allocate = ByteBuffer.allocate(4);
        allocate.putInt(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE);
        allocate.rewind();
        createResultPartition.emitRecord(allocate, 0);
        createResultPartition.flush(0);
        allocate.rewind();
        createResultPartition.emitRecord(allocate, 0);
        allocate.rewind();
        createResultPartition.broadcastRecord(allocate);
        createResultPartition.flushAll();
        allocate.rewind();
        createResultPartition.broadcastRecord(allocate);
        createResultPartition.finish();
        allocate.rewind();
        ResultSubpartitionView createSubpartitionView = createResultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
        for (int i = 0; i < 4; i++) {
            Assertions.assertThat(createSubpartitionView.getNextBuffer().buffer().getNioBufferReadable()).isEqualTo(allocate);
        }
        Assertions.assertThat(createSubpartitionView.getNextBuffer().buffer().isBuffer()).isFalse();
        Assertions.assertThat(createSubpartitionView.getNextBuffer()).isNull();
        ResultSubpartitionView createSubpartitionView2 = createResultPartition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
        for (int i2 = 0; i2 < 2; i2++) {
            Assertions.assertThat(createSubpartitionView2.getNextBuffer().buffer().getNioBufferReadable()).isEqualTo(allocate);
        }
        Assertions.assertThat(createSubpartitionView2.getNextBuffer().buffer().isBuffer()).isFalse();
        Assertions.assertThat(createSubpartitionView2.getNextBuffer()).isNull();
    }

    @Test
    void testEmitRecordWithRecordSpanningMultipleBuffers() throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED);
        PipelinedSubpartition pipelinedSubpartition = createResultPartition.subpartitions[0];
        try {
            createResultPartition.emitRecord(ByteBuffer.allocate(341), 0);
            createResultPartition.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 0);
            Assertions.assertThat(pipelinedSubpartition.getNumberOfQueuedBuffers()).isEqualTo(2);
            Assertions.assertThat(pipelinedSubpartition.getNextBuffer().getPartialRecordLength()).isZero();
            Assertions.assertThat(pipelinedSubpartition.getNextBuffer().getPartialRecordLength()).isEqualTo(341);
        } catch (Throwable th) {
            Assertions.assertThat(pipelinedSubpartition.getNumberOfQueuedBuffers()).isEqualTo(2);
            Assertions.assertThat(pipelinedSubpartition.getNextBuffer().getPartialRecordLength()).isZero();
            Assertions.assertThat(pipelinedSubpartition.getNextBuffer().getPartialRecordLength()).isEqualTo(341);
            throw th;
        }
    }

    @Test
    void testBroadcastRecordWithRecordSpanningMultipleBuffers() throws Exception {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED);
        try {
            createResultPartition.broadcastRecord(ByteBuffer.allocate(341));
            createResultPartition.broadcastRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE));
            for (PipelinedSubpartition pipelinedSubpartition : createResultPartition.subpartitions) {
                Assertions.assertThat(pipelinedSubpartition.getNumberOfQueuedBuffers()).isEqualTo(2);
                Assertions.assertThat(pipelinedSubpartition.getNextBuffer().getPartialRecordLength()).isZero();
                Assertions.assertThat(pipelinedSubpartition.getNextBuffer().getPartialRecordLength()).isEqualTo(341);
            }
        } catch (Throwable th) {
            for (PipelinedSubpartition pipelinedSubpartition2 : createResultPartition.subpartitions) {
                Assertions.assertThat(pipelinedSubpartition2.getNumberOfQueuedBuffers()).isEqualTo(2);
                Assertions.assertThat(pipelinedSubpartition2.getNextBuffer().getPartialRecordLength()).isZero();
                Assertions.assertThat(pipelinedSubpartition2.getNextBuffer().getPartialRecordLength()).isEqualTo(341);
            }
            throw th;
        }
    }

    @Test
    public void testWaitForAllRecordProcessed() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        createResultPartition.notifyEndOfData(StopMode.DRAIN);
        CompletableFuture allDataProcessedFuture = createResultPartition.getAllDataProcessedFuture();
        Assertions.assertThat(allDataProcessedFuture).isNotDone();
        for (PipelinedSubpartition pipelinedSubpartition : createResultPartition.subpartitions) {
            Assertions.assertThat(pipelinedSubpartition.getTotalNumberOfBuffersUnsafe()).isEqualTo(1L);
            Buffer buffer = pipelinedSubpartition.pollBuffer().buffer();
            Assertions.assertThat(buffer.isBuffer()).isFalse();
            Assertions.assertThat(EventSerializer.fromBuffer(buffer, getClass().getClassLoader())).isEqualTo(new EndOfData(StopMode.DRAIN));
        }
        for (int i = 0; i < createResultPartition.subpartitions.length; i++) {
            createResultPartition.subpartitions[i].acknowledgeAllDataProcessed();
            if (i < createResultPartition.subpartitions.length - 1) {
                Assertions.assertThat(allDataProcessedFuture).isNotDone();
            } else {
                Assertions.assertThat(allDataProcessedFuture).isDone();
                Assertions.assertThat(allDataProcessedFuture).isNotCompletedExceptionally();
            }
        }
    }

    @Test
    void testDifferentBufferSizeForSubpartitions() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assertions.assertThat(pipelinedSubpartitionArr.length).isEqualTo(2);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(10);
        pipelinedSubpartition2.bufferSize(6);
        createResultPartition.emitRecord(ByteBuffer.allocate(2), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(10), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(2), 1);
        createResultPartition.emitRecord(ByteBuffer.allocate(10), 1);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(10);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(2);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(6);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(6);
        pipelinedSubpartition.bufferSize(13);
        pipelinedSubpartition2.bufferSize(5);
        createResultPartition.emitRecord(ByteBuffer.allocate(12), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(8), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(2), 1);
        createResultPartition.emitRecord(ByteBuffer.allocate(7), 1);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(8);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(12);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(5);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(4);
    }

    @Test
    void testBufferSizeGreaterOrEqualToFirstRecord() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assertions.assertThat(pipelinedSubpartitionArr).hasSize(2);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(10);
        pipelinedSubpartition2.bufferSize(7);
        createResultPartition.emitRecord(ByteBuffer.allocate(12), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(111), 1);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(12);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(111);
    }

    @Test
    void testDynamicBufferSizeForBroadcast() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assertions.assertThat(pipelinedSubpartitionArr).hasSize(2);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(6);
        pipelinedSubpartition2.bufferSize(10);
        createResultPartition.broadcastRecord(ByteBuffer.allocate(6));
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(6);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(6);
        pipelinedSubpartition.bufferSize(4);
        pipelinedSubpartition2.bufferSize(12);
        createResultPartition.broadcastRecord(ByteBuffer.allocate(3));
        createResultPartition.broadcastRecord(ByteBuffer.allocate(7));
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(4);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(6);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(4);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(6);
        pipelinedSubpartition.bufferSize(8);
        pipelinedSubpartition2.bufferSize(5);
        createResultPartition.broadcastRecord(ByteBuffer.allocate(3));
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(3);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(3);
    }

    @Test
    void testBufferSizeGreaterOrEqualToFirstBroadcastRecord() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(6);
        pipelinedSubpartition2.bufferSize(10);
        createResultPartition.broadcastRecord(ByteBuffer.allocate(31));
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(31);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(31);
    }

    @Test
    void testBufferSizeNotChanged() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assertions.assertThat(pipelinedSubpartitionArr).hasSize(2);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(1025);
        pipelinedSubpartition2.bufferSize(Integer.MAX_VALUE);
        createResultPartition.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 0);
        createResultPartition.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 1);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE);
    }

    @Test
    void testResultPartitionBytesCounterForUnicast() throws IOException {
        testResultPartitionBytesCounter(false);
    }

    @Test
    void testResultPartitionBytesCounterForBroadcast() throws IOException {
        testResultPartitionBytesCounter(true);
    }

    private void testResultPartitionBytesCounter(boolean z) throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.BLOCKING);
        if (z) {
            createResultPartition.broadcastRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE));
            Assertions.assertThat(createResultPartition.resultPartitionBytes.createSnapshot().getSubpartitionBytes()).containsExactly(new long[]{1024, 1024});
            Assertions.assertThat(createResultPartition.numBytesOut.getCount()).isEqualTo(2048L);
        } else {
            createResultPartition.emitRecord(ByteBuffer.allocate(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), 0);
            createResultPartition.emitRecord(ByteBuffer.allocate(2048), 1);
            Assertions.assertThat(createResultPartition.resultPartitionBytes.createSnapshot().getSubpartitionBytes()).containsExactly(new long[]{1024, 2048});
            Assertions.assertThat(createResultPartition.numBytesOut.getCount()).isEqualTo(3072L);
        }
    }

    @Test
    void testSizeOfQueuedBuffers() throws IOException {
        BufferWritingResultPartition createResultPartition = createResultPartition(ResultPartitionType.PIPELINED);
        PipelinedSubpartition[] pipelinedSubpartitionArr = createResultPartition.subpartitions;
        Assertions.assertThat(pipelinedSubpartitionArr).hasSize(2);
        PipelinedSubpartition pipelinedSubpartition = pipelinedSubpartitionArr[0];
        PipelinedSubpartition pipelinedSubpartition2 = pipelinedSubpartitionArr[1];
        pipelinedSubpartition.bufferSize(10);
        pipelinedSubpartition2.bufferSize(10);
        createResultPartition.emitRecord(ByteBuffer.allocate(3), 0);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(3L);
        createResultPartition.emitRecord(ByteBuffer.allocate(3), 1);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(6L);
        createResultPartition.emitRecord(ByteBuffer.allocate(10), 0);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(16L);
        createResultPartition.emitRecord(ByteBuffer.allocate(10), 1);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(26L);
        createResultPartition.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(34L);
        createResultPartition.emitRecord(ByteBuffer.allocate(5), 0);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(39L);
        createResultPartition.broadcastRecord(ByteBuffer.allocate(7));
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(53L);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(10);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(43L);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(10);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(33L);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(3);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(30L);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(3);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(27L);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(4);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(23L);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(4);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(19L);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(5);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(14L);
        Assertions.assertThat(pipelinedSubpartition.pollBuffer().buffer().getSize()).isEqualTo(7);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(7L);
        Assertions.assertThat(pipelinedSubpartition2.pollBuffer().buffer().getSize()).isEqualTo(7);
        Assertions.assertThat(createResultPartition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(0L);
    }
}
