package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.class */
public class RemoteInputChannel extends InputChannel {
    /** Class logger; assigned in the static initializer of this class. */
    private static final Logger LOG;
    /** Sentinel constant -1. The visible code compares against the literal -1 rather than this name. */
    private static final int NONE = -1;
    /** Identifier of this channel; created in the constructor and returned by getInputChannelId(). */
    private final InputChannelID id;
    /** Connection handed to the connection manager when creating or closing the partition request client. */
    private final ConnectionID connectionId;
    /** Creates the partition request client and closes open channel connections on release. */
    private final ConnectionManager connectionManager;
    /** Queue of received buffers; the same object is used as the monitor guarding queue state. */
    private final PrioritizedDeque<SequenceBuffer> receivedBuffers;
    /** Flipped to true exactly once by releaseAllResources(). */
    private final AtomicBoolean isReleased;
    /** Null until requestSubpartition() has created the client. */
    private volatile PartitionRequestClient partitionRequestClient;
    /** Sequence number the next incoming (possibly empty) buffer must carry. */
    private int expectedSequenceNumber;
    /** Number of exclusive buffers requested in setup(); validated as non-negative in the constructor. */
    private final int initialCredit;
    /** Credit accumulated by notifyBufferAvailable() and drained by getAndResetUnannouncedCredit(). */
    private final AtomicInteger unannouncedCredit;
    /** Supplies and releases the buffers used by this channel. */
    private final BufferManager bufferManager;

    /** Sequence number of the buffer in which the latest barrier was detected, or -1 if none is remembered. */
    @GuardedBy("receivedBuffers")
    private int lastBarrierSequenceNumber;

    /** Id reported by the channel state persister for the latest barrier, or -1 if none is remembered. */
    @GuardedBy("receivedBuffers")
    private long lastBarrierId;
    /** Detects barriers in incoming buffers and persists in-flight buffers while a checkpoint is active. */
    private final ChannelStatePersister channelStatePersister;
    /** Running sum of buffer sizes: increased in onBuffer(), decreased in getNextBuffer(). */
    private long totalQueueSizeInBytes;
    /** Decompiler rendering of the compiler-generated assertion flag; set in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel$BufferReorderingException.class */
    /**
     * Raised when a buffer arrives carrying a sequence number different from the one the
     * channel expected next.
     */
    private static class BufferReorderingException extends IOException {
        private static final long serialVersionUID = -888282210356266816L;
        private final int expectedSequenceNumber;
        private final int actualSequenceNumber;

        /**
         * @param expectedSequenceNumber the sequence number the channel was waiting for
         * @param actualSequenceNumber the sequence number that was actually received
         */
        BufferReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
            this.expectedSequenceNumber = expectedSequenceNumber;
            this.actualSequenceNumber = actualSequenceNumber;
        }

