package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.class */
/**
 * {@link TieredStorageMemoryManager} implementation that hands out buffers from a single
 * {@link BufferPool}, counts the buffers requested per owner, and notifies registered listeners
 * when the share of requested buffers exceeds a configured ratio.
 *
 * <p>When reclaiming is enabled, a single-threaded scheduled executor re-checks the reclaim
 * condition while a blocking buffer request is still pending.
 *
 * <p>NOTE(review): the listener list and the spec map are plain (non-concurrent) collections, and
 * the listener list is read from the checker thread. Presumably all specs and listeners are
 * registered before the first buffer request - confirm against callers.
 */
public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManager {

    /** Delay before the first reclaim re-check of a pending request; doubled on every re-check. */
    private static final int INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS = 50;

    /** Ratio of requested buffers to pool size above which reclaim listeners are notified. */
    private final float numTriggerReclaimBuffersRatio;

    /** Whether the periodic reclaim checker is used; it is created in {@link #setup}. */
    private final boolean mayReclaimBuffer;

    /** Reclaim checker; only non-null after {@link #setup} when {@link #mayReclaimBuffer} is set. */
    @Nullable
    private ScheduledExecutorService executor;

    private BufferPool bufferPool;

    /** Memory specs keyed by their owner. */
    private final Map<Object, TieredStorageMemorySpec> tieredMemorySpecs = new HashMap<>();

    /** Total number of buffers requested through this manager and not yet recycled. */
    private final AtomicInteger numRequestedBuffers = new AtomicInteger(0);

    /** Number of outstanding buffers per owner. */
    private final Map<Object, AtomicInteger> numOwnerRequestedBuffers = new ConcurrentHashMap<>();

    /** Callbacks run whenever buffers should be reclaimed. */
    private final List<Runnable> bufferReclaimRequestListeners = new ArrayList<>();

    private boolean isInitialized = false;

    /**
     * Creates an uninitialized manager; {@link #setup} must be called before requesting buffers.
     *
     * @param numTriggerReclaimBuffersRatio ratio of requested buffers to pool size above which
     *     reclaim listeners are notified
     * @param mayReclaimBuffer whether to start the periodic reclaim checker during setup
     */
    public TieredStorageMemoryManagerImpl(
            float numTriggerReclaimBuffersRatio, boolean mayReclaimBuffer) {
        this.numTriggerReclaimBuffersRatio = numTriggerReclaimBuffersRatio;
        this.mayReclaimBuffer = mayReclaimBuffer;
    }

    /**
     * Binds the buffer pool, registers the memory specs by owner and, if reclaiming is enabled,
     * creates the reclaim checker executor.
     *
     * @param bufferPool pool from which all buffers are requested and to which they are recycled
     * @param storageMemorySpecs specs to register; each owner may appear only once
     * @throws IllegalStateException if two specs share the same owner
     */
    @Override
    public void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs) {
        this.bufferPool = bufferPool;
        for (TieredStorageMemorySpec spec : storageMemorySpecs) {
            Object owner = spec.getOwner();
            Preconditions.checkState(
                    !this.tieredMemorySpecs.containsKey(owner), "Duplicated memory spec.");
            this.tieredMemorySpecs.put(owner, spec);
        }
        if (this.mayReclaimBuffer) {
            this.executor =
                    Executors.newSingleThreadScheduledExecutor(
                            new ThreadFactoryBuilder()
                                    .setNameFormat("buffer reclaim checker")
                                    .setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE)
                                    .build());
        }
        this.isInitialized = true;
    }

    /**
     * Registers a callback that is run whenever this manager decides buffers should be reclaimed.
     *
     * @param onBufferReclaimRequest the callback to register
     */
    @Override
    public void listenBufferReclaimRequest(Runnable onBufferReclaimRequest) {
        this.bufferReclaimRequestListeners.add(onBufferReclaimRequest);
    }

    /**
     * Requests a memory segment from the pool, blocking until one is available, and wraps it in a
     * {@link BufferBuilder} whose recycler returns the segment to the pool and updates the
     * counters.
     *
     * <p>Reclaim listeners are notified up front if the reclaim condition holds, and again by the
     * checker (with a doubling delay) for as long as the request stays pending.
     *
     * @param owner the owner the buffer is accounted to
     * @return a builder backed by the requested segment
     * @throws IllegalStateException if {@link #setup} has not been called
     */
    @Override
    public BufferBuilder requestBufferBlocking(Object owner) {
        checkIsInitialized();
        reclaimBuffersIfNeeded();

        CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
        scheduleCheckRequestBufferFuture(
                requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);

        MemorySegment segment = null;
        try {
            segment = this.bufferPool.requestMemorySegmentBlocking();
        } catch (InterruptedException e) {
            ExceptionUtils.rethrow(e);
        } finally {
            // Always complete the future, also on failure; otherwise the checker would keep
            // rescheduling itself and notifying listeners for a request that no longer exists.
            requestBufferFuture.complete(null);
        }

        // Validate before counting so a missing segment cannot inflate the counters.
        Preconditions.checkNotNull(segment);
        incNumRequestedBuffer(owner);
        return new BufferBuilder(segment, recycled -> recycleBuffer(owner, recycled));
    }

    /**
     * Computes the pool size minus, for every registered owner other than {@code owner}, the
     * larger of that owner's guaranteed buffers and its currently requested buffers.
     *
     * @param owner the owner to compute the limit for
     * @return the resulting number of buffers
     * @throws IllegalStateException if {@link #setup} has not been called
     */
    @Override
    public int getMaxNonReclaimableBuffers(Object owner) {
        checkIsInitialized();
        int reservedForOthers = 0;
        for (Map.Entry<Object, TieredStorageMemorySpec> entry : this.tieredMemorySpecs.entrySet()) {
            Object otherOwner = entry.getKey();
            if (otherOwner.equals(owner)) {
                continue;
            }
            reservedForOthers +=
                    Math.max(
                            entry.getValue().getNumGuaranteedBuffers(),
                            numOwnerRequestedBuffer(otherOwner));
        }
        return this.bufferPool.getNumBuffers() - reservedForOthers;
    }

    /**
     * Returns the number of outstanding buffers accounted to the given owner.
     *
     * @param owner the owner to look up
     * @return the owner's current count, or 0 if it has never requested a buffer
     */
    @Override
    public int numOwnerRequestedBuffer(Object owner) {
        AtomicInteger counter = this.numOwnerRequestedBuffers.get(owner);
        return counter == null ? 0 : counter.get();
    }

    /**
     * Verifies that no buffers are outstanding and shuts down the reclaim checker, if any, waiting
     * up to five minutes for it to terminate.
     *
     * @throws IllegalStateException if requested buffers have not all been recycled
     */
    @Override
    public void release() {
        Preconditions.checkState(this.numRequestedBuffers.get() == 0, "Leaking buffers.");
        if (this.executor == null) {
            return;
        }
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(5L, TimeUnit.MINUTES)) {
                throw new TimeoutException(
                        "Timeout for shutting down the buffer reclaim checker executor.");
            }
        } catch (InterruptedException | TimeoutException e) {
            ExceptionUtils.rethrow(e);
        }
    }

    /** Schedules one reclaim re-check after {@code delayMs} unless the request is already done. */
    private void scheduleCheckRequestBufferFuture(
            CompletableFuture<Void> requestBufferFuture, long delayMs) {
        if (!this.mayReclaimBuffer || requestBufferFuture.isDone()) {
            return;
        }
        ScheduledExecutorService checker = Preconditions.checkNotNull(this.executor);
        // The follow-up check, if needed, waits twice as long as this one.
        checker.schedule(
                () -> internalCheckRequestBufferFuture(requestBufferFuture, delayMs * 2),
                delayMs,
                TimeUnit.MILLISECONDS);
    }

    /** Notifies listeners if needed and re-arms the check while the request is still pending. */
    private void internalCheckRequestBufferFuture(
            CompletableFuture<Void> requestBufferFuture, long nextDelayMs) {
        if (requestBufferFuture.isDone()) {
            return;
        }
        reclaimBuffersIfNeeded();
        scheduleCheckRequestBufferFuture(requestBufferFuture, nextDelayMs);
    }

    /** Increments the per-owner counter (creating it on first use) and the total counter. */
    private void incNumRequestedBuffer(Object owner) {
        this.numOwnerRequestedBuffers
                .computeIfAbsent(owner, ignored -> new AtomicInteger(0))
                .incrementAndGet();
        this.numRequestedBuffers.incrementAndGet();
    }

    /** Decrements the per-owner counter, which must already exist, and the total counter. */
    private void decNumRequestedBuffer(Object owner) {
        Preconditions.checkNotNull(this.numOwnerRequestedBuffers.get(owner)).decrementAndGet();
        this.numRequestedBuffers.decrementAndGet();
    }

    /** Runs every registered reclaim listener if the reclaim condition currently holds. */
    private void reclaimBuffersIfNeeded() {
        if (shouldReclaimBuffersBeforeRequesting()) {
            this.bufferReclaimRequestListeners.forEach(Runnable::run);
        }
    }

    /**
     * Returns true when the pool is fully requested, or when taking one more buffer would push
     * the requested share above {@link #numTriggerReclaimBuffersRatio}.
     */
    private boolean shouldReclaimBuffersBeforeRequesting() {
        int poolSize = this.bufferPool.getNumBuffers();
        int numRequested = this.numRequestedBuffers.get();
        // The first operand also short-circuits the division when the pool size is zero.
        return numRequested >= poolSize
                || (numRequested + 1) * 1.0d / poolSize > this.numTriggerReclaimBuffersRatio;
    }

    /** Returns the segment to the pool and then updates the counters for its owner. */
    private void recycleBuffer(Object owner, MemorySegment segment) {
        this.bufferPool.recycle(segment);
        decNumRequestedBuffer(owner);
    }

    /** Fails with {@link IllegalStateException} if {@link #setup} has not completed. */
    private void checkIsInitialized() {
        Preconditions.checkState(
                this.isInitialized, "The memory manager is not in the running state.");
    }
}
