package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.util.Preconditions;

/**
 * {@link ResultSubpartitionView} that hands out buffers which were read from a {@link
 * PartitionedFileReader}.
 *
 * <p>Buffers are appended to an internal queue through {@link #readBuffers(Queue,
 * BufferRecycler)} and removed from it through {@link #getNextBuffer()}. The queue, the data
 * buffer backlog, the released flag and the failure cause are all guarded by {@code lock}.
 * Callbacks into other components (buffer recycling, availability notification, completion of
 * the release future) are deliberately performed after the lock has been left.
 *
 * <p>Note: the ordering defined by {@link #compareTo(SortMergeSubpartitionReader)} looks at
 * queue sizes without synchronization, so it may be based on slightly stale values.
 */
class SortMergeSubpartitionReader implements ResultSubpartitionView, Comparable<SortMergeSubpartitionReader> {

    /** Guards all fields annotated with {@code @GuardedBy("lock")}. */
    private final Object lock = new Object();

    /** Completed (with {@code null}) once this reader has been released. */
    private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();

    /** Notified when the queue goes from empty to non-empty and after {@link #fail(Throwable)}. */
    private final BufferAvailabilityListener availabilityListener;

    /** Source of the buffers; also supplies the priority used by {@link #compareTo}. */
    private final PartitionedFileReader fileReader;

    /** Buffers that were read already and have not been polled yet. */
    @GuardedBy("lock")
    private final Queue<Buffer> buffersRead = new ArrayDeque<>();

    /** Number of queued buffers for which {@link Buffer#isBuffer()} returns {@code true}. */
    @GuardedBy("lock")
    private int dataBufferBacklog;

    /** Set once by {@link #releaseInternal(Throwable)}; never reset. */
    @GuardedBy("lock")
    private boolean isReleased;

    /** Cause passed to {@link #fail(Throwable)}, or {@code null} if released without failure. */
    @GuardedBy("lock")
    private Throwable failureCause;

    /** Sequence number attached to the next buffer returned by {@link #getNextBuffer()}. */
    private int sequenceNumber;

    /**
     * Creates a reader.
     *
     * @param bufferAvailabilityListener listener to notify about available data; must not be null
     * @param partitionedFileReader file reader that produces the buffers; must not be null
     */
    public SortMergeSubpartitionReader(BufferAvailabilityListener bufferAvailabilityListener, PartitionedFileReader partitionedFileReader) {
        this.availabilityListener = Preconditions.checkNotNull(bufferAvailabilityListener);
        this.fileReader = Preconditions.checkNotNull(partitionedFileReader);
    }

    /**
     * Removes and returns the head of the buffer queue.
     *
     * @return the head buffer together with the data type of the following buffer ({@link
     *     Buffer.DataType#NONE} if there is none), the remaining data buffer backlog and the
     *     sequence number of the returned buffer; {@code null} if the queue is empty
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() {
        synchronized (lock) {
            final Buffer buffer = buffersRead.poll();
            if (buffer == null) {
                return null;
            }
            if (buffer.isBuffer()) {
                dataBufferBacklog--;
            }

            final Buffer lookAhead = buffersRead.peek();
            final Buffer.DataType nextDataType =
                    lookAhead == null ? Buffer.DataType.NONE : lookAhead.getDataType();
            return ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead(
                    buffer, nextDataType, dataBufferBacklog, sequenceNumber++);
        }
    }

    /**
     * Appends a freshly read buffer to the queue and notifies the listener if the queue was empty
     * before.
     *
     * @throws IllegalStateException if this reader has been released already; the buffer is
     *     recycled before the exception is thrown
     */
    private void addBuffer(Buffer buffer) {
        final boolean wasEmpty;
        synchronized (lock) {
            if (isReleased) {
                // Nobody will consume the buffer any more; hand it back below, outside the lock.
                wasEmpty = false;
            } else {
                wasEmpty = buffersRead.isEmpty();
                buffersRead.add(buffer);
                if (buffer.isBuffer()) {
                    dataBufferBacklog++;
                }
                // Fall through to the notification outside the lock.
                if (!wasEmpty) {
                    return;
                }
                // Queue turned non-empty: notify after leaving the synchronized block.
            }
            if (!isReleased) {
                // wasEmpty == true and the buffer was enqueued.
            }
        }

        if (!wasEmpty) {
            // Only reachable on the released path (the non-empty enqueue path returned above).
            buffer.recycleBuffer();
            throw new IllegalStateException("Subpartition reader has been already released.");
        }
        notifyDataAvailable();
    }

