package org.apache.flink.runtime.io.network.partition.hybrid;

import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingFileDataIndex;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerTest.class */
class HsMemoryDataManagerTest {
    private static final int NUM_BUFFERS = 10;
    private static final int NUM_SUBPARTITIONS = 3;
    private int poolSize = 10;
    private int bufferSize = 4;
    private Path dataFilePath;
    private Path indexFilePath;

    HsMemoryDataManagerTest() {
    }

    @BeforeEach
    void before(@TempDir Path path) {
        this.dataFilePath = path.resolve(".data");
        this.indexFilePath = path.resolve(".index");
    }

    @Test
    void testAppendMarkBufferFinished() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        TestingSpillingStrategy build = TestingSpillingStrategy.builder().setOnBufferFinishedFunction((num, num2) -> {
            atomicInteger.incrementAndGet();
            return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
        }).build();
        this.bufferSize = 12;
        HsMemoryDataManager createMemoryDataManager = createMemoryDataManager(build);
        createMemoryDataManager.append(createRecord(0), 0, Buffer.DataType.DATA_BUFFER);
        createMemoryDataManager.append(createRecord(1), 0, Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat(atomicInteger).hasValue(0);
        createMemoryDataManager.append(createRecord(2), 0, Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat(atomicInteger).hasValue(1);
        createMemoryDataManager.append(createRecord(3), 0, Buffer.DataType.DATA_BUFFER);
        createMemoryDataManager.append(createRecord(4), 0, Buffer.DataType.EVENT_BUFFER);
        Assertions.assertThat(atomicInteger).hasValue(3);
    }

    @Test
    void testAppendRequestBuffer() throws Exception {
        this.poolSize = 3;
        ArrayList arrayList = new ArrayList();
        HsMemoryDataManager createMemoryDataManager = createMemoryDataManager(TestingSpillingStrategy.builder().setOnMemoryUsageChangedFunction((num, num2) -> {
            arrayList.add(Tuple2.of(num, num2));
            return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
        }).build());
        createMemoryDataManager.append(createRecord(0), 0, Buffer.DataType.DATA_BUFFER);
        createMemoryDataManager.append(createRecord(1), 1, Buffer.DataType.DATA_BUFFER);
        createMemoryDataManager.append(createRecord(2), 2, Buffer.DataType.DATA_BUFFER);
        Assertions.assertThat(createMemoryDataManager.getNumTotalRequestedBuffers()).isEqualTo(3);
        Assertions.assertThat(arrayList).isEqualTo(Arrays.asList(Tuple2.of(1, 3), Tuple2.of(2, 3), Tuple2.of(3, 3)));
    }

    @Test
    void testHandleDecision() throws Exception {
        List<BufferIndexAndChannel> createBufferIndexAndChannelsList = HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 1, 2);
        List<BufferIndexAndChannel> createBufferIndexAndChannelsList2 = HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 2, 3);
        TestingSpillingStrategy build = TestingSpillingStrategy.builder().setOnBufferFinishedFunction((num, num2) -> {
            return num.intValue() < 4 ? Optional.of(HsSpillingStrategy.Decision.NO_ACTION) : Optional.of(HsSpillingStrategy.Decision.builder().addBufferToSpill(0, createBufferIndexAndChannelsList).addBufferToRelease(0, createBufferIndexAndChannelsList2).build());
        }).build();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TestingFileDataIndex.Builder builder = TestingFileDataIndex.builder();
        completableFuture.getClass();
        HsMemoryDataManager createMemoryDataManager = createMemoryDataManager(build, builder.setAddBuffersConsumer((v1) -> {
            r1.complete(v1);
        }).setMarkBufferReadableConsumer((num3, num4) -> {
            completableFuture2.complete(num4);
        }).build());
        for (int i = 0; i < 4; i++) {
            createMemoryDataManager.append(createRecord(i), 0, Buffer.DataType.DATA_BUFFER);
        }
        FlinkAssertions.assertThatFuture(completableFuture).eventuallySucceeds();
        FlinkAssertions.assertThatFuture(completableFuture2).eventuallySucceeds();
        Assertions.assertThat(completableFuture2).isCompletedWithValue(2);
        Assertions.assertThat(createMemoryDataManager.getNumTotalUnSpillBuffers()).isEqualTo(1);
    }

    @Test
    void testHandleEmptyDecision() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        createMemoryDataManager(TestingSpillingStrategy.builder().setOnBufferFinishedFunction((num, num2) -> {
            return Optional.empty();
        }).setDecideActionWithGlobalInfoFunction(hsSpillingInfoProvider -> {
            completableFuture.complete(null);
            return HsSpillingStrategy.Decision.NO_ACTION;
        }).build()).onBufferFinished();
        Assertions.assertThat(completableFuture).isCompleted();
    }

    @Test
    void testResultPartitionClosed() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        createMemoryDataManager(TestingSpillingStrategy.builder().setOnResultPartitionClosedFunction(hsSpillingInfoProvider -> {
            completableFuture.complete(null);
            return HsSpillingStrategy.Decision.NO_ACTION;
        }).build()).close();
        Assertions.assertThat(completableFuture).isCompleted();
    }

    @Test
    void testSubpartitionConsumerRelease() throws Exception {
        HsMemoryDataManager createMemoryDataManager = createMemoryDataManager(TestingSpillingStrategy.builder().build());
        createMemoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, new TestingSubpartitionConsumerInternalOperation());
        Assertions.assertThatThrownBy(() -> {
            createMemoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, new TestingSubpartitionConsumerInternalOperation());
        }).isInstanceOf(IllegalStateException.class).hasMessageContaining("Each subpartition view should have unique consumerId.");
        createMemoryDataManager.onConsumerReleased(0, HsConsumerId.DEFAULT);
        createMemoryDataManager.registerNewConsumer(0, HsConsumerId.DEFAULT, new TestingSubpartitionConsumerInternalOperation());
    }

    @Test
    void testPoolSizeCheck() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(100, this.bufferSize);
        BufferPool createBufferPool = networkBufferPool.createBufferPool(10, 100);
        Assertions.assertThat(createBufferPool.getNumBuffers()).isEqualTo(100);
        createMemoryDataManager(TestingSpillingStrategy.builder().setDecideActionWithGlobalInfoFunction(hsSpillingInfoProvider -> {
            Assertions.assertThat(hsSpillingInfoProvider.getPoolSize()).isEqualTo(10);
            completableFuture.complete(null);
            return HsSpillingStrategy.Decision.NO_ACTION;
        }).build(), createBufferPool);
        networkBufferPool.createBufferPool(90, 100);
        Assertions.assertThat(createBufferPool.getNumBuffers()).isEqualTo(10);
        FlinkAssertions.assertThatFuture(completableFuture).eventuallySucceeds();
    }

    private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy hsSpillingStrategy) throws Exception {
        return createMemoryDataManager(hsSpillingStrategy, (HsFileDataIndex) new HsFileDataIndexImpl(3, this.indexFilePath, 256, Long.MAX_VALUE));
    }

    private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy hsSpillingStrategy, HsFileDataIndex hsFileDataIndex) throws Exception {
        return createMemoryDataManager(new NetworkBufferPool(10, this.bufferSize).createBufferPool(this.poolSize, this.poolSize), hsSpillingStrategy, hsFileDataIndex);
    }

    private HsMemoryDataManager createMemoryDataManager(HsSpillingStrategy hsSpillingStrategy, BufferPool bufferPool) throws Exception {
        return createMemoryDataManager(bufferPool, hsSpillingStrategy, new HsFileDataIndexImpl(3, this.indexFilePath, 256, Long.MAX_VALUE));
    }

    private HsMemoryDataManager createMemoryDataManager(BufferPool bufferPool, HsSpillingStrategy hsSpillingStrategy, HsFileDataIndex hsFileDataIndex) throws Exception {
        HsMemoryDataManager hsMemoryDataManager = new HsMemoryDataManager(3, this.bufferSize, bufferPool, hsSpillingStrategy, hsFileDataIndex, this.dataFilePath, (BufferCompressor) null, 1000L);
        hsMemoryDataManager.setOutputMetrics(HybridShuffleTestUtils.createTestingOutputMetrics());
        return hsMemoryDataManager;
    }

    private static ByteBuffer createRecord(int i) {
        ByteBuffer allocate = ByteBuffer.allocate(4);
        allocate.putInt(i);
        allocate.flip();
        return allocate;
    }
}
