package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.LinkedList;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/HashBasedDataBuffer.class */
/**
 * A {@link DataBuffer} that keeps an independent queue of {@link BufferConsumer}s for every
 * subpartition (channel) instead of sorting records.
 *
 * <p>Records are appended into {@link BufferBuilder}s backed by segments taken from the supplied
 * free-segment list; at most {@code numGuaranteedBuffers} such segments are accounted as occupied
 * at any time. Events are copied into freshly allocated unpooled off-heap segments and are not
 * counted against that budget.
 *
 * <p>After {@link #finish()} the data is read back with {@link #getNextBuffer(MemorySegment)},
 * which drains the per-subpartition queues in the configured subpartition read order.
 *
 * <p>The class performs no synchronization of its own.
 */
public class HashBasedDataBuffer implements DataBuffer {
    /** Free segments that new record buffers are polled from. */
    private final LinkedList<MemorySegment> freeSegments;

    /** Recycler handed to every {@link BufferBuilder} created for record data. */
    private final BufferRecycler bufferRecycler;

    /** Upper bound of record buffers this data buffer may occupy at the same time. */
    private final int numGuaranteedBuffers;

    /** Per-subpartition queues of the buffers written so far and not yet read. */
    private final ArrayDeque<BufferConsumer>[] buffers;

    /** Size in bytes used when computing the remaining record capacity. */
    private final int bufferSize;

    /** Total number of bytes of all fully appended records and events. */
    private long numTotalBytes;

    /** Total number of fully appended records and events. */
    private long numTotalRecords;

    /** Whether {@link #finish()} has been called. */
    private boolean isFinished;

    /** Whether {@link #release()} has been called. */
    private boolean isReleased;

    /** The currently open (not yet full) builder of each subpartition, or {@code null}. */
    private final BufferBuilder[] builders;

    /** Number of record buffers currently taken from {@link #freeSegments} and not yet read. */
    private int numBuffersOccupied;

    /** Position in {@link #subpartitionReadOrder} of the subpartition currently being read. */
    private int readOrderIndex;

    /** Order in which the subpartitions are drained by {@link #getNextBuffer(MemorySegment)}. */
    private final int[] subpartitionReadOrder;

    /** Number of bytes already handed out by {@link #getNextBuffer(MemorySegment)}. */
    private long numTotalBytesRead;

    /**
     * Creates a new hash-based data buffer.
     *
     * @param linkedList free memory segments used for record data; must hold at least {@code i3}
     *     segments
     * @param bufferRecycler recycler passed to the builders created for record data
     * @param i number of subpartitions
     * @param i2 buffer size in bytes used for the capacity accounting
     * @param i3 number of guaranteed buffers; must be positive
     * @param iArr optional subpartition read order of length {@code i}; when {@code null} the
     *     subpartitions are read in ascending index order
     * @throws IllegalArgumentException presumably (via {@code Preconditions.checkArgument}) if
     *     {@code i3} is not positive or {@code iArr} has the wrong length
     * @throws IllegalStateException presumably (via {@code Preconditions.checkState}) if fewer
     *     than {@code i3} free segments are supplied
     */
    public HashBasedDataBuffer(LinkedList<MemorySegment> linkedList, BufferRecycler bufferRecycler, int i, int i2, int i3, @Nullable int[] iArr) {
        Preconditions.checkArgument(i3 > 0, "No guaranteed buffers for sort.");
        this.freeSegments = Preconditions.checkNotNull(linkedList);
        this.bufferRecycler = Preconditions.checkNotNull(bufferRecycler);
        this.bufferSize = i2;
        this.numGuaranteedBuffers = i3;
        Preconditions.checkState(i3 <= linkedList.size(), "Wrong number of free segments.");

        this.builders = new BufferBuilder[i];

        // Generic arrays cannot be created directly; every slot is filled with a typed deque below.
        @SuppressWarnings({"unchecked", "rawtypes"})
        ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[i];
        for (int channel = 0; channel < i; channel++) {
            queues[channel] = new ArrayDeque<>();
        }
        this.buffers = queues;

        this.subpartitionReadOrder = new int[i];
        if (iArr != null) {
            Preconditions.checkArgument(iArr.length == i, "Illegal data read order.");
            System.arraycopy(iArr, 0, this.subpartitionReadOrder, 0, i);
        } else {
            for (int channel = 0; channel < i; channel++) {
                this.subpartitionReadOrder[channel] = channel;
            }
        }
    }

    /**
     * Appends one record or event to the given subpartition.
     *
     * @param byteBuffer the data to append; must have remaining bytes
     * @param i index of the target subpartition
     * @param dataType type of the data; buffer types are written as records, everything else as
     *     an event
     * @return {@code true} if {@code byteBuffer} still has remaining bytes after the write attempt
     *     (a record that does not fit into the remaining capacity is not written at all),
     *     {@code false} if the data was appended completely
     */
    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public boolean append(ByteBuffer byteBuffer, int i, Buffer.DataType dataType) throws IOException {
        Preconditions.checkArgument(byteBuffer.hasRemaining(), "Cannot append empty data.");
        Preconditions.checkState(!this.isFinished, "Sort buffer is already finished.");
        Preconditions.checkState(!this.isReleased, "Sort buffer is already released.");

        int bytesBefore = byteBuffer.remaining();
        if (dataType.isBuffer()) {
            writeRecord(byteBuffer, i);
        } else {
            writeEvent(byteBuffer, i, dataType);
        }

        if (byteBuffer.hasRemaining()) {
            return true;
        }
        this.numTotalRecords++;
        this.numTotalBytes += bytesBefore - byteBuffer.remaining();
        return false;
    }