        /** Builds the message lazily from the two stored sequence numbers. */
        @Override
        public String getMessage() {
            final String template =
                    "Buffer re-ordering: expected buffer with sequence number %d, but received %d.";
            return String.format(template, expectedSequenceNumber, actualSequenceNumber);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel$SequenceBuffer.class */
    /** Pairs a received {@link Buffer} with the sequence number it arrived under; both fields are final. */
    public static final class SequenceBuffer {
        final Buffer buffer;
        final int sequenceNumber;

        private SequenceBuffer(Buffer buffer, int sequenceNumber) {
            this.buffer = buffer;
            this.sequenceNumber = sequenceNumber;
        }

        /** Renders whether the payload is an event, its data type and the sequence number. */
        @Override
        public String toString() {
            final boolean isEvent = !buffer.isBuffer();
            return String.format(
                    "SequenceBuffer(isEvent = %s, dataType = %s, sequenceNumber = %s)",
                    isEvent,
                    buffer.getDataType(),
                    sequenceNumber);
        }
    }

    /**
     * Creates a remote input channel and initialises its per-channel state.
     *
     * <p>NOTE(review): the names of the arguments that are only forwarded to the
     * {@link InputChannel} constructor (channel index, subpartition index, backoffs, counters)
     * are inferred from the inherited fields this class reads; confirm them against that
     * constructor's signature.
     *
     * @param initialCredit number of exclusive buffers later requested by setup(); must be
     *     non-negative
     * @param connectionId connection used to create the partition request client; must not be null
     * @param connectionManager manager that creates the partition request client; must not be null
     * @param stateWriter writer handed to the channel state persister
     */
    public RemoteInputChannel(
            SingleInputGate inputGate,
            int channelIndex,
            ResultPartitionID partitionId,
            int consumedSubpartitionIndex,
            ConnectionID connectionId,
            ConnectionManager connectionManager,
            int initialBackoff,
            int maxBackoff,
            int initialCredit,
            Counter numBytesIn,
            Counter numBuffersIn,
            ChannelStateWriter stateWriter) {
        super(
                inputGate,
                channelIndex,
                partitionId,
                consumedSubpartitionIndex,
                initialBackoff,
                maxBackoff,
                numBytesIn,
                numBuffersIn);

        // Defaults that do not depend on any argument.
        this.id = new InputChannelID();
        this.receivedBuffers = new PrioritizedDeque<>();
        this.isReleased = new AtomicBoolean();
        this.expectedSequenceNumber = 0;
        this.unannouncedCredit = new AtomicInteger(0);
        this.lastBarrierSequenceNumber = -1;
        this.lastBarrierId = -1L;

        Preconditions.checkArgument(initialCredit >= 0, "Must be non-negative.");
        this.initialCredit = initialCredit;
        this.connectionId = Preconditions.checkNotNull(connectionId);
        this.connectionManager = Preconditions.checkNotNull(connectionManager);
        this.bufferManager = new BufferManager(inputGate.getMemorySegmentProvider(), this, 0);
        this.channelStatePersister = new ChannelStatePersister(stateWriter, getChannelInfo());
    }

    /** Overrides the sequence number expected for the next incoming buffer (test hook). */
    @VisibleForTesting
    void setExpectedSequenceNumber(int expectedSequenceNumber) {
        this.expectedSequenceNumber = expectedSequenceNumber;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Requests {@code initialCredit} exclusive buffers from the buffer manager.
     *
     * @throws IllegalStateException if exclusive buffers are already available for this channel
     */
    @Override
    public void setup() throws IOException {
        final boolean noExclusiveBuffersYet =
                bufferManager.unsynchronizedGetAvailableExclusiveBuffers() == 0;
        Preconditions.checkState(
                noExclusiveBuffersYet,
                "Bug in input channel setup logic: exclusive buffers have already been set for this input channel.");
        bufferManager.requestExclusiveBuffers(initialCredit);
    }

    /**
     * Creates the partition request client and requests the consumed subpartition, unless a
     * client already exists (in which case the call is a no-op).
     *
     * @throws PartitionConnectionException wrapping any {@link IOException} raised while creating
     *     the client or sending the request
     */
    @Override
    @VisibleForTesting
    public void requestSubpartition() throws IOException, InterruptedException {
        if (partitionRequestClient != null) {
            return;
        }

        LOG.debug(
                "{}: Requesting REMOTE subpartition {} of partition {}. {}",
                this,
                consumedSubpartitionIndex,
                partitionId,
                channelStatePersister);

        try {
            partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
            partitionRequestClient.requestSubpartition(
                    partitionId, consumedSubpartitionIndex, this, 0);
        } catch (IOException cause) {
            throw new PartitionConnectionException(partitionId, cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Re-sends the subpartition request using the increased backoff, or fails the partition
     * request when the backoff cannot be increased any further.
     */
    public void retriggerSubpartitionRequest() throws IOException {
        checkPartitionRequestQueueInitialized();

        if (!increaseBackoff()) {
            failPartitionRequest();
            return;
        }

        partitionRequestClient.requestSubpartition(
                partitionId, consumedSubpartitionIndex, this, getCurrentBackoff());
    }

    /**
     * Polls the next received buffer, if any.
     *
     * @return the polled buffer together with the data type of the element now at the head of the
     *     queue ({@code Buffer.DataType.NONE} when the queue is empty), or an empty optional when
     *     nothing was queued
     * @throws CancelTaskException if nothing was queued and the channel has been released
     */
    @Override
    public Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        checkPartitionRequestQueueInitialized();

        final SequenceBuffer next;
        final Buffer.DataType nextDataType;
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
            if (next != null) {
                totalQueueSizeInBytes -= next.buffer.getSize();
            }
            final SequenceBuffer following = receivedBuffers.peek();
            nextDataType =
                    following == null ? Buffer.DataType.NONE : following.buffer.getDataType();
        }

        if (next != null) {
            NetworkActionsLogger.traceInput(
                    "RemoteInputChannel#getNextBuffer",
                    next.buffer,
                    inputGate.getOwningTaskName(),
                    channelInfo,
                    channelStatePersister,
                    next.sequenceNumber);
            numBytesIn.inc(next.buffer.getSize());
            numBuffersIn.inc();
            return Optional.of(
                    new InputChannel.BufferAndAvailability(
                            next.buffer, nextDataType, 0, next.sequenceNumber));
        }

        if (isReleased.get()) {
            throw new CancelTaskException("Queried for a buffer after channel has been released.");
        }
        return Optional.empty();
    }

    /**
     * Sends a task event to the producer through the partition request client.
     *
     * @throws IllegalStateException if the channel has already been released
     */
    @Override
    void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        final boolean stillOpen = !isReleased.get();
        Preconditions.checkState(
                stillOpen, "Tried to send task event to producer after channel has been released.");
        checkPartitionRequestQueueInitialized();
        partitionRequestClient.sendTaskEvent(partitionId, taskEvent, this);
    }

    /** @return whether releaseAllResources() has already been invoked on this channel */
    @Override
    public boolean isReleased() {
        return isReleased.get();
    }

    /**
     * Releases all queued buffers and closes the connection. Only the first invocation has an
     * effect; later calls return immediately.
     */
    @Override
    void releaseAllResources() throws IOException {
        if (!isReleased.compareAndSet(false, true)) {
            return;
        }

        final ArrayDeque<Buffer> releasedBuffers;
        synchronized (receivedBuffers) {
            releasedBuffers =
                    receivedBuffers.stream()
                            .map(sequenceBuffer -> sequenceBuffer.buffer)
                            .collect(Collectors.toCollection(ArrayDeque::new));
            receivedBuffers.clear();
            // The queue is empty now, so the byte counter must not keep reporting the old total.
            totalQueueSizeInBytes = 0L;
        }
        bufferManager.releaseAllBuffers(releasedBuffers);

        // The released flag is already set at this point, before the connection is closed.
        if (partitionRequestClient != null) {
            partitionRequestClient.close(this);
        } else {
            connectionManager.closeOpenChannelConnections(connectionId);
        }
    }

    /**
     * @return the number of queued buffers plus the number of required buffers exceeding the
     *     initial credit (never counted as negative)
     */
    @Override
    int getBuffersInUseCount() {
        final int queued = getNumberOfQueuedBuffers();
        final int requiredBeyondInitialCredit =
                Math.max(0, bufferManager.getNumberOfRequiredBuffers() - initialCredit);
        return queued + requiredBeyondInitialCredit;
    }

    /**
     * Announces a new buffer size via notifyNewBufferSize(); any failure is rethrown through
     * {@code ExceptionUtils.rethrow}.
     */
    @Override
    void announceBufferSize(int newBufferSize) {
        try {
            notifyNewBufferSize(newBufferSize);
        } catch (Throwable failure) {
            ExceptionUtils.rethrow(failure);
        }
    }

    /** Records a {@link PartitionNotFoundException} for this channel's partition as its error. */
    private void failPartitionRequest() {
        final PartitionNotFoundException notFound = new PartitionNotFoundException(partitionId);
        setError(notFound);
    }

    /** @return a short description containing the partition id and the connection id */
    @Override
    public String toString() {
        return new StringBuilder("RemoteInputChannel [")
                .append(partitionId)
                .append(" at ")
                .append(connectionId)
                .append("]")
                .toString();
    }

    /** Tells the partition request client that this channel has credit available. */
    private void notifyCreditAvailable() throws IOException {
        checkPartitionRequestQueueInitialized();
        partitionRequestClient.notifyCreditAvailable(this);
    }

    /**
     * Forwards a new buffer size to the partition request client.
     *
     * @throws IllegalStateException if the channel has already been released
     */
    private void notifyNewBufferSize(int newBufferSize) throws IOException {
        final boolean stillOpen = !isReleased.get();
        Preconditions.checkState(stillOpen, "Channel released.");
        checkPartitionRequestQueueInitialized();
        partitionRequestClient.notifyNewBufferSize(this, newBufferSize);
    }

    /** @return the number of available buffers as reported by the buffer manager */
    @VisibleForTesting
    public int getNumberOfAvailableBuffers() {
        return bufferManager.getNumberOfAvailableBuffers();
    }

    /** @return the buffer manager's unsynchronized view of the number of required buffers */
    @VisibleForTesting
    public int getNumberOfRequiredBuffers() {
        return bufferManager.unsynchronizedGetNumberOfRequiredBuffers();
    }

    /** @return the number of required buffers minus the initial credit */
    @VisibleForTesting
    public int getSenderBacklog() {
        final int required = getNumberOfRequiredBuffers();
        return required - initialCredit;
    }

    /** @return the buffer manager's unsynchronized "waiting for floating buffers" flag */
    @VisibleForTesting
    boolean isWaitingForFloatingBuffers() {
        return bufferManager.unsynchronizedIsWaitingForFloatingBuffers();
    }

    /**
     * Polls the head of the received-buffer queue without taking the queue lock (test hook).
     *
     * @return the polled buffer, or {@code null} when the queue is empty
     */
    @VisibleForTesting
    public Buffer getNextReceivedBuffer() {
        final SequenceBuffer head = receivedBuffers.poll();
        return head == null ? null : head.buffer;
    }

    /** @return the buffer manager owned by this channel (test hook) */
    @VisibleForTesting
    BufferManager getBufferManager() {
        return bufferManager;
    }

    /** @return the current partition request client; {@code null} before requestSubpartition() */
    @VisibleForTesting
    PartitionRequestClient getPartitionRequestClient() {
        return partitionRequestClient;
    }

    /**
     * Adds the given number of buffers to the unannounced credit. The producer is notified only
     * when the credit was zero before the addition; non-positive arguments are ignored.
     */
    @Override
    public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
        if (numAvailableBuffers > 0 && unannouncedCredit.getAndAdd(numAvailableBuffers) == 0) {
            notifyCreditAvailable();
        }
    }

    /**
     * Asks the partition request client to resume consumption for this channel. When the channel
     * has no initial credit, the unannounced credit is cleared first.
     *
     * @throws IllegalStateException if the channel has already been released
     */
    @Override
    public void resumeConsumption() throws IOException {
        final boolean stillOpen = !isReleased.get();
        Preconditions.checkState(stillOpen, "Channel released.");
        checkPartitionRequestQueueInitialized();

        final boolean withoutExclusiveBuffers = initialCredit == 0;
        if (withoutExclusiveBuffers) {
            unannouncedCredit.set(0);
        }

        partitionRequestClient.resumeConsumption(this);
    }

    /**
     * Forwards the "all records processed" acknowledgement to the partition request client.
     *
     * @throws IllegalStateException if the channel has already been released
     */
    @Override
    public void acknowledgeAllRecordsProcessed() throws IOException {
        final boolean stillOpen = !isReleased.get();
        Preconditions.checkState(stillOpen, "Channel released.");
        checkPartitionRequestQueueInitialized();
        partitionRequestClient.acknowledgeAllRecordsProcessed(this);
    }

    /** Releases the floating buffers, but only for channels created with zero initial credit. */
    private void onBlockingUpstream() {
        if (initialCredit != 0) {
            return;
        }
        bufferManager.releaseFloatingBuffers();
    }

    /** @return the current unannounced credit, without modifying it */
    public int getUnannouncedCredit() {
        return unannouncedCredit.get();
    }

    /** @return the unannounced credit held before it was atomically reset to zero */
    public int getAndResetUnannouncedCredit() {
        return unannouncedCredit.getAndSet(0);
    }

    /** @return the size of the received-buffer queue, read while holding the queue lock */
    public int getNumberOfQueuedBuffers() {
        synchronized (receivedBuffers) {
            return receivedBuffers.size();
        }
    }

    /** @return the queue size read without locking, clamped to be non-negative */
    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        final int size = receivedBuffers.size();
        return Math.max(0, size);
    }