    /**
     * Reads from the file reader's current region, enqueueing every produced buffer via {@link
     * #addBuffer(Buffer)}.
     *
     * @param queue memory segments passed through to the file reader
     * @param bufferRecycler recycler passed through to the file reader
     * @return the value returned by {@code PartitionedFileReader#readCurrentRegion}
     *     (NOTE(review): presumably whether more data remains — confirm against the file reader)
     * @throws IOException if the file reader fails
     */
    public boolean readBuffers(Queue<MemorySegment> queue, BufferRecycler bufferRecycler) throws IOException {
        return fileReader.readCurrentRegion(queue, bufferRecycler, this::addBuffer);
    }

    /** Returns the future that is completed once this reader has been released. */
    public CompletableFuture<?> getReleaseFuture() {
        return releaseFuture;
    }

    /**
     * Releases this reader with the given failure cause and notifies the availability listener.
     *
     * @param th the failure cause; must not be null
     * @throws IllegalArgumentException if {@code th} is null
     */
    public void fail(Throwable th) {
        Preconditions.checkArgument(th != null, "Must be not null.");
        releaseInternal(th);
        // Wake up the listener even though the queue was cleared, so the released state is seen.
        notifyDataAvailable();
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyDataAvailable() {
        availabilityListener.notifyDataAvailable();
    }

    /**
     * Orders readers for scheduling.
     *
     * <p>If exactly one of the two readers has no queued buffers, that reader sorts first.
     * Otherwise the readers are ordered by ascending file reader priority. Queue sizes are read
     * without holding the lock.
     */
    @Override // java.lang.Comparable
    public int compareTo(SortMergeSubpartitionReader sortMergeSubpartitionReader) {
        final int thisQueued = unsynchronizedGetNumberOfQueuedBuffers();
        final int thatQueued = sortMergeSubpartitionReader.unsynchronizedGetNumberOfQueuedBuffers();
        final boolean exactlyOneEmpty =
                thisQueued != thatQueued && (thisQueued == 0 || thatQueued == 0);
        if (exactlyOneEmpty) {
            return Integer.compare(thisQueued, thatQueued);
        }
        return Long.compare(fileReader.getPriority(), sortMergeSubpartitionReader.fileReader.getPriority());
    }

    /** Releases this reader without a failure cause. Subsequent calls have no effect. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void releaseAllResources() {
        releaseInternal(null);
    }

    /**
     * Marks this reader as released, recycles all queued buffers and completes the release
     * future. Only the first invocation has any effect.
     *
     * @param cause failure cause to record, or {@code null} for a regular release
     */
    private void releaseInternal(@Nullable Throwable cause) {
        final ArrayList<Buffer> buffersToRecycle;
        synchronized (lock) {
            if (isReleased) {
                return;
            }
            isReleased = true;
            if (failureCause == null) {
                failureCause = cause;
            }
            // Only detach the buffers here; recycling happens after the lock is released.
            buffersToRecycle = new ArrayList<>(buffersRead);
            buffersRead.clear();
            dataBufferBacklog = 0;
        }

        // Recycling and future completion invoke code owned by other components (recyclers,
        // dependent actions of the future). Running them outside the lock avoids holding it
        // across foreign callbacks, matching how addBuffer recycles and notifies.
        buffersToRecycle.forEach(Buffer::recycleBuffer);
        buffersToRecycle.clear();
        releaseFuture.complete(null);
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public boolean isReleased() {
        synchronized (lock) {
            return isReleased;
        }
    }

    /**
     * Not supported by this view.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void resumeConsumption() {
        throw new UnsupportedOperationException("Method should never be called.");
    }

    /** Intentionally a no-op for this view. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void acknowledgeAllDataProcessed() {
    }

    /** Returns the recorded failure cause, or {@code null} if none was recorded. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public Throwable getFailureCause() {
        synchronized (lock) {
            return failureCause;
        }
    }

    /**
     * Reports whether a buffer can be handed out, together with the current data buffer backlog.
     *
     * <p>The view counts as available when it has been released, or when the queue is non-empty
     * and either {@code i > 0} or the head of the queue is not a data buffer.
     *
     * @param i NOTE(review): presumably the number of credits available to the consumer —
     *     confirm against {@link ResultSubpartitionView}
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog(int i) {
        synchronized (lock) {
            final boolean available;
            if (isReleased) {
                available = true;
            } else if (buffersRead.isEmpty()) {
                available = false;
            } else {
                available = i > 0 || !buffersRead.peek().isBuffer();
            }
            return new ResultSubpartitionView.AvailabilityWithBacklog(available, dataBufferBacklog);
        }
    }

    /** Returns the queue size without taking the lock; the value may be stale. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return buffersRead.size();
    }

    /** Returns the queue size, read while holding the lock. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public int getNumberOfQueuedBuffers() {
        synchronized (lock) {
            return buffersRead.size();
        }
    }

    /** Intentionally a no-op for this view. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyNewBufferSize(int i) {
    }
}
