/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class ResultPartitionTest {
    /** Temporary directory, as reported by {@link EnvironmentInformation}, handed to the file channel manager. */
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    /** File channel manager shared by all tests; assigned in {@code setUp()} and closed in {@code shutdown()}. */
    private static FileChannelManager fileChannelManager;
    /** Buffer size constant (1024); the same value is passed to {@code setBufferSize} by the tests in this class. */
    private final int bufferSize = 1024;

    /** Explicit no-argument constructor; performs no initialisation of its own. */
    ResultPartitionTest() {
    }

    /** Creates the {@link FileChannelManager} shared by the tests of this class. */
    @BeforeAll
    static void setUp() {
        String[] directories = {tempDir};
        fileChannelManager = new FileChannelManagerImpl(directories, "testing");
    }

    /**
     * Closes the shared {@link FileChannelManager} once all tests have run.
     *
     * @throws Exception if closing the manager fails
     */
    @AfterAll
    static void shutdown() throws Exception {
        ResultPartitionTest.fileChannelManager.close();
    }

    /**
     * Builds two partitions with three subpartitions each and checks that every subpartition's
     * {@link ResultSubpartitionInfo} carries the partition index and its own subpartition index.
     */
    @Test
    void testResultSubpartitionInfo() {
        final int numPartitions = 2;
        final int numSubpartitions = 3;

        for (int partitionIdx = 0; partitionIdx < numPartitions; partitionIdx++) {
            ResultPartitionBuilder builder =
                    new ResultPartitionBuilder()
                            .setResultPartitionIndex(partitionIdx)
                            .setNumberOfSubpartitions(numSubpartitions);
            PipelinedResultPartition partition = (PipelinedResultPartition) builder.build();

            int subpartitionIdx = 0;
            for (ResultSubpartition subpartition : partition.getAllPartitions()) {
                ResultSubpartitionInfo info = subpartition.getSubpartitionInfo();
                Assertions.assertThat(info.getPartitionIdx()).isEqualTo(partitionIdx);
                Assertions.assertThat(info.getSubPartitionIdx()).isEqualTo(subpartitionIdx);
                subpartitionIdx++;
            }
        }
    }

    /** Runs the emit-after-finish scenario against a {@code PIPELINED} partition. */
    @Test
    void testAddOnFinishedPipelinedPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.PIPELINED;
        testAddOnFinishedPartition(type);
    }

    /** Runs the emit-after-finish scenario against a {@code BLOCKING} partition. */
    @Test
    void testAddOnFinishedBlockingPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.BLOCKING;
        testAddOnFinishedPartition(type);
    }

    /**
     * Finishes a registered {@code BLOCKING} partition, then twice creates and releases a view on
     * subpartition 0, checking each time that the partition is still listed as unreleased by the
     * manager and does not report itself as released.
     *
     * @throws IOException if building, finishing or reading the partition fails
     */
    @Test
    void testBlockingPartitionIsConsumableMultipleTimesIfNotReleasedOnConsumption() throws IOException {
        ResultPartitionManager manager = new ResultPartitionManager();
        ResultPartition partition =
                new ResultPartitionBuilder()
                        .setResultPartitionManager(manager)
                        .setResultPartitionType(ResultPartitionType.BLOCKING)
                        .setFileChannelManager(fileChannelManager)
                        .build();

        manager.registerResultPartition(partition);
        partition.finish();
        Assertions.assertThat(manager.getUnreleasedPartitions()).contains(partition.getPartitionId());

        final int consumeAttempts = 2;
        for (int attempt = 0; attempt < consumeAttempts; attempt++) {
            ResultSubpartitionView view = partition.createSubpartitionView(0, () -> {});
            view.releaseAllResources();

            // Releasing the view must not release the partition itself.
            Assertions.assertThat(manager.getUnreleasedPartitions()).contains(partition.getPartitionId());
            Assertions.assertThat(partition.isReleased()).isFalse();
        }
    }

    /**
     * Finishes a partition of the given type, checks that a subsequent {@code emitRecord} fails
     * with {@link IllegalStateException}, and that the output counters and the number of used
     * pool buffers are all zero afterwards.
     *
     * @param partitionType type of the result partition to create
     * @throws Exception if the partition cannot be created
     */
    private void testAddOnFinishedPartition(ResultPartitionType partitionType) throws Exception {
        BufferWritingResultPartition partition = createResultPartition(partitionType);

        Assertions.assertThatThrownBy(
                        () -> {
                            partition.finish();
                            partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
                        })
                .isInstanceOf(IllegalStateException.class);

        Assertions.assertThat(partition.numBuffersOut.getCount()).isZero();
        Assertions.assertThat(partition.numBytesOut.getCount()).isZero();
        Assertions.assertThat(partition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isZero();
    }

    /** Runs the emit-after-release scenario against a {@code PIPELINED} partition. */
    @Test
    void testAddOnReleasedPipelinedPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.PIPELINED;
        testAddOnReleasedPartition(type);
    }

    /** Runs the emit-after-release scenario against a {@code BLOCKING} partition. */
    @Test
    void testAddOnReleasedBlockingPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.BLOCKING;
        testAddOnReleasedPartition(type);
    }

    /**
     * Releases a partition of the given type, emits one 1024-byte record into it and checks the
     * output counters and the number of used pool buffers.
     *
     * <p>The assertions run after the calls rather than inside a {@code finally} block. Inside
     * {@code finally}, an exception thrown by {@code release} or {@code emitRecord} would be
     * replaced by the follow-up assertion failure, hiding the root cause.
     *
     * @param partitionType type of the result partition to create
     * @throws Exception if creating, releasing or writing to the partition fails
     */
    private void testAddOnReleasedPartition(ResultPartitionType partitionType) throws Exception {
        BufferWritingResultPartition partition = createResultPartition(partitionType);

        partition.release(null);
        partition.emitRecord(ByteBuffer.allocate(1024), 0);

        // The record is still counted by the output metrics ...
        Assertions.assertThat(partition.numBuffersOut.getCount()).isEqualTo(1L);
        Assertions.assertThat(partition.numBytesOut.getCount()).isEqualTo(1024L);
        // ... but no buffer of the partition's pool stays in use.
        Assertions.assertThat(partition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isZero();
    }

    /** Runs the plain emit scenario against a {@code PIPELINED} partition. */
    @Test
    void testAddOnPipelinedPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.PIPELINED;
        testAddOnPartition(type);
    }

    /** Runs the plain emit scenario against a {@code BLOCKING} partition. */
    @Test
    void testAddOnBlockingPartition() throws Exception {
        final ResultPartitionType type = ResultPartitionType.BLOCKING;
        testAddOnPartition(type);
    }

    /**
     * Registers a partition, fails it with a {@code null} cause and delegates to
     * {@link PartitionTestUtils#verifyCreateSubpartitionViewThrowsException} to check that a view
     * can no longer be created through the manager.
     *
     * @throws Exception if building or registering the partition fails
     */
    @Test
    void testCreateSubpartitionOnFailingPartition() throws Exception {
        ResultPartitionManager partitionManager = new ResultPartitionManager();
        ResultPartitionBuilder builder = new ResultPartitionBuilder().setResultPartitionManager(partitionManager);
        ResultPartition failingPartition = builder.build();

        partitionManager.registerResultPartition(failingPartition);
        failingPartition.fail(null);

        ResultPartitionProvider provider = partitionManager;
        PartitionTestUtils.verifyCreateSubpartitionViewThrowsException(provider, failingPartition.getPartitionId());
    }

    /**
     * Emits one 1024-byte record into a fresh partition of the given type and checks that one
     * buffer and 1024 bytes are counted as output and that one pool buffer is reported as in use.
     *
     * <p>The assertions run after the emit rather than inside a {@code finally} block. Inside
     * {@code finally}, an exception thrown by {@code emitRecord} would be replaced by the
     * follow-up assertion failure, hiding the root cause.
     *
     * @param partitionType type of the result partition to create
     * @throws Exception if creating or writing to the partition fails
     */
    private void testAddOnPartition(ResultPartitionType partitionType) throws Exception {
        BufferWritingResultPartition partition = createResultPartition(partitionType);

        partition.emitRecord(ByteBuffer.allocate(1024), 0);

        Assertions.assertThat(partition.numBuffersOut.getCount()).isEqualTo(1L);
        Assertions.assertThat(partition.numBytesOut.getCount()).isEqualTo(1024L);
        Assertions.assertThat(partition.getBufferPool().bestEffortGetNumOfUsedBuffers()).isEqualTo(1);
    }

    /**
     * Fills a single-subpartition {@code PIPELINED} partition with ten 1023-byte records, then
     * checks the buffer accounting around {@code close()} and {@code release()}: after closing,
     * the local pool is destroyed but the global pool still reports all ten segments as used;
     * after releasing, the global pool reports none.
     *
     * @throws Exception if setting up, writing to or tearing down the partition fails
     */
    @Test
    void testReleaseMemoryOnPipelinedPartition() throws Exception {
        final int numAllBuffers = 10;
        NettyShuffleEnvironment network =
                new NettyShuffleEnvironmentBuilder()
                        .setNumNetworkBuffers(numAllBuffers)
                        .setBufferSize(bufferSize)
                        .build();
        ResultPartition partition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED, 1);
        try {
            partition.setup();

            // One record per network buffer, each one byte short of the buffer size.
            for (int written = 0; written < numAllBuffers; written++) {
                partition.emitRecord(ByteBuffer.allocate(bufferSize - 1), 0);
            }
            Assertions.assertThat(partition.getBufferPool().getNumberOfAvailableMemorySegments()).isZero();

            partition.close();
            Assertions.assertThat(partition.getBufferPool().isDestroyed()).isTrue();
            Assertions.assertThat(network.getNetworkBufferPool().getNumberOfUsedMemorySegments())
                    .isEqualTo(numAllBuffers);

            partition.release();
            Assertions.assertThat(network.getNetworkBufferPool().getNumberOfUsedMemorySegments()).isZero();
        } finally {
            network.close();
        }
    }

    /**
     * Limits the partition's buffer pool to two buffers and checks that the availability future
     * is done before any record is written and not done after two 1024-byte records were emitted.
     *
     * @throws IOException if setting up or writing to the partition fails
     */
    @Test
    void testIsAvailableOrNot() throws IOException {
        final int numAllBuffers = 10;
        final int bufferSize = 1024;
        NettyShuffleEnvironment network =
                new NettyShuffleEnvironmentBuilder()
                        .setNumNetworkBuffers(numAllBuffers)
                        .setBufferSize(bufferSize)
                        .build();
        ResultPartition partition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED, 1);
        try {
            partition.setup();
            partition.getBufferPool().setNumBuffers(2);
            Assertions.assertThat(partition.getAvailableFuture()).isDone();

            // Two full-size records, one per buffer the pool was limited to.
            for (int written = 0; written < 2; written++) {
                partition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
            }
            Assertions.assertThat(partition.getAvailableFuture()).isNotDone();
        } finally {
            partition.release();
            network.close();
        }
    }

    /** Runs the buffer pool sizing check against a {@code PIPELINED_BOUNDED} partition. */
    @Test
    void testPipelinedPartitionBufferPool() throws Exception {
        final ResultPartitionType type = ResultPartitionType.PIPELINED_BOUNDED;
        testPartitionBufferPool(type);
    }

    /** Runs the buffer pool sizing check against a {@code BLOCKING} partition. */
    @Test
    void testBlockingPartitionBufferPool() throws Exception {
        final ResultPartitionType type = ResultPartitionType.BLOCKING;
        testPartitionBufferPool(type);
    }

    /**
     * Sets up a partition of the given type on a 20-segment global pool and checks the limits of
     * its local buffer pool: the required segment count is the number of subpartitions plus one,
     * and the maximum is {@code buffersPerChannel * subpartitions + floatingBuffers} for bounded
     * types and {@link Integer#MAX_VALUE} otherwise.
     *
     * @param type type of the result partition to create
     * @throws Exception if building or setting up the partition fails
     */
    private void testPartitionBufferPool(ResultPartitionType type) throws Exception {
        final int networkBuffersPerChannel = 2;
        final int floatingNetworkBuffersPerGate = 8;
        NetworkBufferPool globalPool = new NetworkBufferPool(20, 1);
        ResultPartition partition =
                new ResultPartitionBuilder()
                        .setResultPartitionType(type)
                        .setFileChannelManager(fileChannelManager)
                        .setNetworkBuffersPerChannel(networkBuffersPerChannel)
                        .setFloatingNetworkBuffersPerGate(floatingNetworkBuffersPerGate)
                        .setNetworkBufferPool(globalPool)
                        .build();
        try {
            partition.setup();
            BufferPool localPool = partition.getBufferPool();

            Assertions.assertThat(localPool.getNumberOfRequiredMemorySegments())
                    .isEqualTo(partition.getNumberOfSubpartitions() + 1);

            if (!type.isBounded()) {
                Assertions.assertThat(localPool.getMaxNumberOfMemorySegments()).isEqualTo(Integer.MAX_VALUE);
            } else {
                int expectedMax =
                        networkBuffersPerChannel * partition.getNumberOfSubpartitions()
                                + floatingNetworkBuffersPerGate;
                Assertions.assertThat(localPool.getMaxNumberOfMemorySegments()).isEqualTo(expectedMax);
            }
        } finally {
            globalPool.destroyAllBufferPools();
            globalPool.destroy();
        }
    }

    /**
     * Creates and sets up a two-subpartition result partition of the given type, backed by a new
     * shuffle environment with 10 network buffers of 1024 bytes and the shared file channel
     * manager.
     *
     * <p>NOTE(review): the shuffle environment created here is neither returned nor closed by
     * this helper.
     *
     * @param partitionType type of the result partition to create
     * @return the partition, already set up, cast to {@link BufferWritingResultPartition}
     * @throws IOException if setting up the partition fails
     */
    private BufferWritingResultPartition createResultPartition(ResultPartitionType partitionType) throws IOException {
        final int numNetworkBuffers = 10;
        final int numSubpartitions = 2;
        NettyShuffleEnvironment environment =
                new NettyShuffleEnvironmentBuilder()
                        .setNumNetworkBuffers(numNetworkBuffers)
                        .setBufferSize(bufferSize)
                        .build();
        BufferWritingResultPartition partition =
                (BufferWritingResultPartition)
                        PartitionTestUtils.createPartition(
                                environment, fileChannelManager, partitionType, numSubpartitions);
        partition.setup();
        return partition;
    }

    /**
     * Checks the hard back-pressure metric of a partition whose local pool holds a single buffer.
     *
     * <p>The first record takes the only buffer, which the test thread then holds through the
     * read view. A second thread tries to emit another record and is expected to block in the
     * buffer request until the held buffer is recycled. Afterwards the back-pressure counter must
     * be positive and the second record must be readable.
     *
     * <p>A failure in the writer thread is captured and asserted instead of being swallowed, and
     * the polling loop stops if the writer thread dies, so a failing writer produces a test
     * failure rather than a hang.
     *
     * @throws IOException if setting up, writing to or reading from the partition fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    void testIdleAndBackPressuredTime() throws IOException, InterruptedException {
        int bufferSize = 1024;
        NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize);
        BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE, 0);
        BufferWritingResultPartition resultPartition =
                (BufferWritingResultPartition)
                        new ResultPartitionBuilder().setBufferPoolFactory(() -> localPool).build();
        resultPartition.setup();

        resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
        ResultSubpartitionView readView =
                resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
        Buffer buffer = readView.getNextBuffer().buffer();
        Assertions.assertThat(buffer).isNotNull();

        // Nothing has blocked on a buffer request yet.
        Assertions.assertThat(resultPartition.getHardBackPressuredTimeMsPerSecond().getValue()).isZero();

        CountDownLatch syncLock = new CountDownLatch(1);
        // Written by the writer thread, read only after join(), which provides the visibility.
        final Throwable[] writerFailure = new Throwable[1];
        Thread requestThread =
                new Thread(
                        () -> {
                            try {
                                // Signal that the writer thread is running, then request a buffer.
                                syncLock.countDown();
                                resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
                            } catch (Exception e) {
                                writerFailure[0] = e;
                            }
                        });
        requestThread.start();
        syncLock.await();

        // Wait until the writer is blocked in the buffer request; stop if it died instead.
        while (requestThread.isAlive()
                && !LocalBufferPoolDestroyTest.isInBlockingBufferRequest(requestThread.getStackTrace())) {
            Thread.sleep(50L);
        }

        // Hand the only buffer back so the blocked writer can proceed.
        buffer.recycleBuffer();
        requestThread.join();

        Assertions.assertThat(writerFailure[0]).as("failure in the writer thread").isNull();
        Assertions.assertThat(resultPartition.getHardBackPressuredTimeMsPerSecond().getCount()).isGreaterThan(0L);
        Assertions.assertThat(readView.getNextBuffer().buffer()).isNotNull();
    }

    /**
     * Writes the same 4-byte record into a {@code BLOCKING} partition through a mix of
     * {@code emitRecord}, {@code broadcastRecord}, {@code flush} and {@code flushAll}, then reads
     * both subpartitions back. Subpartition 0 must yield four data buffers equal to the record
     * and subpartition 1 two, each followed by one non-data buffer and then {@code null}.
     *
     * @throws IOException if writing to or reading from the partition fails
     */
    @Test
    void testFlushBoundedBlockingResultPartition() throws IOException {
        final int payload = 1024;
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.BLOCKING);

        ByteBuffer record = ByteBuffer.allocate(Integer.BYTES);
        record.putInt(payload);

        // Each write consumes the record, so it is rewound before every use.
        record.rewind();
        partition.emitRecord(record, 0);
        partition.flush(0);

        record.rewind();
        partition.emitRecord(record, 0);

        record.rewind();
        partition.broadcastRecord(record);
        partition.flushAll();

        record.rewind();
        partition.broadcastRecord(record);
        partition.finish();
        record.rewind();

        // Subpartition 0 received two emitted and two broadcast records.
        final int expectedRecordsInFirst = 4;
        ResultSubpartitionView firstView = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
        for (int read = 0; read < expectedRecordsInFirst; read++) {
            Assertions.assertThat(firstView.getNextBuffer().buffer().getNioBufferReadable()).isEqualTo(record);
        }
        Assertions.assertThat(firstView.getNextBuffer().buffer().isBuffer()).isFalse();
        Assertions.assertThat(firstView.getNextBuffer()).isNull();

        // Subpartition 1 received only the two broadcast records.
        final int expectedRecordsInSecond = 2;
        ResultSubpartitionView secondView = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
        for (int read = 0; read < expectedRecordsInSecond; read++) {
            Assertions.assertThat(secondView.getNextBuffer().buffer().getNioBufferReadable()).isEqualTo(record);
        }
        Assertions.assertThat(secondView.getNextBuffer().buffer().isBuffer()).isFalse();
        Assertions.assertThat(secondView.getNextBuffer()).isNull();
    }

    /**
     * Emits a 341-byte record followed by a 1024-byte record into subpartition 0 of a
     * {@code PIPELINED} partition and checks that two buffers are queued, the first with a
     * partial record length of zero and the second with a partial record length of 341.
     *
     * <p>The assertions run after the emits rather than inside a {@code finally} block. Inside
     * {@code finally}, an exception thrown by {@code emitRecord} would be replaced by the
     * follow-up assertion failure, hiding the root cause.
     *
     * @throws Exception if creating or writing to the partition fails
     */
    @Test
    void testEmitRecordWithRecordSpanningMultipleBuffers() throws Exception {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.PIPELINED);
        PipelinedSubpartition subpartition = (PipelinedSubpartition) partition.subpartitions[0];
        final int partialLength = 341;

        partition.emitRecord(ByteBuffer.allocate(partialLength), 0);
        partition.emitRecord(ByteBuffer.allocate(1024), 0);

        Assertions.assertThat(subpartition.getNumberOfQueuedBuffers()).isEqualTo(2);
        Assertions.assertThat(subpartition.getNextBuffer().getPartialRecordLength()).isZero();
        Assertions.assertThat(subpartition.getNextBuffer().getPartialRecordLength()).isEqualTo(partialLength);
    }

    /**
     * Broadcasts a 341-byte record followed by a 1024-byte record on a {@code PIPELINED}
     * partition and checks, for every subpartition, that two buffers are queued, the first with a
     * partial record length of zero and the second with a partial record length of 341.
     *
     * <p>The assertions run after the broadcasts rather than inside a {@code finally} block.
     * Inside {@code finally}, an exception thrown by {@code broadcastRecord} would be replaced by
     * the follow-up assertion failure, hiding the root cause.
     *
     * @throws Exception if creating or writing to the partition fails
     */
    @Test
    void testBroadcastRecordWithRecordSpanningMultipleBuffers() throws Exception {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.PIPELINED);
        final int partialLength = 341;

        partition.broadcastRecord(ByteBuffer.allocate(partialLength));
        partition.broadcastRecord(ByteBuffer.allocate(1024));

        for (ResultSubpartition resultSubpartition : partition.subpartitions) {
            PipelinedSubpartition subpartition = (PipelinedSubpartition) resultSubpartition;
            Assertions.assertThat(subpartition.getNumberOfQueuedBuffers()).isEqualTo(2);
            Assertions.assertThat(subpartition.getNextBuffer().getPartialRecordLength()).isZero();
            Assertions.assertThat(subpartition.getNextBuffer().getPartialRecordLength()).isEqualTo(partialLength);
        }
    }

    /**
     * Notifies end of data in {@code DRAIN} mode and checks that every subpartition holds exactly
     * one buffer carrying an {@link EndOfData} event, and that the all-data-processed future
     * completes normally only once the last subpartition has acknowledged.
     *
     * @throws IOException if writing to the partition or deserializing the event fails
     */
    @Test
    public void testWaitForAllRecordProcessed() throws IOException {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        partition.notifyEndOfData(StopMode.DRAIN);

        CompletableFuture<Void> allProcessed = partition.getAllDataProcessedFuture();
        Assertions.assertThat(allProcessed).isNotDone();

        ClassLoader classLoader = getClass().getClassLoader();
        for (ResultSubpartition resultSubpartition : partition.subpartitions) {
            Assertions.assertThat(resultSubpartition.getTotalNumberOfBuffersUnsafe()).isEqualTo(1L);

            Buffer eventBuffer = ((PipelinedSubpartition) resultSubpartition).pollBuffer().buffer();
            Assertions.assertThat(eventBuffer.isBuffer()).isFalse();
            Assertions.assertThat(EventSerializer.fromBuffer(eventBuffer, classLoader))
                    .isEqualTo(new EndOfData(StopMode.DRAIN));
        }

        final int lastIndex = partition.subpartitions.length - 1;
        for (int index = 0; index <= lastIndex; index++) {
            ((PipelinedSubpartition) partition.subpartitions[index]).acknowledgeAllDataProcessed();

            if (index == lastIndex) {
                // The final acknowledgement completes the future.
                Assertions.assertThat(allProcessed).isDone();
                Assertions.assertThat(allProcessed).isNotCompletedExceptionally();
            } else {
                Assertions.assertThat(allProcessed).isNotDone();
            }
        }
    }

    /**
     * Configures different buffer sizes on the two subpartitions of a {@code PIPELINED_BOUNDED}
     * partition, emits records of various lengths and checks the sizes of the polled buffers,
     * both for the initial sizes (10 and 6) and after changing them (13 and 5).
     *
     * @throws IOException if creating or writing to the partition fails
     */
    @Test
    void testDifferentBufferSizeForSubpartitions() throws IOException {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] all = partition.subpartitions;
        Assertions.assertThat(all.length).isEqualTo(2);
        PipelinedSubpartition first = (PipelinedSubpartition) all[0];
        PipelinedSubpartition second = (PipelinedSubpartition) all[1];

        // Round one: buffer sizes 10 and 6.
        first.bufferSize(10);
        second.bufferSize(6);
        partition.emitRecord(ByteBuffer.allocate(2), 0);
        partition.emitRecord(ByteBuffer.allocate(10), 0);
        partition.emitRecord(ByteBuffer.allocate(2), 1);
        partition.emitRecord(ByteBuffer.allocate(10), 1);

        for (int expectedSize : new int[] {10, 2}) {
            Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(expectedSize);
        }
        for (int expectedSize : new int[] {6, 6}) {
            Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(expectedSize);
        }

        // Round two: buffer sizes changed to 13 and 5.
        first.bufferSize(13);
        second.bufferSize(5);
        partition.emitRecord(ByteBuffer.allocate(12), 0);
        partition.emitRecord(ByteBuffer.allocate(8), 0);
        partition.emitRecord(ByteBuffer.allocate(2), 1);
        partition.emitRecord(ByteBuffer.allocate(7), 1);

        for (int expectedSize : new int[] {8, 12}) {
            Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(expectedSize);
        }
        for (int expectedSize : new int[] {5, 4}) {
            Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(expectedSize);
        }
    }

    /**
     * Sets buffer sizes of 10 and 7 on the two subpartitions, emits a first record longer than
     * the configured size into each (12 and 111 bytes) and checks that the polled buffer has the
     * size of that record.
     *
     * @throws IOException if creating or writing to the partition fails
     */
    @Test
    void testBufferSizeGreaterOrEqualToFirstRecord() throws IOException {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] all = partition.subpartitions;
        Assertions.assertThat(all).hasSize(2);
        PipelinedSubpartition first = (PipelinedSubpartition) all[0];
        PipelinedSubpartition second = (PipelinedSubpartition) all[1];

        first.bufferSize(10);
        second.bufferSize(7);

        final int firstRecordLength = 12;
        final int secondRecordLength = 111;
        partition.emitRecord(ByteBuffer.allocate(firstRecordLength), 0);
        partition.emitRecord(ByteBuffer.allocate(secondRecordLength), 1);

        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(firstRecordLength);
        Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(secondRecordLength);
    }

    /**
     * Changes the buffer sizes of both subpartitions between broadcasts and checks that both
     * subpartitions always yield buffers of the same sizes: 6; then 4 and 6; then 3.
     *
     * @throws IOException if creating or writing to the partition fails
     */
    @Test
    void testDynamicBufferSizeForBroadcast() throws IOException {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] all = partition.subpartitions;
        Assertions.assertThat(all).hasSize(2);
        PipelinedSubpartition first = (PipelinedSubpartition) all[0];
        PipelinedSubpartition second = (PipelinedSubpartition) all[1];

        // Round one: sizes 6 and 10, one 6-byte broadcast.
        first.bufferSize(6);
        second.bufferSize(10);
        partition.broadcastRecord(ByteBuffer.allocate(6));

        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(6);
        Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(6);

        // Round two: sizes 4 and 12, broadcasts of 3 and 7 bytes.
        first.bufferSize(4);
        second.bufferSize(12);
        partition.broadcastRecord(ByteBuffer.allocate(3));
        partition.broadcastRecord(ByteBuffer.allocate(7));

        for (int expectedSize : new int[] {4, 6}) {
            Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(expectedSize);
        }
        for (int expectedSize : new int[] {4, 6}) {
            Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(expectedSize);
        }

        // Round three: sizes 8 and 5, one 3-byte broadcast.
        first.bufferSize(8);
        second.bufferSize(5);
        partition.broadcastRecord(ByteBuffer.allocate(3));

        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(3);
        Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(3);
    }

    /**
     * Sets buffer sizes of 6 and 10 on the two subpartitions, broadcasts a 31-byte record as the
     * first record and checks that both subpartitions yield a 31-byte buffer.
     *
     * @throws IOException if creating or writing to the partition fails
     */
    @Test
    void testBufferSizeGreaterOrEqualToFirstBroadcastRecord() throws IOException {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] all = partition.subpartitions;
        PipelinedSubpartition first = (PipelinedSubpartition) all[0];
        PipelinedSubpartition second = (PipelinedSubpartition) all[1];

        first.bufferSize(6);
        second.bufferSize(10);

        final int recordLength = 31;
        partition.broadcastRecord(ByteBuffer.allocate(recordLength));

        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(recordLength);
        Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(recordLength);
    }

    /**
     * Requests buffer sizes of 1025 and {@link Integer#MAX_VALUE} on the two subpartitions, emits
     * a 1024-byte record into each and checks that the polled buffers are 1024 bytes.
     *
     * @throws IOException if creating or writing to the partition fails
     */
    @Test
    void testBufferSizeNotChanged() throws IOException {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED);
        ResultSubpartition[] all = partition.subpartitions;
        Assertions.assertThat(all).hasSize(2);
        PipelinedSubpartition first = (PipelinedSubpartition) all[0];
        PipelinedSubpartition second = (PipelinedSubpartition) all[1];

        first.bufferSize(bufferSize + 1);
        second.bufferSize(Integer.MAX_VALUE);

        for (int target = 0; target < 2; target++) {
            partition.emitRecord(ByteBuffer.allocate(bufferSize), target);
        }

        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(bufferSize);
        Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(bufferSize);
    }

    /** Runs the bytes-produced counter check for a record emitted to a single subpartition. */
    @Test
    void testNumBytesProducedCounterForUnicast() throws IOException {
        final boolean broadcast = false;
        testNumBytesProducedCounter(broadcast);
    }

    /** Runs the bytes-produced counter check for a broadcast record. */
    @Test
    void testNumBytesProducedCounterForBroadcast() throws IOException {
        final boolean broadcast = true;
        testNumBytesProducedCounter(broadcast);
    }

    /**
     * Writes one 1024-byte record into a {@code BLOCKING} partition and checks the byte counters:
     * {@code numBytesProduced} is 1024 either way, while {@code numBytesOut} is 2048 for a
     * broadcast and 1024 for a record emitted to subpartition 0.
     *
     * @param isBroadcast whether the record is broadcast instead of emitted to subpartition 0
     * @throws IOException if creating or writing to the partition fails
     */
    private void testNumBytesProducedCounter(boolean isBroadcast) throws IOException {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.BLOCKING);
        ByteBuffer record = ByteBuffer.allocate(bufferSize);

        if (!isBroadcast) {
            partition.emitRecord(record, 0);
            Assertions.assertThat(partition.numBytesProduced.getCount()).isEqualTo(1024L);
            Assertions.assertThat(partition.numBytesOut.getCount()).isEqualTo(1024L);
            return;
        }

        partition.broadcastRecord(record);
        Assertions.assertThat(partition.numBytesProduced.getCount()).isEqualTo(1024L);
        Assertions.assertThat(partition.numBytesOut.getCount()).isEqualTo(2048L);
    }

    /**
     * Tracks {@code getSizeOfQueuedBuffersUnsafe()} while records and an event are written to a
     * two-subpartition {@code PIPELINED} partition with 10-byte buffers, and again while the
     * buffers are polled, checking the expected total after every step down to zero.
     *
     * @throws IOException if creating or writing to the partition fails
     */
    @Test
    void testSizeOfQueuedBuffers() throws IOException {
        BufferWritingResultPartition partition = createResultPartition(ResultPartitionType.PIPELINED);
        ResultSubpartition[] all = partition.subpartitions;
        Assertions.assertThat(all).hasSize(2);
        PipelinedSubpartition first = (PipelinedSubpartition) all[0];
        PipelinedSubpartition second = (PipelinedSubpartition) all[1];
        first.bufferSize(10);
        second.bufferSize(10);

        // Records shorter than the buffer size.
        partition.emitRecord(ByteBuffer.allocate(3), 0);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(3L);
        partition.emitRecord(ByteBuffer.allocate(3), 1);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(6L);

        // Records equal to the buffer size.
        partition.emitRecord(ByteBuffer.allocate(10), 0);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(16L);
        partition.emitRecord(ByteBuffer.allocate(10), 1);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(26L);

        // A broadcast event; the total grows by 8.
        partition.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(34L);

        // One more record for subpartition 0 only, then a broadcast record.
        partition.emitRecord(ByteBuffer.allocate(5), 0);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(39L);
        partition.broadcastRecord(ByteBuffer.allocate(7));
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(53L);

        // Polling: the total shrinks by the size of each polled buffer.
        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(10);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(43L);
        Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(10);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(33L);

        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(3);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(30L);
        Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(3);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(27L);

        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(4);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(23L);
        Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(4);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(19L);

        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(5);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(14L);

        Assertions.assertThat(first.pollBuffer().buffer().getSize()).isEqualTo(7);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(7L);
        Assertions.assertThat(second.pollBuffer().buffer().getSize()).isEqualTo(7);
        Assertions.assertThat(partition.getSizeOfQueuedBuffersUnsafe()).isEqualTo(0L);
    }
}