    /** @return the tracked queue size in bytes read without locking, clamped to be non-negative */
    @Override
    public long unsynchronizedGetSizeOfQueuedBuffers() {
        final long bytes = totalQueueSizeInBytes;
        return Math.max(0L, bytes);
    }

    /**
     * @return the initial credit minus the currently available exclusive buffers (read without
     *     locking), clamped to be non-negative
     */
    public int unsynchronizedGetExclusiveBuffersUsed() {
        final int available = bufferManager.unsynchronizedGetAvailableExclusiveBuffers();
        return Math.max(0, initialCredit - available);
    }

    /** @return the available floating buffers read without locking, clamped to be non-negative */
    public int unsynchronizedGetFloatingBuffersAvailable() {
        final int available = bufferManager.unsynchronizedGetFloatingBuffersAvailable();
        return Math.max(0, available);
    }

    /** @return the identifier created for this channel in the constructor */
    public InputChannelID getInputChannelId() {
        return id;
    }

    /** @return the initial credit this channel was constructed with */
    public int getInitialCredit() {
        return initialCredit;
    }

    /** @return the input gate's buffer provider, or {@code null} once the channel is released */
    public BufferProvider getBufferProvider() throws IOException {
        return isReleased.get() ? null : inputGate.getBufferProvider();
    }