    /**
     * Writes an event into its own unpooled off-heap segment and enqueues it for the channel. Any
     * open record builder of the channel is finished first so that the event stays ordered after
     * the records written before it.
     */
    private void writeEvent(ByteBuffer source, int targetChannel, Buffer.DataType dataType) {
        finishAndClose(targetChannel);

        MemorySegment segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(source.remaining());
        segment.put(0, source, segment.size());
        NetworkBuffer eventBuffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, dataType);
        this.buffers[targetChannel].add(new BufferConsumer(eventBuffer, segment.size()));
    }

    /**
     * Writes a record into the channel's builders, taking new segments from the free list as
     * needed. If the record does not fit into the writable bytes of the open builder plus the
     * buffers still available within the guaranteed budget, nothing is written.
     */
    private void writeRecord(ByteBuffer source, int targetChannel) {
        BufferBuilder builder = this.builders[targetChannel];
        long writableBytes = builder != null ? builder.getWritableBytes() : 0;
        // Computed in long: (buffers * bufferSize) can exceed Integer.MAX_VALUE for large budgets.
        long capacity = writableBytes + (long) (this.numGuaranteedBuffers - this.numBuffersOccupied) * this.bufferSize;
        if (source.remaining() > capacity) {
            return;
        }

        do {
            if (builder == null) {
                builder = new BufferBuilder(this.freeSegments.poll(), this.bufferRecycler);
                this.buffers[targetChannel].add(builder.createBufferConsumer());
                this.numBuffersOccupied++;
                this.builders[targetChannel] = builder;
            }
            builder.append(source);
            if (builder.isFull()) {
                builder.finish();
                builder.close();
                this.builders[targetChannel] = null;
                builder = null;
            }
        } while (source.hasRemaining());
    }

    /** Finishes and closes the open builder of the given channel, if there is one. */
    private void finishAndClose(int targetChannel) {
        BufferBuilder builder = this.builders[targetChannel];
        if (builder != null) {
            builder.finish();
            builder.close();
            this.builders[targetChannel] = null;
        }
    }

    /**
     * Returns the next buffer in subpartition read order together with its channel index.
     *
     * @param memorySegment not used by this implementation
     * @return the next buffer, or {@code null} when all data has been read
     */
    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public BufferWithChannel getNextBuffer(MemorySegment memorySegment) {
        Preconditions.checkState(this.isFinished, "Sort buffer is not ready to be read.");
        Preconditions.checkState(!this.isReleased, "Sort buffer is already released.");

        if (!hasRemaining() || this.readOrderIndex >= this.subpartitionReadOrder.length) {
            return null;
        }

        BufferWithChannel result = null;
        int targetChannel = this.subpartitionReadOrder[this.readOrderIndex];
        while (result == null) {
            BufferConsumer consumer = this.buffers[targetChannel].poll();
            if (consumer != null) {
                result = new BufferWithChannel(consumer.build(), targetChannel);
                // Only record buffers were counted as occupied; events use unpooled memory.
                if (result.getBuffer().isBuffer()) {
                    this.numBuffersOccupied--;
                }
                this.numTotalBytesRead += result.getBuffer().readableBytes();
                consumer.close();
            } else {
                // Current subpartition is drained: advance to the next one in read order.
                this.readOrderIndex++;
                if (this.readOrderIndex >= this.subpartitionReadOrder.length) {
                    break;
                }
                targetChannel = this.subpartitionReadOrder[this.readOrderIndex];
            }
        }
        return result;
    }

    /** Returns the number of records and events appended completely so far. */
    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public long numTotalRecords() {
        return this.numTotalRecords;
    }

    /** Returns the number of bytes appended completely so far. */
    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public long numTotalBytes() {
        return this.numTotalBytes;
    }

    /** Returns whether fewer bytes have been read than have been appended. */
    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public boolean hasRemaining() {
        return this.numTotalBytesRead < this.numTotalBytes;
    }

    /**
     * Marks this data buffer as finished and finishes all open builders so that their data can be
     * read. Must be called at most once.
     */
    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public void finish() {
        Preconditions.checkState(!this.isFinished, "DataBuffer is already finished.");
        this.isFinished = true;
        for (int channel = 0; channel < this.builders.length; channel++) {
            finishAndClose(channel);
        }
    }

    /** Returns whether {@link #finish()} has been called. */
    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public boolean isFinished() {
        return this.isFinished;
    }

    /**
     * Closes all open builders and all buffers that have not been read yet. Calling this method
     * more than once has no further effect.
     */
    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public void release() {
        if (this.isReleased) {
            return;
        }
        this.isReleased = true;

        for (int channel = 0; channel < this.builders.length; channel++) {
            BufferBuilder builder = this.builders[channel];
            if (builder != null) {
                builder.close();
                this.builders[channel] = null;
            }
        }

        // Drain every queue; poll() returns null once a queue is empty, which ends the loop.
        for (ArrayDeque<BufferConsumer> queue : this.buffers) {
            BufferConsumer consumer;
            while ((consumer = queue.poll()) != null) {
                consumer.close();
            }
        }
    }

    /** Returns whether {@link #release()} has been called. */
    @Override // org.apache.flink.runtime.io.network.partition.DataBuffer
    public boolean isReleased() {
        return this.isReleased;
    }
}
