package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferHeader;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import org.apache.flink.util.ExceptionUtils;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/SegmentPartitionFileReader.class */
/**
 * A {@link PartitionFileReader} that reads buffers from per-segment files located under a base
 * data file path.
 *
 * <p>For every (partition, subpartition) pair at most one channel is kept open, together with the
 * id of the segment it was opened for. Asking for a different segment id closes the cached channel
 * and opens the file of the requested segment.
 *
 * <p>The reader keeps its state in a plain {@link HashMap} and reuses a single header buffer
 * without any synchronization, so concurrent calls on the same instance are not protected by this
 * class.
 */
public class SegmentPartitionFileReader implements PartitionFileReader {

    /** Scratch buffer that receives the header of each buffer read; reused across calls. */
    private final ByteBuffer reusedHeaderBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();

    /**
     * Currently opened channels: partition id -> subpartition id -> (channel, id of the segment
     * the channel was opened for).
     */
    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, Tuple2<ReadableByteChannel, Integer>>> openedChannelAndSegmentIds = new HashMap<>();

    /** Base path from which the individual segment file paths are derived. */
    private final String dataFilePath;

    /** File system resolved from {@link #dataFilePath}; assigned once in the constructor. */
    private FileSystem fileSystem;

    /**
     * Creates a reader for the segment files under the given base path.
     *
     * @param str the base data file path; also used to resolve the {@link FileSystem}
     */
    public SegmentPartitionFileReader(String str) {
        this.dataFilePath = str;
        try {
            this.fileSystem = new Path(str).getFileSystem();
        } catch (IOException e) {
            ExceptionUtils.rethrow(e, "Failed to initialize the FileSystem.");
        }
    }

    /**
     * Reads the next buffer of the given segment of a subpartition.
     *
     * @param tieredStoragePartitionId the partition to read from
     * @param tieredStorageSubpartitionId the subpartition to read from
     * @param i the id of the segment to read; selects the segment file
     * @param i2 not used by this implementation
     * @param memorySegment the memory segment that receives the buffer payload
     * @param bufferRecycler the recycler handed to the created {@link NetworkBuffer}
     * @param readProgress not used by this implementation
     * @param compositeBuffer not used by this implementation
     * @return {@code null} if the file of the requested segment does not exist; otherwise a result
     *     holding exactly one buffer, which is either the data buffer that was read or an
     *     {@code END_OF_SEGMENT} buffer when the segment file has no more data
     * @throws IOException if reading fails, or if the file ends in the middle of a buffer header
     *     or in the middle of a buffer payload
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader
    public PartitionFileReader.ReadBufferResult readBuffer(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId, int i, int i2, MemorySegment memorySegment, BufferRecycler bufferRecycler, @Nullable PartitionFileReader.ReadProgress readProgress, @Nullable CompositeBuffer compositeBuffer) throws IOException {
        Map<TieredStorageSubpartitionId, Tuple2<ReadableByteChannel, Integer>> subpartitionChannels =
                this.openedChannelAndSegmentIds.computeIfAbsent(tieredStoragePartitionId, ignored -> new HashMap<>());
        Tuple2<ReadableByteChannel, Integer> cached = subpartitionChannels.get(tieredStorageSubpartitionId);
        ReadableByteChannel channel = cached == null ? null : cached.f0;

        if (channel == null || cached.f1.intValue() != i) {
            if (channel != null) {
                // Forget the old channel before closing it, so that neither a failing close() nor
                // a missing file for the new segment leaves a closed channel in the cache.
                subpartitionChannels.remove(tieredStorageSubpartitionId);
                channel.close();
            }
            channel = openNewChannel(tieredStoragePartitionId, tieredStorageSubpartitionId, i);
            if (channel == null) {
                return null;
            }
            subpartitionChannels.put(tieredStorageSubpartitionId, Tuple2.of(channel, Integer.valueOf(i)));
        }

        this.reusedHeaderBuffer.clear();
        int headerBytes = readFully(channel, this.reusedHeaderBuffer);
        if (headerBytes == -1) {
            // Clean end of file: the segment has been consumed completely.
            forgetAndClose(subpartitionChannels, tieredStorageSubpartitionId, channel);
            return getSingletonReadResult(new NetworkBuffer(memorySegment, bufferRecycler, Buffer.DataType.END_OF_SEGMENT));
        }
        if (this.reusedHeaderBuffer.hasRemaining()) {
            // The file ended in the middle of a header; parsing it would read garbage.
            forgetAndClose(subpartitionChannels, tieredStorageSubpartitionId, channel);
            throw new IOException("The length of buffer header is illegal.");
        }
        this.reusedHeaderBuffer.flip();
        BufferHeader header = BufferReaderWriterUtil.parseBufferHeader(this.reusedHeaderBuffer);

        int expectedLength = header.getLength();
        int dataBytes = readFully(channel, memorySegment.wrap(0, expectedLength));
        if (dataBytes != expectedLength) {
            forgetAndClose(subpartitionChannels, tieredStorageSubpartitionId, channel);
            throw new IOException("The length of data buffer is illegal.");
        }
        return getSingletonReadResult(new NetworkBuffer(memorySegment, bufferRecycler, header.getDataType(), header.isCompressed(), expectedLength));
    }

    /**
     * Returns the read priority for the given position.
     *
     * @return always {@code -1}; none of the arguments is inspected
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader
    public long getPriority(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId, int i, int i2, @Nullable PartitionFileReader.ReadProgress readProgress) {
        return -1L;
    }

    /**
     * Opens a channel on the file of the given segment.
     *
     * @return the opened channel, or {@code null} if the segment file does not exist
     * @throws IOException if checking for or opening the file fails
     */
    private ReadableByteChannel openNewChannel(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId, int i) throws IOException {
        Path segmentPath = SegmentPartitionFile.getSegmentPath(this.dataFilePath, tieredStoragePartitionId, tieredStorageSubpartitionId.getSubpartitionId(), i);
        if (!this.fileSystem.exists(segmentPath)) {
            return null;
        }
        return Channels.newChannel(this.fileSystem.open(segmentPath));
    }