    /** @return a buffer requested from the buffer manager; may be {@code null} */
    @Nullable
    public Buffer requestBuffer() {
        return bufferManager.requestBuffer();
    }

    /**
     * Requests floating buffers for {@code backlog + initialCredit} and passes the value returned
     * by the buffer manager on to notifyBufferAvailable().
     */
    public void onSenderBacklog(int backlog) throws IOException {
        final int buffersToAnnounce = bufferManager.requestFloatingBuffers(backlog + initialCredit);
        notifyBufferAvailable(buffersToAnnounce);
    }

    /**
     * Handles a buffer received for this channel.
     *
     * <p>If the sequence number does not match the expected one, a
     * {@code BufferReorderingException} is recorded via onError() and the buffer is recycled.
     * Otherwise the buffer is enqueued (as a priority element if its data type has priority,
     * with an additional announcement if its data type requires one), barrier bookkeeping is
     * updated, and the expected sequence number is advanced. Listener notifications and backlog
     * handling run after the queue lock has been left, as the other methods of this class do.
     *
     * <p>The buffer is recycled by this method unless ownership was handed over to the queue.
     *
     * @param buffer the received buffer
     * @param sequenceNumber sequence number carried by the buffer
     * @param backlog sender backlog reported with the buffer; negative values are ignored
     * @throws IllegalArgumentException if a buffer that blocks the upstream reports a non-zero
     *     backlog
     */
    public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
        // Stays true until the buffer has been stored in receivedBuffers.
        boolean recycleBuffer = true;

