package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.class */
public class LocalInputChannel extends InputChannel implements BufferAvailabilityListener {
    private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);
    private final Object requestLock;
    private final ResultPartitionManager partitionManager;
    private final TaskEventPublisher taskEventPublisher;

    @Nullable
    private volatile ResultSubpartitionView subpartitionView;
    private volatile boolean isReleased;
    private final ChannelStatePersister channelStatePersister;

    public LocalInputChannel(SingleInputGate singleInputGate, int i, ResultPartitionID resultPartitionID, int i2, ResultPartitionManager resultPartitionManager, TaskEventPublisher taskEventPublisher, int i3, int i4, Counter counter, Counter counter2, ChannelStateWriter channelStateWriter) {
        super(singleInputGate, i, resultPartitionID, i2, i3, i4, counter, counter2);
        this.requestLock = new Object();
        this.partitionManager = (ResultPartitionManager) Preconditions.checkNotNull(resultPartitionManager);
        this.taskEventPublisher = (TaskEventPublisher) Preconditions.checkNotNull(taskEventPublisher);
        this.channelStatePersister = new ChannelStatePersister(channelStateWriter, getChannelInfo());
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void checkpointStarted(CheckpointBarrier checkpointBarrier) throws CheckpointException {
        this.channelStatePersister.startPersisting(checkpointBarrier.getId(), Collections.emptyList());
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void checkpointStopped(long j) {
        this.channelStatePersister.stopPersisting(j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void requestSubpartition() throws IOException {
        boolean z = false;
        boolean z2 = false;
        synchronized (this.requestLock) {
            Preconditions.checkState(!this.isReleased, "LocalInputChannel has been released already");
            if (this.subpartitionView == null) {
                LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}. {}", new Object[]{this, Integer.valueOf(this.consumedSubpartitionIndex), this.partitionId, this.channelStatePersister});
                try {
                    ResultSubpartitionView createSubpartitionView = this.partitionManager.createSubpartitionView(this.partitionId, this.consumedSubpartitionIndex, this);
                    if (createSubpartitionView == null) {
                        throw new IOException("Error requesting subpartition.");
                    }
                    this.subpartitionView = createSubpartitionView;
                    if (this.isReleased) {
                        createSubpartitionView.releaseAllResources();
                        this.subpartitionView = null;
                    } else {
                        z2 = true;
                    }
                } catch (PartitionNotFoundException e) {
                    if (!increaseBackoff()) {
                        throw e;
                    }
                    z = true;
                }
            }
        }
        if (z2) {
            notifyDataAvailable();
        }
        if (z) {
            this.inputGate.retriggerPartitionRequest(this.partitionId.getPartitionId(), this.consumedSubpartitionIndex);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void retriggerSubpartitionRequest(Timer timer) {
        synchronized (this.requestLock) {
            Preconditions.checkState(this.subpartitionView == null, "already requested partition");
            timer.schedule(new TimerTask() { // from class: org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    try {
                        LocalInputChannel.this.requestSubpartition();
                    } catch (Throwable th) {
                        LocalInputChannel.this.setError(th);
                    }
                }
            }, getCurrentBackoff());
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        checkError();
        ResultSubpartitionView resultSubpartitionView = this.subpartitionView;
        if (resultSubpartitionView == null) {
            if (this.isReleased) {
                return Optional.empty();
            }
            resultSubpartitionView = checkAndWaitForSubpartitionView();
        }
        ResultSubpartition.BufferAndBacklog nextBuffer = resultSubpartitionView.getNextBuffer();
        while (nextBuffer != null && nextBuffer.buffer().readableBytes() == 0) {
            nextBuffer.buffer().recycleBuffer();
            nextBuffer = resultSubpartitionView.getNextBuffer();
            this.numBuffersIn.inc();
        }
        if (nextBuffer == null) {
            if (resultSubpartitionView.isReleased()) {
                throw new CancelTaskException("Consumed partition " + resultSubpartitionView + " has been released.");
            }
            return Optional.empty();
        }
        Buffer buffer = nextBuffer.buffer();
        if (buffer instanceof FileRegionBuffer) {
            buffer = ((FileRegionBuffer) buffer).readInto(this.inputGate.getUnpooledSegment());
        }
        if (buffer instanceof CompositeBuffer) {
            buffer = ((CompositeBuffer) buffer).getFullBufferData(this.inputGate.getUnpooledSegment());
        }
        this.numBytesIn.inc(buffer.readableBytes());
        this.numBuffersIn.inc();
        this.channelStatePersister.checkForBarrier(buffer);
        this.channelStatePersister.maybePersist(buffer);
        NetworkActionsLogger.traceInput("LocalInputChannel#getNextBuffer", buffer, this.inputGate.getOwningTaskName(), this.channelInfo, this.channelStatePersister, nextBuffer.getSequenceNumber());
        return Optional.of(new InputChannel.BufferAndAvailability(buffer, nextBuffer.getNextDataType(), nextBuffer.buffersInBacklog(), nextBuffer.getSequenceNumber()));
    }

    @Override // org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener
    public void notifyDataAvailable() {
        notifyChannelNonEmpty();
    }

    private ResultSubpartitionView checkAndWaitForSubpartitionView() {
        ResultSubpartitionView resultSubpartitionView;
        synchronized (this.requestLock) {
            Preconditions.checkState(!this.isReleased, "released");
            Preconditions.checkState(this.subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
            resultSubpartitionView = this.subpartitionView;
        }
        return resultSubpartitionView;
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void resumeConsumption() {
        Preconditions.checkState(!this.isReleased, "Channel released.");
        ResultSubpartitionView resultSubpartitionView = (ResultSubpartitionView) Preconditions.checkNotNull(this.subpartitionView);
        resultSubpartitionView.resumeConsumption();
        if (resultSubpartitionView.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable()) {
            notifyChannelNonEmpty();
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void acknowledgeAllRecordsProcessed() throws IOException {
        Preconditions.checkState(!this.isReleased, "Channel released.");
        this.subpartitionView.acknowledgeAllDataProcessed();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        checkError();
        Preconditions.checkState(this.subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");
        if (!this.taskEventPublisher.publish(this.partitionId, taskEvent)) {
            throw new IOException("Error while publishing event " + taskEvent + " to producer. The producer could not be found.");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public boolean isReleased() {
        return this.isReleased;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void releaseAllResources() throws IOException {
        if (this.isReleased) {
            return;
        }
        this.isReleased = true;
        ResultSubpartitionView resultSubpartitionView = this.subpartitionView;
        if (resultSubpartitionView != null) {
            resultSubpartitionView.releaseAllResources();
            this.subpartitionView = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public void announceBufferSize(int i) {
        Preconditions.checkState(!this.isReleased, "Channel released.");
        ResultSubpartitionView resultSubpartitionView = this.subpartitionView;
        if (resultSubpartitionView != null) {
            resultSubpartitionView.notifyNewBufferSize(i);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public int getBuffersInUseCount() {
        ResultSubpartitionView resultSubpartitionView = this.subpartitionView;
        if (resultSubpartitionView == null) {
            return 0;
        }
        return resultSubpartitionView.getNumberOfQueuedBuffers();
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputChannel
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        ResultSubpartitionView resultSubpartitionView = this.subpartitionView;
        if (resultSubpartitionView != null) {
            return resultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers();
        }
        return 0;
    }

    public String toString() {
        return "LocalInputChannel [" + this.partitionId + "]";
    }

    @VisibleForTesting
    ResultSubpartitionView getSubpartitionView() {
        return this.subpartitionView;
    }
}
