package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImplTest.class */
/**
 * Tests for {@link TieredStorageMemoryManagerImpl}.
 *
 * <p>Every test builds a manager on top of a local buffer pool carved out of a shared {@link
 * NetworkBufferPool} that is created before and destroyed after each test. The test instance
 * itself is used as the primary buffer owner.
 */
public class TieredStorageMemoryManagerImplTest {
    private static final int NETWORK_BUFFER_SIZE = 1024;
    private static final int NUM_TOTAL_BUFFERS = 1000;
    private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;

    private NetworkBufferPool globalPool;

    /** Buffers requested by a test that still have to be recycled. */
    private List<BufferBuilder> requestedBuffers;

    /** Completed by {@link #onBufferReclaimRequest()} once it has recycled the buffers. */
    private CompletableFuture<Void> hasReclaimBufferFinished;

    /** Number of times {@link #onBufferReclaimRequest()} has been invoked. */
    private int reclaimBufferCounter;

    @BeforeEach
    void before() {
        this.globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE);
        this.requestedBuffers = new ArrayList<>();
        this.hasReclaimBufferFinished = new CompletableFuture<>();
        this.reclaimBufferCounter = 0;
    }

    @AfterEach
    void after() {
        this.globalPool.destroy();
    }

    /** A requested buffer is counted as used by the pool and is returned on recycle. */
    @Test
    void testRequestAndRecycleBuffers() throws IOException {
        BufferPool bufferPool = this.globalPool.createBufferPool(1, 1);
        TieredStorageMemoryManagerImpl memoryManager =
                createStorageMemoryManager(
                        bufferPool,
                        Collections.singletonList(new TieredStorageMemorySpec(this, 0)));

        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);
        BufferBuilder builder = memoryManager.requestBufferBlocking(this);
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(1);
        recycleBufferBuilder(builder);
        Assertions.assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(0);

        memoryManager.release();
    }

    /**
     * With a single owner, the max non-reclaimable count stays at the pool size while the
     * remaining headroom shrinks by one per requested buffer.
     */
    @Test
    void testGetMaxNonReclaimableBuffers() throws IOException {
        final int numBuffers = 10;
        final int numExclusive = 5;
        TieredStorageMemoryManagerImpl memoryManager =
                createStorageMemoryManager(
                        numBuffers,
                        Collections.singletonList(
                                new TieredStorageMemorySpec(this, numExclusive)));

        List<BufferBuilder> builders = new ArrayList<>();
        for (int requested = 1; requested <= numBuffers; requested++) {
            builders.add(memoryManager.requestBufferBlocking(this));
            Assertions.assertThat(memoryManager.getMaxNonReclaimableBuffers(this))
                    .isEqualTo(numBuffers);
            Assertions.assertThat(
                            memoryManager.getMaxNonReclaimableBuffers(this)
                                    - memoryManager.numOwnerRequestedBuffer(this))
                    .isEqualTo(numBuffers - requested);
        }

        builders.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
        memoryManager.release();
    }

    /**
     * While the other owner has requested nothing, its guaranteed buffers are subtracted from
     * what this owner may hold as non-reclaimable.
     */
    @Test
    void testNumMaxNonReclaimableWhenOtherUseLessThanGuaranteed() throws IOException {
        final int numBuffers = 10;
        final int numOtherGuaranteed = 4;
        Object otherOwner = new Object();

        List<TieredStorageMemorySpec> specs = new ArrayList<>();
        specs.add(new TieredStorageMemorySpec(this, 0));
        specs.add(new TieredStorageMemorySpec(otherOwner, numOtherGuaranteed));
        TieredStorageMemoryManagerImpl memoryManager =
                createStorageMemoryManager(numBuffers, specs);

        final int expectedMax = numBuffers - numOtherGuaranteed;
        List<BufferBuilder> builders = new ArrayList<>();
        Assertions.assertThat(memoryManager.getMaxNonReclaimableBuffers(this))
                .isEqualTo(expectedMax);
        for (int requested = 1; requested <= numBuffers; requested++) {
            builders.add(memoryManager.requestBufferBlocking(this));
            Assertions.assertThat(memoryManager.getMaxNonReclaimableBuffers(this))
                    .isEqualTo(expectedMax);
            // The difference becomes negative once this owner holds more than expectedMax.
            Assertions.assertThat(
                            memoryManager.getMaxNonReclaimableBuffers(this)
                                    - memoryManager.numOwnerRequestedBuffer(this))
                    .isEqualTo((numBuffers - requested) - numOtherGuaranteed);
        }

        builders.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
        memoryManager.release();
    }

    /**
     * When the other owner holds more buffers than its guarantee, the amount it actually holds
     * is subtracted from what this owner may hold as non-reclaimable.
     */
    @Test
    void testNumMaxNonReclaimableWhenOtherUseMoreThanGuaranteed() throws IOException {
        final int numBuffers = 10;
        final int numOtherGuaranteed = 4;
        Object otherOwner = new Object();

        List<TieredStorageMemorySpec> specs = new ArrayList<>();
        specs.add(new TieredStorageMemorySpec(this, 0));
        specs.add(new TieredStorageMemorySpec(otherOwner, numOtherGuaranteed));
        TieredStorageMemoryManagerImpl memoryManager =
                createStorageMemoryManager(numBuffers, specs);

        // Let the other owner exceed its guarantee by one buffer.
        final int numOtherRequested = numOtherGuaranteed + 1;
        for (int k = 0; k < numOtherRequested; k++) {
            this.requestedBuffers.add(memoryManager.requestBufferBlocking(otherOwner));
        }

        final int expectedMax = numBuffers - numOtherRequested;
        Assertions.assertThat(memoryManager.getMaxNonReclaimableBuffers(this))
                .isEqualTo(expectedMax);
        for (int requested = 1; requested <= expectedMax; requested++) {
            this.requestedBuffers.add(memoryManager.requestBufferBlocking(this));
            Assertions.assertThat(memoryManager.getMaxNonReclaimableBuffers(this))
                    .isEqualTo(expectedMax);
            Assertions.assertThat(
                            memoryManager.getMaxNonReclaimableBuffers(this)
                                    - memoryManager.numOwnerRequestedBuffer(this))
                    .isEqualTo((numBuffers - requested) - numOtherRequested);
        }

        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(this)).isEqualTo(expectedMax);
        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(otherOwner))
                .isEqualTo(numOtherRequested);

        this.requestedBuffers.forEach(TieredStorageMemoryManagerImplTest::recycleBufferBuilder);
        memoryManager.release();
    }

    /**
     * Requests {@code (int) (numBuffers * NUM_BUFFERS_TRIGGER_FLUSH_RATIO)} buffers without
     * observing a reclaim request, then requests one more and expects exactly one reclaim
     * callback.
     *
     * <p>NOTE(review): the final size assertion presumably relies on the reclaim callback
     * having cleared {@link #requestedBuffers} before the last requested buffer is added to it
     * - confirm against the manager implementation.
     */
    @Timeout(60)
    @Test
    void testTriggerReclaimBuffers() throws IOException {
        final int numBuffers = 5;
        TieredStorageMemoryManagerImpl memoryManager =
                createStorageMemoryManager(
                        numBuffers,
                        Collections.singletonList(new TieredStorageMemorySpec(this, 0)));
        memoryManager.listenBufferReclaimRequest(this::onBufferReclaimRequest);

        final int numBuffersBeforeTriggerReclaim =
                (int) (numBuffers * NUM_BUFFERS_TRIGGER_FLUSH_RATIO);
        for (int k = 0; k < numBuffersBeforeTriggerReclaim; k++) {
            this.requestedBuffers.add(memoryManager.requestBufferBlocking(this));
        }
        Assertions.assertThat(this.reclaimBufferCounter).isEqualTo(0);
        Assertions.assertThat(this.requestedBuffers).hasSize(numBuffersBeforeTriggerReclaim);

        this.requestedBuffers.add(memoryManager.requestBufferBlocking(this));
        FlinkAssertions.assertThatFuture(this.hasReclaimBufferFinished).eventuallySucceeds();
        Assertions.assertThat(this.reclaimBufferCounter).isEqualTo(1);
        Assertions.assertThat(this.requestedBuffers).hasSize(1);

        recycleRequestedBuffers();
        memoryManager.release();
    }

    /** Transferring ownership moves the per-owner accounting of a buffer to the new owner. */
    @Test
    void testTransferBufferOwnership() throws IOException {
        TieredStorageMemoryManagerImpl memoryManager =
                createStorageMemoryManager(
                        1, Collections.singletonList(new TieredStorageMemorySpec(this, 0)));
        BufferBuilder builder = memoryManager.requestBufferBlocking(this);
        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(this)).isEqualTo(1);

        BufferConsumer consumer = builder.createBufferConsumerFromBeginning();
        Buffer buffer = consumer.build();
        builder.close();
        consumer.close();

        Object newOwner = new Object();
        memoryManager.transferBufferOwnership(this, newOwner, buffer);
        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(this)).isEqualTo(0);
        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(newOwner)).isEqualTo(1);

        buffer.recycleBuffer();
        Assertions.assertThat(memoryManager.numOwnerRequestedBuffer(newOwner)).isEqualTo(0);
    }

    /** Transferring ownership of an event buffer is rejected. */
    @Test
    void testCanNotTransferOwnershipForEvent() throws IOException {
        TieredStorageMemoryManagerImpl memoryManager =
                createStorageMemoryManager(
                        1, Collections.singletonList(new TieredStorageMemorySpec(this, 0)));
        BufferConsumer eventConsumer =
                BufferBuilderTestUtils.createEventBufferConsumer(1, Buffer.DataType.EVENT_BUFFER);
        Buffer eventBuffer = eventConsumer.build();
        eventConsumer.close();

        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                memoryManager.transferBufferOwnership(
                                        this, new Object(), eventBuffer))
                .isInstanceOf(IllegalStateException.class);
    }

    /** Releasing while a buffer is outstanding fails; it succeeds after recycling. */
    @Test
    void testReleaseBeforeRecyclingBuffers() throws IOException {
        TieredStorageMemoryManagerImpl memoryManager =
                createStorageMemoryManager(
                        5, Collections.singletonList(new TieredStorageMemorySpec(this, 0)));
        this.requestedBuffers.add(memoryManager.requestBufferBlocking(this));

        AssertionsForClassTypes.assertThatThrownBy(memoryManager::release)
                .isInstanceOf(IllegalStateException.class);

        recycleRequestedBuffers();
        memoryManager.release();
    }

    /** Releasing while a buffer is outstanding reports the leak in the exception message. */
    @Test
    void testLeakingBuffers() throws IOException {
        TieredStorageMemoryManagerImpl memoryManager =
                createStorageMemoryManager(
                        10, Collections.singletonList(new TieredStorageMemorySpec(this, 0)));
        this.requestedBuffers.add(memoryManager.requestBufferBlocking(this));

        AssertionsForClassTypes.assertThatThrownBy(memoryManager::release)
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining("Leaking buffers");

        recycleRequestedBuffers();
        memoryManager.release();
    }

    /**
     * Reclaim callback registered with the memory manager: counts the invocation, recycles all
     * tracked buffers and then signals completion.
     *
     * <p>NOTE(review): presumably invoked from a thread owned by the memory manager rather than
     * the test thread - confirm; the counter is written before the future is completed, so a
     * reader that waits on the future observes the increment.
     */
    public void onBufferReclaimRequest() {
        this.reclaimBufferCounter++;
        recycleRequestedBuffers();
        this.hasReclaimBufferFinished.complete(null);
    }

    /**
     * Recycles every tracked buffer by handing its memory segment straight to the buffer's
     * recycler, then clears the tracking list.
     */
    private void recycleRequestedBuffers() {
        for (BufferBuilder builder : this.requestedBuffers) {
            Buffer buffer = builder.createBufferConsumer().build();
            buffer.getRecycler().recycle(buffer.getMemorySegment());
        }
        this.requestedBuffers.clear();
    }

    /**
     * Creates a memory manager backed by a new local pool whose minimum and maximum size are
     * both {@code i}.
     *
     * @param i number of buffers in the local pool
     * @param list memory specs of the owners registered with the manager
     * @return the set-up memory manager
     * @throws IOException if the local buffer pool cannot be created
     */
    private TieredStorageMemoryManagerImpl createStorageMemoryManager(
            int i, List<TieredStorageMemorySpec> list) throws IOException {
        return createStorageMemoryManager(this.globalPool.createBufferPool(i, i), list);
    }

    /**
     * Creates a memory manager with {@link #NUM_BUFFERS_TRIGGER_FLUSH_RATIO} and the boolean
     * constructor flag set to {@code true}, and sets it up with the given pool and specs.
     *
     * @param bufferPool local pool the manager requests buffers from
     * @param list memory specs of the owners registered with the manager
     * @return the set-up memory manager
     */
    private TieredStorageMemoryManagerImpl createStorageMemoryManager(
            BufferPool bufferPool, List<TieredStorageMemorySpec> list) {
        TieredStorageMemoryManagerImpl memoryManager =
                new TieredStorageMemoryManagerImpl(NUM_BUFFERS_TRIGGER_FLUSH_RATIO, true);
        memoryManager.setup(bufferPool, list);
        return memoryManager;
    }

    /**
     * Recycles the buffer behind {@code bufferBuilder} by wrapping its memory segment, recycler
     * and data type in a fresh {@link NetworkBuffer} and recycling that wrapper.
     */
    private static void recycleBufferBuilder(BufferBuilder bufferBuilder) {
        Buffer buffer = bufferBuilder.createBufferConsumer().build();
        NetworkBuffer wrapper =
                new NetworkBuffer(
                        buffer.getMemorySegment(), buffer.getRecycler(), buffer.getDataType());
        wrapper.recycleBuffer();
    }
}