        try {
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }

            if (buffer.getDataType().isBlockingUpstream()) {
                onBlockingUpstream();
                Preconditions.checkArgument(
                        backlog == 0, "Illegal number of backlog: %s, should be 0.", backlog);
            }

            final boolean wasEmpty;
            boolean firstPriorityEvent = false;
            synchronized (receivedBuffers) {
                NetworkActionsLogger.traceInput(
                        "RemoteInputChannel#onBuffer",
                        buffer,
                        inputGate.getOwningTaskName(),
                        channelInfo,
                        channelStatePersister,
                        sequenceNumber);

                // Never enqueue after releaseAllResources() has drained the queue; the buffer is
                // recycled by the finally block instead.
                if (isReleased.get()) {
                    return;
                }

                wasEmpty = receivedBuffers.isEmpty();

                final SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber);
                final Buffer.DataType dataType = buffer.getDataType();
                if (dataType.hasPriority()) {
                    firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
                    recycleBuffer = false;
                } else {
                    receivedBuffers.add(sequenceBuffer);
                    // The queue owns the buffer from here on, even if announcing fails below.
                    recycleBuffer = false;
                    if (dataType.requiresAnnouncement()) {
                        firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                    }
                }
                totalQueueSizeInBytes += buffer.getSize();

