package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.class */
public class HsFullSpillingStrategy implements HsSpillingStrategy {
    private final int numBuffersTriggerSpilling;
    private final float releaseBufferRatio;
    private final float releaseThreshold;

    public HsFullSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) {
        this.numBuffersTriggerSpilling = hybridShuffleConfiguration.getFullStrategyNumBuffersTriggerSpilling();
        this.releaseThreshold = hybridShuffleConfiguration.getFullStrategyReleaseThreshold();
        this.releaseBufferRatio = hybridShuffleConfiguration.getFullStrategyReleaseBufferRatio();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy
    public Optional<HsSpillingStrategy.Decision> onBufferFinished(int i) {
        return i < this.numBuffersTriggerSpilling ? Optional.of(HsSpillingStrategy.Decision.NO_ACTION) : Optional.empty();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy
    public Optional<HsSpillingStrategy.Decision> onBufferConsumed(BufferIndexAndChannel bufferIndexAndChannel) {
        return Optional.of(HsSpillingStrategy.Decision.NO_ACTION);
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy
    public Optional<HsSpillingStrategy.Decision> onMemoryUsageChanged(int i, int i2) {
        return ((float) i) < ((float) i2) * this.releaseThreshold ? Optional.of(HsSpillingStrategy.Decision.NO_ACTION) : Optional.empty();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy
    public HsSpillingStrategy.Decision decideActionWithGlobalInfo(HsSpillingInfoProvider hsSpillingInfoProvider) {
        HsSpillingStrategy.Decision.Builder builder = HsSpillingStrategy.Decision.builder();
        checkSpill(hsSpillingInfoProvider, builder);
        checkRelease(hsSpillingInfoProvider, builder);
        return builder.build();
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy
    public HsSpillingStrategy.Decision onResultPartitionClosed(HsSpillingInfoProvider hsSpillingInfoProvider) {
        HsSpillingStrategy.Decision.Builder builder = HsSpillingStrategy.Decision.builder();
        for (int i = 0; i < hsSpillingInfoProvider.getNumSubpartitions(); i++) {
            builder.addBufferToSpill(i, hsSpillingInfoProvider.getBuffersInOrder(i, HsSpillingInfoProvider.SpillStatus.NOT_SPILL, HsSpillingInfoProvider.ConsumeStatus.ALL)).addBufferToRelease(i, hsSpillingInfoProvider.getBuffersInOrder(i, HsSpillingInfoProvider.SpillStatus.ALL, HsSpillingInfoProvider.ConsumeStatus.ALL));
        }
        return builder.build();
    }

    private void checkSpill(HsSpillingInfoProvider hsSpillingInfoProvider, HsSpillingStrategy.Decision.Builder builder) {
        if (hsSpillingInfoProvider.getNumTotalUnSpillBuffers() < this.numBuffersTriggerSpilling) {
            return;
        }
        for (int i = 0; i < hsSpillingInfoProvider.getNumSubpartitions(); i++) {
            builder.addBufferToSpill(i, hsSpillingInfoProvider.getBuffersInOrder(i, HsSpillingInfoProvider.SpillStatus.NOT_SPILL, HsSpillingInfoProvider.ConsumeStatus.ALL));
        }
    }

    private void checkRelease(HsSpillingInfoProvider hsSpillingInfoProvider, HsSpillingStrategy.Decision.Builder builder) {
        if (hsSpillingInfoProvider.getNumTotalRequestedBuffers() < hsSpillingInfoProvider.getPoolSize() * this.releaseThreshold) {
            return;
        }
        int poolSize = (int) (hsSpillingInfoProvider.getPoolSize() * this.releaseBufferRatio);
        TreeMap treeMap = new TreeMap();
        int i = 0;
        for (int i2 = 0; i2 < hsSpillingInfoProvider.getNumSubpartitions(); i2++) {
            Deque<BufferIndexAndChannel> buffersInOrder = hsSpillingInfoProvider.getBuffersInOrder(i2, HsSpillingInfoProvider.SpillStatus.SPILL, HsSpillingInfoProvider.ConsumeStatus.CONSUMED);
            i += buffersInOrder.size();
            treeMap.put(Integer.valueOf(i2), buffersInOrder);
        }
        TreeMap treeMap2 = new TreeMap();
        if (poolSize > i) {
            TreeMap treeMap3 = new TreeMap();
            for (int i3 = 0; i3 < hsSpillingInfoProvider.getNumSubpartitions(); i3++) {
                treeMap3.put(Integer.valueOf(i3), hsSpillingInfoProvider.getBuffersInOrder(i3, HsSpillingInfoProvider.SpillStatus.SPILL, HsSpillingInfoProvider.ConsumeStatus.NOT_CONSUMED));
            }
            treeMap2.putAll(HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder(hsSpillingInfoProvider.getNextBufferIndexToConsume(), treeMap3, poolSize - i));
        }
        for (int i4 = 0; i4 < hsSpillingInfoProvider.getNumSubpartitions(); i4++) {
            ArrayList arrayList = new ArrayList();
            arrayList.addAll((Collection) treeMap.getOrDefault(Integer.valueOf(i4), new ArrayDeque()));
            arrayList.addAll((Collection) treeMap2.getOrDefault(Integer.valueOf(i4), new ArrayList()));
            builder.addBufferToRelease(i4, arrayList);
        }
    }
}
