package org.apache.flink.runtime.io.network.partition.hybrid;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReaderImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.class */
/**
 * A {@link ResultPartition} that hands produced data to an {@link HsMemoryDataManager} and serves
 * consumers through a combination of an in-memory view and a disk view backed by an
 * {@link HsFileDataManager}.
 *
 * <p>When the partition is "broadcast only", every record and event is written once to
 * {@link #BROADCAST_CHANNEL} and every requested subpartition view is mapped onto that channel.
 *
 * <p>The memory data manager is only created in {@link #setupInternal()}; until then the
 * corresponding field is {@code null}.
 */
public class HsResultPartition extends ResultPartition {
    /** Suffix appended to the configured base path to obtain the data file path. */
    public static final String DATA_FILE_SUFFIX = ".hybrid.data";

    /** Suffix appended to the configured base path to obtain the index file path. */
    public static final String INDEX_FILE_SUFFIX = ".hybrid.index";

    /** Channel index used for all data when the partition is broadcast only. */
    public static final int BROADCAST_CHANNEL = 0;

    private final HsFileDataIndex dataIndex;
    private final HsFileDataManager fileDataManager;
    private final Path dataFilePath;
    private final int networkBufferSize;
    private final HybridShuffleConfiguration hybridShuffleConfiguration;

    /** Most recently issued consumer id per channel; {@code null} until a first consumer registers. */
    private final HsConsumerId[] lastConsumerIds;

    /** Set once the end-of-data event has been broadcast, so that it is emitted at most once. */
    private boolean hasNotifiedEndOfUserRecords;

    /** Created in {@link #setupInternal()}; {@code null} before that. */
    @Nullable
    private HsMemoryDataManager memoryDataManager;

    private final boolean isBroadcastOnly;

    /**
     * Creates the partition together with its file index and file data manager. The memory data
     * manager is not created here but in {@link #setupInternal()}.
     *
     * @param str forwarded unchanged to the {@link ResultPartition} constructor
     * @param i forwarded unchanged to the {@link ResultPartition} constructor
     * @param resultPartitionID forwarded unchanged to the {@link ResultPartition} constructor
     * @param resultPartitionType forwarded unchanged to the {@link ResultPartition} constructor
     * @param i2 forwarded to the {@link ResultPartition} constructor; also the length of the
     *     per-channel consumer id table and, unless {@code z} is set, the channel count of the
     *     file index (presumably the number of subpartitions - confirm against callers)
     * @param i3 forwarded unchanged to the {@link ResultPartition} constructor
     * @param batchShuffleReadBufferPool read buffer pool handed to the file data manager
     * @param scheduledExecutorService executor handed to the file data manager
     * @param resultPartitionManager forwarded unchanged to the {@link ResultPartition} constructor
     * @param str2 base path; {@link #DATA_FILE_SUFFIX} and {@link #INDEX_FILE_SUFFIX} are appended
     *     to it to derive the data and index file paths
     * @param i4 network buffer size later passed to the memory data manager
     * @param hybridShuffleConfiguration hybrid shuffle settings
     * @param bufferCompressor optional compressor, forwarded to the {@link ResultPartition}
     *     constructor
     * @param z whether this partition is broadcast only
     * @param supplierWithException buffer pool factory, forwarded to the {@link ResultPartition}
     *     constructor
     */
    public HsResultPartition(String str, int i, ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, int i2, int i3, BatchShuffleReadBufferPool batchShuffleReadBufferPool, ScheduledExecutorService scheduledExecutorService, ResultPartitionManager resultPartitionManager, String str2, int i4, HybridShuffleConfiguration hybridShuffleConfiguration, @Nullable BufferCompressor bufferCompressor, boolean z, SupplierWithException<BufferPool, IOException> supplierWithException) {
        super(str, i, resultPartitionID, resultPartitionType, i2, i3, resultPartitionManager, bufferCompressor, supplierWithException);
        this.networkBufferSize = i4;
        this.dataFilePath = new File(str2 + DATA_FILE_SUFFIX).toPath();

        // A broadcast-only partition indexes a single channel.
        final int indexedChannelCount = z ? 1 : i2;
        final Path indexFilePath = new File(str2 + INDEX_FILE_SUFFIX).toPath();
        this.dataIndex =
                new HsFileDataIndexImpl(
                        indexedChannelCount,
                        indexFilePath,
                        hybridShuffleConfiguration.getSpilledIndexSegmentSize(),
                        hybridShuffleConfiguration.getNumRetainedInMemoryRegionsMax());

        this.hybridShuffleConfiguration = hybridShuffleConfiguration;
        this.isBroadcastOnly = z;
        this.fileDataManager =
                new HsFileDataManager(
                        batchShuffleReadBufferPool,
                        scheduledExecutorService,
                        this.dataIndex,
                        this.dataFilePath,
                        HsSubpartitionFileReaderImpl.Factory.INSTANCE,
                        hybridShuffleConfiguration);
        this.lastConsumerIds = new HsConsumerId[i2];
    }