    /**
     * Closes every cached channel and clears the cache.
     *
     * <p>All channels are closed even if some of them fail to close. The first failure is rethrown
     * through {@link ExceptionUtils#rethrow(Throwable)} once all channels were attempted; later
     * failures are attached to it as suppressed exceptions.
     */
    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader
    public void release() {
        IOException firstFailure = null;
        for (Map<TieredStorageSubpartitionId, Tuple2<ReadableByteChannel, Integer>> subpartitionChannels : this.openedChannelAndSegmentIds.values()) {
            for (Tuple2<ReadableByteChannel, Integer> channelAndSegmentId : subpartitionChannels.values()) {
                if (channelAndSegmentId == null || channelAndSegmentId.f0 == null) {
                    continue;
                }
                try {
                    channelAndSegmentId.f0.close();
                } catch (IOException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
        }
        // Drop all references so that closed channels can never be handed out again.
        this.openedChannelAndSegmentIds.clear();
        if (firstFailure != null) {
            ExceptionUtils.rethrow(firstFailure);
        }
    }

    /**
     * Reads from the channel until the target buffer is full or the channel reports end of stream.
     *
     * <p>A single {@link ReadableByteChannel#read(ByteBuffer)} call is allowed to transfer fewer
     * bytes than the buffer has remaining, hence the loop.
     *
     * @return the number of bytes read, or {@code -1} if end of stream was reached before any byte
     *     was read
     */
    private static int readFully(ReadableByteChannel channel, ByteBuffer target) throws IOException {
        int totalRead = 0;
        while (target.hasRemaining()) {
            int read = channel.read(target);
            if (read == -1) {
                return totalRead == 0 ? -1 : totalRead;
            }
            totalRead += read;
        }
        return totalRead;
    }

    /** Removes the cache entry of the subpartition first and then closes the channel. */
    private static void forgetAndClose(Map<TieredStorageSubpartitionId, Tuple2<ReadableByteChannel, Integer>> subpartitionChannels, TieredStorageSubpartitionId tieredStorageSubpartitionId, ReadableByteChannel channel) throws IOException {
        subpartitionChannels.remove(tieredStorageSubpartitionId);
        channel.close();
    }

    /**
     * Wraps a single buffer into a read result; {@code false} and {@code null} are passed for the
     * remaining two constructor arguments of {@code ReadBufferResult}.
     */
    private static PartitionFileReader.ReadBufferResult getSingletonReadResult(NetworkBuffer networkBuffer) {
        return new PartitionFileReader.ReadBufferResult(Collections.singletonList(networkBuffer), false, null);
    }
}
