package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.class */
public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryDataManagerOperation {
    private static final Logger LOG = LoggerFactory.getLogger(HsMemoryDataManager.class);
    private final int numSubpartitions;
    private final HsSubpartitionMemoryDataManager[] subpartitionMemoryDataManagers;
    private final HsMemoryDataSpiller spiller;
    private final HsSpillingStrategy spillStrategy;
    private final HsFileDataIndex fileDataIndex;
    private final BufferPool bufferPool;
    private final Lock lock;
    private final List<Map<HsConsumerId, HsSubpartitionConsumerInternalOperations>> subpartitionViewOperationsMap;
    private final AtomicInteger poolSize;
    private final AtomicInteger numRequestedBuffers = new AtomicInteger(0);
    private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
    private final ScheduledExecutorService poolSizeChecker = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("hybrid-shuffle-pool-size-checker-executor"));

    public HsMemoryDataManager(int i, int i2, BufferPool bufferPool, HsSpillingStrategy hsSpillingStrategy, HsFileDataIndex hsFileDataIndex, Path path, BufferCompressor bufferCompressor, long j) throws IOException {
        this.numSubpartitions = i;
        this.bufferPool = bufferPool;
        this.spiller = new HsMemoryDataSpiller(path);
        this.spillStrategy = hsSpillingStrategy;
        this.fileDataIndex = hsFileDataIndex;
        this.subpartitionMemoryDataManagers = new HsSubpartitionMemoryDataManager[i];
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(true);
        this.lock = reentrantReadWriteLock.writeLock();
        this.subpartitionViewOperationsMap = new ArrayList(i);
        for (int i3 = 0; i3 < i; i3++) {
            this.subpartitionMemoryDataManagers[i3] = new HsSubpartitionMemoryDataManager(i3, i2, reentrantReadWriteLock.readLock(), bufferCompressor, this);
            this.subpartitionViewOperationsMap.add(new ConcurrentHashMap());
        }
        this.poolSize = new AtomicInteger(this.bufferPool.getNumBuffers());
        if (j > 0) {
            this.poolSizeChecker.scheduleAtFixedRate(() -> {
                int numBuffers = this.bufferPool.getNumBuffers();
                if (this.poolSize.getAndSet(numBuffers) > numBuffers) {
                    handleDecision(Optional.empty());
                }
            }, j, j, TimeUnit.MILLISECONDS);
        }
    }

    public void append(ByteBuffer byteBuffer, int i, Buffer.DataType dataType) throws IOException {
        try {
            getSubpartitionMemoryDataManager(i).append(byteBuffer, dataType);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public HsDataView registerNewConsumer(int i, HsConsumerId hsConsumerId, HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations) {
        Preconditions.checkState(this.subpartitionViewOperationsMap.get(i).put(hsConsumerId, hsSubpartitionConsumerInternalOperations) == null, "Each subpartition view should have unique consumerId.");
        return getSubpartitionMemoryDataManager(i).registerNewConsumer(hsConsumerId);
    }

    public void close() {
        spillAndReleaseAllData();
        this.spiller.close();
        this.poolSizeChecker.shutdown();
    }

    private void spillAndReleaseAllData() {
        handleDecision(Optional.of((HsSpillingStrategy.Decision) callWithLock(() -> {
            return this.spillStrategy.onResultPartitionClosed(this);
        })));
    }

    public void setOutputMetrics(HsOutputMetrics hsOutputMetrics) {
        for (int i = 0; i < this.numSubpartitions; i++) {
            getSubpartitionMemoryDataManager(i).setOutputMetrics(hsOutputMetrics);
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public int getPoolSize() {
        return this.poolSize.get();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public int getNumSubpartitions() {
        return this.numSubpartitions;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public int getNumTotalRequestedBuffers() {
        return this.numRequestedBuffers.get();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public int getNumTotalUnSpillBuffers() {
        return this.numUnSpillBuffers.get();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public Deque<BufferIndexAndChannel> getBuffersInOrder(int i, HsSpillingInfoProvider.SpillStatus spillStatus, HsSpillingInfoProvider.ConsumeStatusWithId consumeStatusWithId) {
        return getSubpartitionMemoryDataManager(i).getBuffersSatisfyStatus(spillStatus, consumeStatusWithId);
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider
    public List<Integer> getNextBufferIndexToConsume(HsConsumerId hsConsumerId) {
        ArrayList arrayList = new ArrayList(this.numSubpartitions);
        for (int i = 0; i < this.numSubpartitions; i++) {
            HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations = this.subpartitionViewOperationsMap.get(i).get(hsConsumerId);
            arrayList.add(Integer.valueOf(hsSubpartitionConsumerInternalOperations == null ? -1 : hsSubpartitionConsumerInternalOperations.getConsumingOffset(false) + 1));
        }
        return arrayList;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public void markBufferReleasedFromFile(int i, int i2) {
        this.fileDataIndex.markBufferReleased(i, i2);
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public BufferBuilder requestBufferFromPool() throws InterruptedException {
        MemorySegment requestMemorySegmentBlocking = this.bufferPool.requestMemorySegmentBlocking();
        handleDecision(this.spillStrategy.onMemoryUsageChanged(this.numRequestedBuffers.incrementAndGet(), getPoolSize()));
        return new BufferBuilder(requestMemorySegmentBlocking, this::recycleBuffer);
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public void onBufferConsumed(BufferIndexAndChannel bufferIndexAndChannel) {
        handleDecision(this.spillStrategy.onBufferConsumed(bufferIndexAndChannel));
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public void onBufferFinished() {
        handleDecision(this.spillStrategy.onBufferFinished(this.numUnSpillBuffers.incrementAndGet(), getPoolSize()));
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public void onDataAvailable(int i, Collection<HsConsumerId> collection) {
        Map<HsConsumerId, HsSubpartitionConsumerInternalOperations> map = this.subpartitionViewOperationsMap.get(i);
        collection.forEach(hsConsumerId -> {
            HsSubpartitionConsumerInternalOperations hsSubpartitionConsumerInternalOperations = (HsSubpartitionConsumerInternalOperations) map.get(hsConsumerId);
            if (hsSubpartitionConsumerInternalOperations != null) {
                hsSubpartitionConsumerInternalOperations.notifyDataAvailable();
            }
        });
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManagerOperation
    public void onConsumerReleased(int i, HsConsumerId hsConsumerId) {
        this.subpartitionViewOperationsMap.get(i).remove(hsConsumerId);
        getSubpartitionMemoryDataManager(i).releaseConsumer(hsConsumerId);
    }

    private void handleDecision(Optional<HsSpillingStrategy.Decision> optional) {
        HsSpillingStrategy.Decision orElseGet = optional.orElseGet(() -> {
            return (HsSpillingStrategy.Decision) callWithLock(() -> {
                return this.spillStrategy.decideActionWithGlobalInfo(this);
            });
        });
        if (!orElseGet.getBufferToSpill().isEmpty()) {
            spillBuffers(orElseGet.getBufferToSpill());
        }
        if (orElseGet.getBufferToRelease().isEmpty()) {
            return;
        }
        releaseBuffers(orElseGet.getBufferToRelease());
    }

    private void spillBuffers(Map<Integer, List<BufferIndexAndChannel>> map) {
        CompletableFuture completableFuture = new CompletableFuture();
        ArrayList arrayList = new ArrayList();
        map.forEach((num, list) -> {
            arrayList.addAll(getSubpartitionMemoryDataManager(num.intValue()).spillSubpartitionBuffers(list, completableFuture));
            this.numUnSpillBuffers.getAndAdd(-list.size());
        });
        FutureUtils.assertNoException(this.spiller.spillAsync(arrayList).thenAccept(list2 -> {
            this.fileDataIndex.addBuffers(list2);
            completableFuture.complete(null);
        }));
    }

    private void releaseBuffers(Map<Integer, List<BufferIndexAndChannel>> map) {
        map.forEach((num, list) -> {
            getSubpartitionMemoryDataManager(num.intValue()).releaseSubpartitionBuffers(list);
        });
    }

    private HsSubpartitionMemoryDataManager getSubpartitionMemoryDataManager(int i) {
        return this.subpartitionMemoryDataManagers[i];
    }

    private void recycleBuffer(MemorySegment memorySegment) {
        this.numRequestedBuffers.decrementAndGet();
        this.bufferPool.recycle(memorySegment);
    }

    private <T, R extends Exception> T callWithLock(SupplierWithException<T, R> supplierWithException) throws Exception {
        try {
            this.lock.lock();
            return supplierWithException.get();
        } finally {
            this.lock.unlock();
        }
    }
}
