/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.util.Preconditions;

/**
 * A {@link ResultSubpartitionView} that reads the buffers of a spilled subpartition back
 * from a file with a {@link SynchronousBufferFileReader}.
 *
 * <p>The view owns a private pool of two unpooled buffers. A buffer handed out by
 * {@link #getNextBuffer()} returns to that pool when it is recycled, so a caller that
 * holds on to both buffers will block on the next request until one is recycled or the
 * view is released.
 */
class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
    /** The subpartition this view reads; consulted for release state and failure cause. */
    private final ResultSubpartition parent;

    /** Reader over the spill file. */
    private final BufferFileReader fileReader;

    /** Private pool supplying the buffers that file data is read into. */
    private final SpillReadBufferPool bufferPool;

    /** Set once by {@link #releaseAllResources()} so that resources are released only once. */
    private final AtomicBoolean isReleased = new AtomicBoolean();

    /** Size reported by the file reader at construction time; used only by {@link #toString()}. */
    private final long fileSize;

    /**
     * Opens the spill file and positions the reader.
     *
     * @param parent the subpartition this view belongs to; must not be {@code null}
     * @param memorySegmentSize size of each of the two read buffers
     * @param channelId identifies the file channel to read from
     * @param initialSeekPosition position to start reading at; must be {@code >= 0}
     * @throws IOException if creating the reader, seeking, or querying the file size fails
     */
    SpilledSubpartitionViewSyncIO(ResultSubpartition parent, int memorySegmentSize, FileIOChannel.ID channelId, long initialSeekPosition) throws IOException {
        Preconditions.checkArgument(initialSeekPosition >= 0L, "Initial seek position is < 0.");
        this.parent = Preconditions.checkNotNull(parent);
        this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
        this.fileReader = new SynchronousBufferFileReader(channelId, false);
        if (initialSeekPosition > 0L) {
            this.fileReader.seekToPosition(initialSeekPosition);
        }
        this.fileSize = this.fileReader.getSize();
    }

    /**
     * Reads the next buffer from the spill file.
     *
     * <p>Blocks while both pool buffers are in use.
     *
     * @return the filled buffer, or {@code null} if the reader has reached the end of the
     *         file or the buffer pool was destroyed while waiting for a buffer
     * @throws IOException if reading from the file fails
     * @throws InterruptedException if interrupted while waiting for a free buffer
     */
    @Override
    public Buffer getNextBuffer() throws IOException, InterruptedException {
        if (this.fileReader.hasReachedEndOfFile()) {
            return null;
        }
        Buffer buffer = this.bufferPool.requestBufferBlocking();
        if (buffer == null) {
            // The pool was destroyed (view released) while we were waiting; there is
            // nothing to read into, so report "no buffer" instead of reading into null.
            return null;
        }
        boolean success = false;
        try {
            this.fileReader.readInto(buffer);
            success = true;
            return buffer;
        } finally {
            if (!success) {
                // The caller never sees this buffer, so hand it back here; otherwise
                // the two-buffer pool would permanently lose it.
                buffer.recycle();
            }
        }
    }

    /**
     * Listener registration is not performed by this view.
     *
     * @param listener ignored
     * @return always {@code false}
     */
    @Override
    public boolean registerListener(NotificationListener listener) throws IOException {
        return false;
    }

    /** Forwards the "consumed" notification to the parent subpartition. */
    @Override
    public void notifySubpartitionConsumed() throws IOException {
        this.parent.onConsumedSubpartition();
    }

    /**
     * Closes the file reader and destroys the buffer pool. Only the first call has an
     * effect; later calls are no-ops.
     *
     * @throws IOException if closing the file reader fails
     */
    @Override
    public void releaseAllResources() throws IOException {
        if (this.isReleased.compareAndSet(false, true)) {
            try {
                this.fileReader.close();
            } finally {
                // Destroy the pool even if close() fails so blocked requesters wake up.
                this.bufferPool.destroy();
            }
        }
    }

    /**
     * @return {@code true} if either the parent subpartition or this view has been released
     */
    @Override
    public boolean isReleased() {
        return this.parent.isReleased() || this.isReleased.get();
    }

    /**
     * @return the failure cause reported by the parent subpartition
     */
    @Override
    public Throwable getFailureCause() {
        return this.parent.getFailureCause();
    }

    @Override
    public String toString() {
        return String.format("SpilledSubpartitionView[sync](index: %d, file size: %d bytes) of ResultPartition %s", this.parent.index, this.fileSize, this.parent.parent.getPartitionId());
    }

    /**
     * A fixed-size pool of buffers backed by unpooled memory segments.
     *
     * <p>All state is guarded by the monitor of {@link #buffers}. Recycled segments are
     * wrapped in a new {@link Buffer} and put back in the queue; after {@link #destroy()}
     * they are freed instead.
     */
    private static class SpillReadBufferPool implements BufferRecycler {
        /** Currently available buffers; also serves as the lock and wait/notify monitor. */
        private final Queue<Buffer> buffers;

        /** Guarded by {@link #buffers}. */
        private boolean isDestroyed;

        /**
         * Creates the pool and fills it with freshly allocated buffers.
         *
         * @param numberOfBuffers number of buffers to allocate
         * @param memorySegmentSize size of the memory segment backing each buffer
         */
        public SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
            this.buffers = new ArrayDeque<Buffer>(numberOfBuffers);
            synchronized (this.buffers) {
                for (int i = 0; i < numberOfBuffers; ++i) {
                    this.buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
                }
            }
        }

        /**
         * Takes back a memory segment. If the pool is still alive the segment is wrapped in
         * a new buffer, queued, and waiting requesters are woken; otherwise it is freed.
         *
         * @param memorySegment the segment being returned
         */
        @Override
        public void recycle(MemorySegment memorySegment) {
            synchronized (this.buffers) {
                if (this.isDestroyed) {
                    memorySegment.free();
                } else {
                    this.buffers.add(new Buffer(memorySegment, this));
                    this.buffers.notifyAll();
                }
            }
        }

        /**
         * Waits until a buffer is available or the pool is destroyed.
         *
         * @return an available buffer, or {@code null} if the pool has been destroyed
         * @throws InterruptedException if interrupted while waiting
         */
        private Buffer requestBufferBlocking() throws InterruptedException {
            synchronized (this.buffers) {
                while (!this.isDestroyed) {
                    Buffer buffer = this.buffers.poll();
                    if (buffer != null) {
                        return buffer;
                    }
                    this.buffers.wait();
                }
                return null;
            }
        }

        /** Marks the pool as destroyed and wakes every thread blocked in a buffer request. */
        private void destroy() {
            synchronized (this.buffers) {
                this.isDestroyed = true;
                this.buffers.notifyAll();
            }
        }
    }
}