                final OptionalLong barrierId =
                        channelStatePersister.checkForBarrier(sequenceBuffer.buffer);
                if (barrierId.isPresent() && barrierId.getAsLong() > lastBarrierId) {
                    // Remember where the newest barrier sits so in-flight buffers can be
                    // selected later in checkpointStarted().
                    lastBarrierId = barrierId.getAsLong();
                    lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
                }
                channelStatePersister.maybePersist(buffer);
                expectedSequenceNumber++;
            }

            // Notifications call into other components, so they run outside the queue lock.
            if (firstPriorityEvent) {
                notifyPriorityEvent(sequenceNumber);
            }
            if (wasEmpty) {
                notifyChannelNonEmpty();
            }
            if (backlog >= 0) {
                onSenderBacklog(backlog);
            }
        } finally {
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }

    /**
     * Adds the element to the priority section of the queue.
     *
     * @return {@code true} if it is the only priority element after insertion
     */
    private boolean addPriorityBuffer(SequenceBuffer sequenceBuffer) {
        receivedBuffers.addPriorityElement(sequenceBuffer);
        final int numPriorityElements = receivedBuffers.getNumPriorityElements();
        return numPriorityElements == 1;
    }

    /**
     * Builds an {@link EventAnnouncement} for the checkpoint barrier contained in the given
     * element and wraps it in a new element carrying the same sequence number.
     *
     * @throws IllegalStateException if the element is a data buffer, is not queued exactly once,
     *     or does not deserialize to a {@link CheckpointBarrier}
     */
    private SequenceBuffer announce(SequenceBuffer sequenceBuffer) throws IOException {
        final Buffer payload = sequenceBuffer.buffer;
        Preconditions.checkState(
                !payload.isBuffer(),
                "Only a CheckpointBarrier can be announced but found %s",
                payload);
        checkAnnouncedOnlyOnce(sequenceBuffer);

        final AbstractEvent event =
                EventSerializer.fromBuffer(payload, getClass().getClassLoader());
        Preconditions.checkState(
                event instanceof CheckpointBarrier,
                "Only a CheckpointBarrier can be announced but found %s",
                payload);

        final EventAnnouncement announcement =
                new EventAnnouncement((CheckpointBarrier) event, sequenceBuffer.sequenceNumber);
        return new SequenceBuffer(
                EventSerializer.toBuffer(announcement, true), sequenceBuffer.sequenceNumber);
    }

    /**
     * Verifies that exactly one queued element carries the sequence number of the given element.
     *
     * @throws IllegalStateException if the number of matching elements is not one
     */
    private void checkAnnouncedOnlyOnce(SequenceBuffer sequenceBuffer) {
        int occurrences = 0;
        final Iterator<SequenceBuffer> queued = receivedBuffers.iterator();
        while (queued.hasNext()) {
            if (queued.next().sequenceNumber == sequenceBuffer.sequenceNumber) {
                occurrences++;
            }
        }
        // Flink's Preconditions templates substitute only %s placeholders, so %s (not %d) is
        // required for the count to show up in the message.
        Preconditions.checkState(
                occurrences == 1,
                "Before enqueuing the announcement there should be exactly single occurrence of the buffer, but found [%s]",
                occurrences);
    }

    /**
     * Starts persisting channel state for the checkpoint of the given barrier, handing over the
     * currently in-flight buffers.
     *
     * @throws CheckpointException with reason {@code CHECKPOINT_SUBSUMED} if a barrier with a
     *     higher id has already been remembered
     */
    @Override
    public void checkpointStarted(CheckpointBarrier checkpointBarrier) throws CheckpointException {
        synchronized (receivedBuffers) {
            final long checkpointId = checkpointBarrier.getId();

            if (checkpointId < lastBarrierId) {
                final String message =
                        String.format(
                                "Sequence number for checkpoint %d is not known (it was likely been overwritten by a newer checkpoint %d)",
                                checkpointId,
                                lastBarrierId);
                throw new CheckpointException(message, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
            }

            // A newer checkpoint than the remembered barrier: forget the stale barrier position.
            if (checkpointId > lastBarrierId) {
                resetLastBarrier();
            }

            channelStatePersister.startPersisting(
                    checkpointId, getInflightBuffersUnsafe(checkpointId));
        }
    }

    /**
     * Stops persisting for the given checkpoint and forgets the remembered barrier if it belongs
     * to that checkpoint.
     */
    @Override
    public void checkpointStopped(long checkpointId) {
        synchronized (receivedBuffers) {
            channelStatePersister.stopPersisting(checkpointId);
            final boolean isRememberedBarrier = lastBarrierId == checkpointId;
            if (isRememberedBarrier) {
                resetLastBarrier();
            }
        }
    }

    /** Lock-taking wrapper around getInflightBuffersUnsafe() (test hook). */
    @VisibleForTesting
    List<Buffer> getInflightBuffers(long checkpointId) {
        synchronized (receivedBuffers) {
            return getInflightBuffersUnsafe(checkpointId);
        }
    }

    /**
     * Moves the queued event with the given sequence number into the priority section of the
     * queue, re-serialized with the priority flag set, and force-notifies the input gate when it
     * became the only priority element.
     *
     * @throws IllegalStateException if no barrier has been received, the sequence number is not
     *     the remembered barrier position, the element is a data buffer, or the element was
     *     already a priority element
     */
    @Override
    public void convertToPriorityEvent(int sequenceNumber) throws IOException {
        final boolean firstPriorityEvent;
        synchronized (receivedBuffers) {
            Preconditions.checkState(channelStatePersister.hasBarrierReceived());

            final int priorityCountBeforeRemoval = receivedBuffers.getNumPriorityElements();
            final SequenceBuffer toPrioritize =
                    receivedBuffers.getAndRemove(
                            candidate -> candidate.sequenceNumber == sequenceNumber);

            Preconditions.checkState(lastBarrierSequenceNumber == sequenceNumber);
            Preconditions.checkState(!toPrioritize.buffer.isBuffer());
            // An unchanged priority count proves the removed element was not yet prioritized.
            Preconditions.checkState(
                    priorityCountBeforeRemoval == receivedBuffers.getNumPriorityElements(),
                    "Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]",
                    toPrioritize,
                    priorityCountBeforeRemoval);

            final AbstractEvent event =
                    EventSerializer.fromBuffer(toPrioritize.buffer, getClass().getClassLoader());
            toPrioritize.buffer.setReaderIndex(0);

            final SequenceBuffer prioritized =
                    new SequenceBuffer(
                            EventSerializer.toBuffer(event, true), toPrioritize.sequenceNumber);
            firstPriorityEvent = addPriorityBuffer(prioritized);
        }

        if (firstPriorityEvent) {
            notifyPriorityEventForce();
        }
    }

    /** Delegates to the input gate's forced priority-event notification for this channel. */
    private void notifyPriorityEventForce() {
        inputGate.notifyPriorityEventForce(this);
    }

    /**
     * Collects retained references to the non-priority data buffers that should be spilled for
     * the given checkpoint. Iteration skips the priority elements and stops at the first data
     * buffer that shouldBeSpilled() rejects; events are skipped.
     *
     * <p>Must be called while holding the {@code receivedBuffers} lock.
     *
     * @param checkpointId id of the checkpoint; must equal the remembered barrier id unless no
     *     barrier is remembered
     * @return retained in-flight buffers, in queue order
     */
    private List<Buffer> getInflightBuffersUnsafe(long checkpointId) {
        // Kept in this expanded form: writing a real `assert` would make the compiler generate
        // its own $assertionsDisabled field, clashing with the one declared in this class.
        if (!$assertionsDisabled && !Thread.holdsLock(receivedBuffers)) {
            throw new AssertionError();
        }
        Preconditions.checkState(checkpointId == lastBarrierId || lastBarrierId == -1);

        final List<Buffer> inflightBuffers = new ArrayList<>();
        final Iterator<SequenceBuffer> queued = receivedBuffers.iterator();
        // Skip the priority elements at the front of the queue.
        Iterators.advance(queued, receivedBuffers.getNumPriorityElements());

        while (queued.hasNext()) {
            final SequenceBuffer candidate = queued.next();
            if (candidate.buffer.isBuffer()) {
                if (!shouldBeSpilled(candidate.sequenceNumber)) {
                    break;
                }
                inflightBuffers.add(candidate.buffer.retainBuffer());
            }
        }
        return inflightBuffers;
    }

    /** Forgets the remembered barrier by resetting both its id and its sequence number to -1. */
    private void resetLastBarrier() {
        lastBarrierId = -1L;
        lastBarrierSequenceNumber = -1;
    }

    /**
     * Decides whether a buffer with the given sequence number precedes the remembered barrier,
     * taking wrap-around of the int sequence number into account.
     *
     * @return {@code true} if no barrier is remembered or the sequence number lies before it
     * @throws IllegalStateException if the queue holds too many elements for the wrap-around
     *     detection to be reliable
     */
    private boolean shouldBeSpilled(int sequenceNumber) {
        if (lastBarrierSequenceNumber == -1) {
            return true;
        }

        final int halfRange = Integer.MAX_VALUE / 2;
        Preconditions.checkState(
                receivedBuffers.size() < halfRange,
                "Too many buffers for sequenceNumber overflow detection code to work correctly");

        final boolean beforeBarrier = sequenceNumber < lastBarrierSequenceNumber;

        // Barrier sits in the upper half of the positive range.
        if (halfRange < lastBarrierSequenceNumber) {
            return beforeBarrier && sequenceNumber > 0;
        }
        // Barrier sits in the lower half of the negative range.
        if (lastBarrierSequenceNumber < -halfRange) {
            return beforeBarrier || sequenceNumber > 0;
        }
        return beforeBarrier;
    }

    /**
     * Handles an empty buffer: advances the expected sequence number when it matches, records a
     * {@code BufferReorderingException} via onError() when it does not, and does nothing when the
     * channel is released. The backlog is processed only after a successful advance and only if
     * it is non-negative.
     */
    public void onEmptyBuffer(int sequenceNumber, int backlog) throws IOException {
        synchronized (receivedBuffers) {
            if (isReleased.get()) {
                return;
            }
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }
            expectedSequenceNumber++;
        }

        if (backlog >= 0) {
            onSenderBacklog(backlog);
        }
    }

    /** Asks the input gate to trigger a partition state check for this channel's subpartition. */
    public void onFailedPartitionRequest() {
        inputGate.triggerPartitionStateCheck(partitionId, consumedSubpartitionIndex);
    }

    /** Records the given failure as this channel's error via setError(). */
    public void onError(Throwable cause) {
        setError(cause);
    }

    /**
     * Runs checkError() first, then verifies that the partition request client exists.
     *
     * @throws IllegalStateException if the client has not been created yet
     */
    private void checkPartitionRequestQueueInitialized() throws IOException {
        checkError();
        final boolean clientInitialized = partitionRequestClient != null;
        Preconditions.checkState(
                clientInitialized,
                "Bug: partitionRequestClient is not initialized before processing data and no error is detected.");
    }

    // Static initializer as emitted by the decompiler: assigns the two static final fields
    // that are declared without initializers at the top of the class.
    static {
        // Assertion flag consulted by the manual assertion check in getInflightBuffersUnsafe().
        $assertionsDisabled = !RemoteInputChannel.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(RemoteInputChannel.class);
    }
}