    /**
     * Sets up the file data manager and creates the memory data manager.
     *
     * @throws IOException if the partition has already been released, or if setting up the file
     *     data manager fails
     */
    @Override
    protected void setupInternal() throws IOException {
        if (isReleased()) {
            throw new IOException("Result partition has been released.");
        }
        fileDataManager.setup();

        // A broadcast-only partition buffers a single channel.
        final int bufferedChannelCount = isBroadcastOnly ? 1 : numSubpartitions;
        memoryDataManager =
                new HsMemoryDataManager(
                        bufferedChannelCount,
                        networkBufferSize,
                        bufferPool,
                        getSpillingStrategy(hybridShuffleConfiguration),
                        dataIndex,
                        dataFilePath,
                        bufferCompressor,
                        hybridShuffleConfiguration.getBufferPoolSizeCheckIntervalMs());
    }

    /**
     * Registers the metric group with the superclass and wires the output byte/buffer counters
     * into the memory data manager.
     *
     * @throws NullPointerException if the memory data manager has not been created yet
     */
    @Override
    public void setMetricGroup(TaskIOMetricGroup taskIOMetricGroup) {
        super.setMetricGroup(taskIOMetricGroup);
        final HsMemoryDataManager manager = Preconditions.checkNotNull(memoryDataManager);
        manager.setOutputMetrics(new HsOutputMetrics(numBytesOut, numBuffersOut));
    }

    /**
     * Accounts the record's remaining bytes to the given channel and appends it as a data buffer.
     *
     * @param byteBuffer serialized record
     * @param i target channel index
     */
    @Override
    public void emitRecord(ByteBuffer byteBuffer, int i) throws IOException {
        resultPartitionBytes.inc(i, byteBuffer.remaining());
        emit(byteBuffer, i, Buffer.DataType.DATA_BUFFER);
    }

    /** Broadcasts the given serialized record as a data buffer. */
    @Override
    public void broadcastRecord(ByteBuffer byteBuffer) throws IOException {
        broadcast(byteBuffer, Buffer.DataType.DATA_BUFFER);
    }

