package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerMetricUpdate;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.class */
public class TieredResultPartition extends ResultPartition {
    private final TieredStoragePartitionId partitionId;
    private final TieredStorageProducerClient tieredStorageProducerClient;
    private final TieredStorageResourceRegistry tieredStorageResourceRegistry;
    private final TieredStorageNettyServiceImpl nettyService;
    private boolean hasNotifiedEndOfUserRecords;

    public TieredResultPartition(String str, int i, ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, int i2, int i3, ResultPartitionManager resultPartitionManager, @Nullable BufferCompressor bufferCompressor, SupplierWithException<BufferPool, IOException> supplierWithException, TieredStorageProducerClient tieredStorageProducerClient, TieredStorageResourceRegistry tieredStorageResourceRegistry, TieredStorageNettyServiceImpl tieredStorageNettyServiceImpl) {
        super(str, i, resultPartitionID, resultPartitionType, i2, i3, resultPartitionManager, bufferCompressor, supplierWithException);
        this.partitionId = TieredStorageIdMappingUtils.convertId(resultPartitionID);
        this.tieredStorageProducerClient = tieredStorageProducerClient;
        this.tieredStorageResourceRegistry = tieredStorageResourceRegistry;
        this.nettyService = tieredStorageNettyServiceImpl;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    protected void setupInternal() throws IOException {
        if (isReleased()) {
            throw new IOException("Result partition has been released.");
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void setMetricGroup(TaskIOMetricGroup taskIOMetricGroup) {
        super.setMetricGroup(taskIOMetricGroup);
        this.tieredStorageProducerClient.setMetricStatisticsUpdater(this::updateProducerMetricStatistics);
    }

    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void emitRecord(ByteBuffer byteBuffer, int i) throws IOException {
        this.resultPartitionBytes.inc(i, byteBuffer.remaining());
        emit(byteBuffer, i, Buffer.DataType.DATA_BUFFER, false);
    }

    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void broadcastRecord(ByteBuffer byteBuffer) throws IOException {
        this.resultPartitionBytes.incAll(byteBuffer.remaining());
        broadcast(byteBuffer, Buffer.DataType.DATA_BUFFER);
    }

    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        Buffer buffer = EventSerializer.toBuffer(abstractEvent, z);
        try {
            broadcast(buffer.getNioBufferReadable(), buffer.getDataType());
            buffer.recycleBuffer();
        } catch (Throwable th) {
            buffer.recycleBuffer();
            throw th;
        }
    }

    private void broadcast(ByteBuffer byteBuffer, Buffer.DataType dataType) throws IOException {
        checkInProduceState();
        emit(byteBuffer, 0, dataType, true);
    }

    private void emit(ByteBuffer byteBuffer, int i, Buffer.DataType dataType, boolean z) throws IOException {
        this.tieredStorageProducerClient.write(byteBuffer, TieredStorageIdMappingUtils.convertId(i), dataType, z);
    }

    private void updateProducerMetricStatistics(TieredStorageProducerMetricUpdate tieredStorageProducerMetricUpdate) {
        this.numBuffersOut.inc(tieredStorageProducerMetricUpdate.numWriteBuffersDelta());
        this.numBytesOut.inc(tieredStorageProducerMetricUpdate.numWriteBytesDelta());
    }

    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public ResultSubpartitionView createSubpartitionView(int i, BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        Preconditions.checkState(!isReleased(), "ResultPartition already released.");
        return this.nettyService.createResultSubpartitionView(this.partitionId, new TieredStorageSubpartitionId(i), bufferAvailabilityListener);
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void finish() throws IOException {
        broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        Preconditions.checkState(!isReleased(), "Result partition is already released.");
        super.finish();
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter, java.lang.AutoCloseable
    public void close() {
        super.close();
        this.tieredStorageProducerClient.close();
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    protected void releaseInternal() {
        this.tieredStorageResourceRegistry.clearResourceFor(this.partitionId);
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void notifyEndOfData(StopMode stopMode) throws IOException {
        if (this.hasNotifiedEndOfUserRecords) {
            return;
        }
        broadcastEvent(new EndOfData(stopMode), false);
        this.hasNotifiedEndOfUserRecords = true;
    }

    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void alignedBarrierTimeout(long j) throws IOException {
    }

    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void abortCheckpoint(long j, CheckpointException checkpointException) {
    }

    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void flushAll() {
    }

    @Override // org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public void flush(int i) {
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition, org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter
    public CompletableFuture<Void> getAllDataProcessedFuture() {
        return FutureUtils.completedVoidFuture();
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public void onSubpartitionAllDataProcessed(int i) {
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public long getSizeOfQueuedBuffersUnsafe() {
        return 0L;
    }

    @Override // org.apache.flink.runtime.io.network.partition.ResultPartition
    public int getNumberOfQueuedBuffers(int i) {
        return 0;
    }
}