    /**
     * Serializes the event into a buffer, broadcasts its content, and recycles the buffer
     * afterwards regardless of whether broadcasting succeeded.
     *
     * @param abstractEvent event to broadcast
     * @param z flag passed through to {@link EventSerializer#toBuffer}
     */
    @Override
    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        final Buffer eventBuffer = EventSerializer.toBuffer(abstractEvent, z);
        try {
            broadcast(eventBuffer.getNioBufferReadable(), eventBuffer.getDataType());
        } finally {
            eventBuffer.recycleBuffer();
        }
    }

    /**
     * Writes the payload once to {@link #BROADCAST_CHANNEL} for a broadcast-only partition, or
     * once per subpartition otherwise.
     */
    private void broadcast(ByteBuffer byteBuffer, Buffer.DataType dataType) throws IOException {
        resultPartitionBytes.incAll(byteBuffer.remaining());
        if (isBroadcastOnly) {
            emit(byteBuffer, BROADCAST_CHANNEL, dataType);
            return;
        }
        for (int channel = 0; channel < numSubpartitions; channel++) {
            // Each channel gets its own duplicate so that position/limit are tracked independently.
            emit(byteBuffer.duplicate(), channel, dataType);
        }
    }

    /** Verifies the produce state and appends the payload to the memory data manager. */
    private void emit(ByteBuffer byteBuffer, int i, Buffer.DataType dataType) throws IOException {
        checkInProduceState();
        Preconditions.checkNotNull(memoryDataManager).append(byteBuffer, i, dataType);
    }

    /**
     * Creates a consumer view for the requested subpartition and registers it with both the file
     * data manager and the memory data manager.
     *
     * @param i requested subpartition index; mapped to {@link #BROADCAST_CHANNEL} for a
     *     broadcast-only partition
     * @param bufferAvailabilityListener listener handed to the created consumer
     * @return the newly created consumer view
     * @throws IllegalStateException if the partition is released, or if a second consumer is
     *     requested while the SELECTIVE spilling strategy is configured
     * @throws PartitionNotFoundException if the data file is not readable
     */
    @Override
    public ResultSubpartitionView createSubpartitionView(int i, BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        Preconditions.checkState(!isReleased(), "ResultPartition already released.");
        if (!Files.isReadable(dataFilePath)) {
            throw new PartitionNotFoundException(getPartitionId());
        }

        final int channel = isBroadcastOnly ? BROADCAST_CHANNEL : i;
        final HsSubpartitionConsumer consumer = new HsSubpartitionConsumer(bufferAvailabilityListener);

        final HsConsumerId previousId = lastConsumerIds[channel];
        checkMultipleConsumerIsAllowed(previousId, hybridShuffleConfiguration);
        final HsConsumerId consumerId = HsConsumerId.newId(previousId);
        lastConsumerIds[channel] = consumerId;

        final HsDataView diskView = fileDataManager.registerNewConsumer(channel, consumerId, consumer);
        final HsDataView memoryView =
                Preconditions.checkNotNull(memoryDataManager)
                        .registerNewConsumer(channel, consumerId, consumer);
        consumer.setDiskDataView(diskView);
        consumer.setMemoryDataView(memoryView);
        return consumer;
    }

    /** No-op: this implementation takes no action on an aligned barrier timeout. */
    @Override
    public void alignedBarrierTimeout(long j) throws IOException {
    }

    /** No-op: this implementation takes no action when a checkpoint is aborted. */
    @Override
    public void abortCheckpoint(long j, CheckpointException checkpointException) {
    }

    /** No-op: this implementation takes no action on flush requests. */
    @Override
    public void flushAll() {
    }

    /** No-op: this implementation takes no action on flush requests. */
    @Override
    public void flush(int i) {
    }

    /**
     * Broadcasts the end-of-partition event and then finishes the partition.
     *
     * @throws IllegalStateException if the partition is released once the event has been broadcast
     */
    @Override
    public void finish() throws IOException {
        // The released check deliberately stays after the broadcast, as in the original ordering.
        broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        Preconditions.checkState(!isReleased(), "Result partition is already released.");
        super.finish();
    }

    /**
     * Closes the memory data manager, if one was created, and then always closes the superclass.
     *
     * <p>The superclass is closed in a {@code finally} block so that its resources are released
     * even when the partition was never set up or when closing the memory data manager fails.
     */
    @Override
    public void close() {
        try {
            if (memoryDataManager != null) {
                memoryDataManager.close();
            }
        } finally {
            super.close();
        }
    }

    /** Releases the file data manager. */
    @Override
    protected void releaseInternal() {
        fileDataManager.release();
    }

    /** Always reports {@code 0}. */
    @Override
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    /** Always reports {@code 0}. */
    @Override
    public long getSizeOfQueuedBuffersUnsafe() {
        return 0L;
    }

    /** Always reports {@code 0}, independent of the given index. */
    @Override
    public int getNumberOfQueuedBuffers(int i) {
        return 0;
    }

    /**
     * Broadcasts an {@link EndOfData} event carrying the given stop mode. Only the first
     * successful call has an effect; later calls return immediately.
     */
    @Override
    public void notifyEndOfData(StopMode stopMode) throws IOException {
        if (!hasNotifiedEndOfUserRecords) {
            broadcastEvent(new EndOfData(stopMode), false);
            // Only marked after a successful broadcast, so a failed attempt can be retried.
            hasNotifiedEndOfUserRecords = true;
        }
    }

    /**
     * Instantiates the spilling strategy selected by the configuration.
     *
     * @throws IllegalConfigurationException if the configured strategy type is neither FULL nor
     *     SELECTIVE
     */
    private HsSpillingStrategy getSpillingStrategy(HybridShuffleConfiguration hybridShuffleConfiguration) {
        switch (hybridShuffleConfiguration.getSpillingStrategyType()) {
            case FULL:
                return new HsFullSpillingStrategy(hybridShuffleConfiguration);
            case SELECTIVE:
                return new HsSelectiveSpillingStrategy(hybridShuffleConfiguration);
            default:
                throw new IllegalConfigurationException("Illegal spilling strategy.");
        }
    }

    /**
     * Rejects a second consumer on the same channel when the SELECTIVE spilling strategy is
     * configured; other strategy types are not restricted here.
     *
     * @param hsConsumerId id of the previous consumer of the channel, or {@code null} if none
     * @throws IllegalStateException if a previous consumer exists under the SELECTIVE strategy
     */
    private void checkMultipleConsumerIsAllowed(HsConsumerId hsConsumerId, HybridShuffleConfiguration hybridShuffleConfiguration) {
        final HybridShuffleConfiguration.SpillingStrategyType strategyType =
                hybridShuffleConfiguration.getSpillingStrategyType();
        if (strategyType == HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE) {
            Preconditions.checkState(hsConsumerId == null, "Multiple consumer is not allowed for %s spilling strategy mode", hybridShuffleConfiguration.getSpillingStrategyType());
        }
    }
}
