package org.apache.flink.table.runtime.util;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.io.BinaryRowChannelInputViewIterator;
import org.apache.flink.table.runtime.io.ChannelWithMeta;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.ResettableRowBuffer;
import org.apache.flink.table.shaded.org.antlr.v4.runtime.atn.PredictionContext;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/util/ResettableExternalBuffer.class */
public class ResettableExternalBuffer implements ResettableRowBuffer {
    private static final Logger LOG = LoggerFactory.getLogger(ResettableExternalBuffer.class);
    public static final int MIN_NUM_MEMORY = 327680;
    private static final int READ_BUFFER = 2;
    private final IOManager ioManager;
    private final LazyMemorySegmentPool pool;
    private final BinaryRowDataSerializer binaryRowSerializer;
    private final InMemoryBuffer inMemoryBuffer;
    private final int segmentSize;
    private long spillSize;
    private long rowLength;
    private boolean isRowAllInFixedPart;
    private final List<ChannelWithMeta> spilledChannelIDs;
    private final List<Integer> spilledChannelRowOffsets;
    private int numRows;
    private int iterOpenedCount;
    private int externalBufferVersion;
    private boolean addCompleted;

    /* loaded from: input_file:org/apache/flink/table/runtime/util/ResettableExternalBuffer$BufferIterator.class */
    public class BufferIterator implements ResettableRowBuffer.ResettableIterator {
        MutableObjectIterator<BinaryRowData> currentIterator;
        List<MemorySegment> freeMemory;
        BlockChannelReader<MemorySegment> fileReader;
        int currentChannelID;
        BinaryRowData reuse;
        BinaryRowData row;
        int beginRow;
        int nextRow;
        InMemoryBuffer.InMemoryBufferIterator reusableMemoryIterator;
        int versionSnapshot;
        boolean closed;

        /**
         * Creates an iterator that starts at row {@code i}, clamped to the number of rows
         * currently held by the enclosing buffer.
         *
         * @param i zero-based index of the first row to return
         */
        private BufferIterator(int i) {
            this.freeMemory = null;
            // -1 marks "no underlying iterator opened yet"; see nextIterator().
            this.currentChannelID = -1;
            // The serializer's public API method is createInstance(); the "m163" prefix was a
            // decompiler rename and does not exist on BinaryRowDataSerializer.
            this.reuse = ResettableExternalBuffer.this.binaryRowSerializer.createInstance();
            this.beginRow = Math.min(i, ResettableExternalBuffer.this.numRows);
            this.nextRow = this.beginRow;
            // Remember the buffer version so that a later reset() of the buffer invalidates us.
            this.versionSnapshot = ResettableExternalBuffer.this.externalBufferVersion;
            this.closed = false;
        }

        /**
         * Fails fast when this iterator must no longer be used.
         *
         * @throws RuntimeException if the iterator was closed, or if the enclosing buffer's
         *     version changed since this iterator was created
         */
        private void checkValidity() {
            final int currentVersion = ResettableExternalBuffer.this.externalBufferVersion;
            if (this.closed) {
                throw new RuntimeException("This iterator is closed!");
            } else if (currentVersion != this.versionSnapshot) {
                throw new RuntimeException("This iterator is no longer valid!");
            }
        }

        /**
         * Rewinds this iterator to its begin row after verifying it is still usable.
         *
         * @throws IOException if closing the current spill-file reader fails
         */
        @Override // org.apache.flink.table.runtime.util.ResettableRowBuffer.ResettableIterator
        public void reset() throws IOException {
            // Validate first so a closed or stale iterator is never silently rewound.
            this.checkValidity();
            this.resetImpl();
        }

        /**
         * Closes any open spill-file reader and puts all cursor state back to "nothing opened,
         * positioned at beginRow".
         *
         * @throws IOException if closing the current spill-file reader fails
         */
        private void resetImpl() throws IOException {
            // Must run first: if it throws, no cursor state has been touched yet.
            closeCurrentFileReader();

            this.currentIterator = null;
            this.currentChannelID = -1;
            this.nextRow = this.beginRow;

            this.reuse.clear();
            this.row = null;
        }

        /**
         * Closes this iterator: resets its cursor, drops its read buffers, closes the in-memory
         * iterator and decrements the enclosing buffer's open-iterator count. Calling it again
         * on an already closed iterator is a no-op.
         *
         * @throws RuntimeException wrapping the {@link IOException} if the reset fails; in that
         *     case the iterator is not marked closed
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.closed) {
                return;
            }

            try {
                resetImpl();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            if (this.freeMemory != null) {
                this.freeMemory.clear();
            }
            if (this.reusableMemoryIterator != null) {
                this.reusableMemoryIterator.close();
            }

            this.closed = true;
            ResettableExternalBuffer.this.iterOpenedCount--;
        }

        /**
         * Moves to the next row, switching to the next underlying iterator (spill channel or
         * in-memory buffer) whenever the current one is exhausted.
         *
         * @return {@code true} if a row is available through {@link #getRow()}, {@code false}
         *     once every underlying iterator is exhausted
         * @throws RuntimeException if the iterator is closed or stale, or wrapping any
         *     {@link IOException} raised while reading
         */
        @Override // org.apache.flink.table.runtime.util.RowIterator
        public boolean advanceNext() {
            checkValidity();
            try {
                updateIteratorIfNeeded();
                while (true) {
                    if (this.currentIterator != null) {
                        this.row = this.currentIterator.next(this.reuse);
                        if (this.row != null) {
                            this.nextRow++;
                            return true;
                        }
                    }
                    // Current source is missing or drained: try to open the next one.
                    if (!nextIterator()) {
                        return false;
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Opens the next underlying iterator.
         *
         * <p>{@code currentChannelID} acts as the state: {@code -1} means nothing was opened yet,
         * {@code Integer.MAX_VALUE} means the in-memory iterator is current (the last source), and
         * any other value is the index of the spill channel being read.
         *
         * @return {@code false} if the in-memory iterator was already the current source,
         *     {@code true} otherwise
         * @throws IOException if opening a spill channel fails
         */
        private boolean nextIterator() throws IOException {
            final int channel = this.currentChannelID;

            if (channel == Integer.MAX_VALUE) {
                return false;
            }

            if (channel == -1) {
                // First use: jump straight to the begin row.
                if (ResettableExternalBuffer.this.isRowAllInFixedPart) {
                    gotoAllInFixedPartRow(this.beginRow);
                } else {
                    gotoVariableLengthRow(this.beginRow);
                }
            } else if (channel < ResettableExternalBuffer.this.spilledChannelIDs.size() - 1) {
                nextSpilledIterator();
            } else {
                newMemoryIterator();
            }
            return true;
        }

        /**
         * Tells whether this iterator is on the in-memory source while its next row index falls
         * at or before the last recorded spill offset.
         *
         * @return {@code true} if the underlying iterator has to be re-positioned
         */
        private boolean iteratorNeedsUpdate() {
            final List<Integer> offsets = ResettableExternalBuffer.this.spilledChannelRowOffsets;
            final int count = offsets.size();
            if (count <= 0) {
                return false;
            }
            if (this.currentChannelID != Integer.MAX_VALUE) {
                return false;
            }
            final int lastSpilledOffset = offsets.get(count - 1).intValue();
            return this.nextRow <= lastSpilledOffset;
        }

        /**
         * Re-positions the underlying iterator at {@code nextRow} when
         * {@link #iteratorNeedsUpdate()} reports that the current in-memory position is stale.
         *
         * @throws IOException if opening a spill channel fails
         */
        private void updateIteratorIfNeeded() throws IOException {
            if (!iteratorNeedsUpdate()) {
                return;
            }

            this.reuse.clear();
            // Drop the cached memory iterator so a fresh one is created when needed.
            this.reusableMemoryIterator = null;

            final boolean fixedLength = ResettableExternalBuffer.this.isRowAllInFixedPart;
            if (fixedLength) {
                gotoAllInFixedPartRow(this.nextRow);
            } else {
                gotoVariableLengthRow(this.nextRow);
            }
        }

        /**
         * Returns the row produced by the most recent {@link #advanceNext()} call.
         *
         * @return the current row; {@code null} before the first advance, after a reset, or
         *     once the iterator is exhausted
         */
        @Override // org.apache.flink.table.runtime.util.RowIterator
        public BinaryRowData getRow() {
            final BinaryRowData current = this.row;
            return current;
        }

        /**
         * Closes and forgets the current spill-file reader, if there is one.
         *
         * @throws IOException if closing the reader fails; the reference is then kept
         */
        private void closeCurrentFileReader() throws IOException {
            if (this.fileReader == null) {
                return;
            }
            this.fileReader.close();
            this.fileReader = null;
        }

        /**
         * Positions the underlying iterator at row {@code i} for fixed-length rows by computing
         * the byte offset directly from the row length and the segment size.
         *
         * @param i zero-based global row index to position at
         * @throws IOException if opening a spill channel fails
         */
        private void gotoAllInFixedPartRow(int i) throws IOException {
            final List<Integer> offsets = ResettableExternalBuffer.this.spilledChannelRowOffsets;
            // Index of the source holding row i; offsets.size() means the in-memory buffer.
            final int channelIndex = upperBound(i, offsets);
            // Row index relative to the start of that source.
            final int rowInChannel = getBeginIndexInChannel(i, channelIndex);

            if (i == ResettableExternalBuffer.this.numRows) {
                // Past the last row: open the memory iterator at the end of the written data.
                newMemoryIterator(rowInChannel, ResettableExternalBuffer.this.inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }

            final long segmentBytes = ResettableExternalBuffer.this.segmentSize;
            final long rowBytes = ResettableExternalBuffer.this.rowLength;
            final long rowsPerSegment = segmentBytes / rowBytes;
            final long fullSegmentsBefore = rowInChannel / rowsPerSegment;
            final long rowsIntoSegment = rowInChannel % rowsPerSegment;
            final long byteOffset = (fullSegmentsBefore * segmentBytes) + (rowsIntoSegment * rowBytes);

            if (channelIndex >= offsets.size()) {
                newMemoryIterator(rowInChannel, byteOffset);
            } else {
                newSpilledIterator(channelIndex, byteOffset);
            }
        }

        /**
         * Positions the underlying iterator at row {@code i} for variable-length rows. No byte
         * offset can be computed for such rows, so the source containing the row is opened from
         * its start and the preceding rows are skipped by reading them.
         *
         * @param i zero-based global row index to position at
         * @throws IOException if opening a spill channel fails
         */
        private void gotoVariableLengthRow(int i) throws IOException {
            // Index of the source holding row i; a value equal to the offsets list size selects
            // the in-memory buffer.
            int upperBound = upperBound(i, ResettableExternalBuffer.this.spilledChannelRowOffsets);
            // Row index relative to the start of that source.
            int beginIndexInChannel = getBeginIndexInChannel(i, upperBound);
            if (i == ResettableExternalBuffer.this.numRows) {
                // Past the last row: open the memory iterator at the end of the written data.
                newMemoryIterator(beginIndexInChannel, ResettableExternalBuffer.this.inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }
            if (upperBound < ResettableExternalBuffer.this.spilledChannelRowOffsets.size()) {
                newSpilledIterator(upperBound);
            } else {
                newMemoryIterator();
            }
            // Each successful advanceNext() below increments nextRow, so rewind it first by the
            // number of rows about to be skipped.
            this.nextRow -= beginIndexInChannel;
            for (int i2 = 0; i2 < beginIndexInChannel; i2++) {
                advanceNext();
            }
        }

        /**
         * Opens the spill channel that follows the one currently being read.
         *
         * @throws IOException if opening the channel fails
         */
        private void nextSpilledIterator() throws IOException {
            final int followingChannel = this.currentChannelID + 1;
            newSpilledIterator(followingChannel);
        }

        /**
         * Opens spill channel {@code i} and reads it from its first byte.
         *
         * @param i index into the list of spilled channels
         * @throws IOException if opening the channel fails
         */
        private void newSpilledIterator(int i) throws IOException {
            final long startOfChannel = 0L;
            newSpilledIterator(i, startOfChannel);
        }

        /**
         * Opens spill channel {@code i} and positions the read at byte offset {@code j}.
         *
         * <p>The reader is seeked to the start of the segment containing {@code j}; the
         * remainder within that segment is handed to the input view as its initial offset.
         *
         * @param i index into the list of spilled channels
         * @param j byte offset within the channel at which reading starts
         * @throws IOException if closing the previous reader, creating the new reader, or
         *     seeking fails
         */
        private void newSpilledIterator(int i, long j) throws IOException {
            ChannelWithMeta channelWithMeta = ResettableExternalBuffer.this.spilledChannelIDs.get(i);
            this.currentChannelID = i;
            closeCurrentFileReader();

            final int segmentSize = ResettableExternalBuffer.this.segmentSize;
            int segmentNum = (int) (j / segmentSize);
            // Widen before multiplying: an int product overflows once the offset reaches 2 GiB.
            long seekPosition = (long) segmentNum * segmentSize;

            this.fileReader = ResettableExternalBuffer.this.ioManager.createBlockChannelReader(channelWithMeta.getChannel());
            if (j > 0) {
                this.fileReader.seekToPosition(seekPosition);
            }
            this.currentIterator = new BinaryRowChannelInputViewIterator(
                    new HeaderlessChannelReaderInputView(
                            this.fileReader,
                            getReadMemory(),
                            channelWithMeta.getBlockCount() - segmentNum,
                            channelWithMeta.getNumBytesInLastBlock(),
                            false,
                            j - seekPosition),
                    ResettableExternalBuffer.this.binaryRowSerializer);
        }

        /**
         * Switches to the in-memory buffer, starting at its first row and first byte.
         *
         * @throws IOException if closing the current spill-file reader fails
         */
        private void newMemoryIterator() throws IOException {
            final int firstRow = 0;
            final long firstByte = 0L;
            newMemoryIterator(firstRow, firstByte);
        }

        /**
         * Switches to the in-memory buffer, positioned at byte offset {@code j}.
         *
         * <p>A cached memory iterator is reused when present; in that case only the expected
         * record count and read position are refreshed and {@code i} is not applied.
         *
         * @param i row index, relative to the in-memory buffer, used when a new iterator is created
         * @param j byte offset within the in-memory buffer at which reading starts
         * @throws IOException if closing the current spill-file reader fails
         */
        private void newMemoryIterator(int i, long j) throws IOException {
            // Integer.MAX_VALUE is the sentinel that nextIterator() and iteratorNeedsUpdate()
            // compare against; use it directly instead of an unrelated ANTLR constant that only
            // happens to have the same value.
            this.currentChannelID = Integer.MAX_VALUE;
            closeCurrentFileReader();
            if (this.reusableMemoryIterator == null) {
                this.reusableMemoryIterator = ResettableExternalBuffer.this.inMemoryBuffer.newIterator(i, j);
            } else {
                this.reusableMemoryIterator.reset(ResettableExternalBuffer.this.inMemoryBuffer.recordCount, j);
            }
            this.currentIterator = this.reusableMemoryIterator;
        }

        /**
         * Converts a global row index into an index relative to the start of its source.
         *
         * @param i global row index
         * @param i2 index of the source (as returned by {@code upperBound}) that holds the row
         * @return {@code i} minus the row offset recorded for the preceding spill channel, or
         *     {@code i} itself when the row is in the first source
         */
        private int getBeginIndexInChannel(int i, int i2) {
            if (i2 <= 0) {
                return i;
            }
            final int rowsBeforeChannel = ResettableExternalBuffer.this.spilledChannelRowOffsets.get(i2 - 1).intValue();
            return i - rowsBeforeChannel;
        }

        /**
         * Returns the read buffers used for spill-channel reads, allocating them on first use.
         *
         * @return the list of unpooled read segments owned by this iterator
         */
        private List<MemorySegment> getReadMemory() {
            if (this.freeMemory != null) {
                return this.freeMemory;
            }
            final List<MemorySegment> segments = new ArrayList<>();
            int allocated = 0;
            while (allocated < READ_BUFFER) {
                segments.add(MemorySegmentFactory.allocateUnpooledSegment(ResettableExternalBuffer.this.segmentSize));
                allocated++;
            }
            this.freeMemory = segments;
            return this.freeMemory;
        }

        /**
         * Binary search for the first element strictly greater than {@code i}.
         *
         * @param i value to search for
         * @param list values to search; the search assumes ascending order
         * @return index of the first element greater than {@code i}, or {@code list.size()} if
         *     there is none (including when the list is empty)
         */
        private int upperBound(int i, List<Integer> list) {
            if (list.isEmpty()) {
                return 0;
            }
            if (list.get(list.size() - 1).intValue() <= i) {
                return list.size();
            }
            int low = 0;
            int high = list.size() - 1;
            while (low < high) {
                // Unsigned shift keeps the midpoint correct even if low + high overflows int.
                int mid = (low + high) >>> 1;
                if (list.get(mid).intValue() <= i) {
                    low = mid + 1;
                } else {
                    high = mid;
                }
            }
            return low;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/util/ResettableExternalBuffer$InMemoryBuffer.class */
    public class InMemoryBuffer implements Closeable {
        private final AbstractRowDataSerializer serializer;
        private final ArrayList<MemorySegment> recordBufferSegments;
        private final SimpleCollectingOutputView recordCollector;
        private long currentDataBufferOffset;
        private int numBytesInLastBuffer;
        private int recordCount;

        /* loaded from: input_file:org/apache/flink/table/runtime/util/ResettableExternalBuffer$InMemoryBuffer$InMemoryBufferIterator.class */
        public class InMemoryBufferIterator implements MutableObjectIterator<BinaryRowData>, Closeable {
            private final int beginRow;
            private int nextRow;
            private RandomAccessInputView recordBuffer;
            private int expectedRecordCount;

            private InMemoryBufferIterator(int i, int i2, long j, RandomAccessInputView randomAccessInputView) {
                this.beginRow = i2;
                this.recordBuffer = randomAccessInputView;
                reset(i, j);
            }

            public void reset(int i, long j) {
                this.nextRow = this.beginRow;
                this.expectedRecordCount = i;
                this.recordBuffer.setReadPosition(j);
            }

            public BinaryRowData next(BinaryRowData binaryRowData) throws IOException {
                try {
                    if (this.expectedRecordCount != InMemoryBuffer.this.recordCount) {
                        throw new ConcurrentModificationException();
                    }
                    if (this.nextRow >= InMemoryBuffer.this.recordCount) {
                        return null;
                    }
                    this.nextRow++;
                    return (BinaryRowData) InMemoryBuffer.this.serializer.mapFromPages(binaryRowData, this.recordBuffer);
                } catch (EOFException e) {
                    return null;
                }
            }

            /* renamed from: next, reason: merged with bridge method [inline-methods] */
            public BinaryRowData m197next() throws IOException {
                throw new RuntimeException("Not support!");
            }

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() {
            }
        }

        private InMemoryBuffer(AbstractRowDataSerializer abstractRowDataSerializer) {
            this.serializer = (AbstractRowDataSerializer) abstractRowDataSerializer.duplicate();
            this.recordBufferSegments = new ArrayList<>();
            this.recordCollector = new SimpleCollectingOutputView(this.recordBufferSegments, ResettableExternalBuffer.this.pool, ResettableExternalBuffer.this.segmentSize);
            this.recordCount = 0;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Empties this buffer: zeroes the write offset and record count, hands all segments
         * back to the pool and resets the output view.
         */
        public void reset() {
            this.recordCount = 0;
            this.currentDataBufferOffset = 0L;
            // Segments go back to the pool before the collector is reset.
            this.returnToSegmentPool();
            this.recordCollector.reset();
        }

        /** Releases this buffer's segments by returning them to the pool. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.returnToSegmentPool();
        }

        /** Hands every record segment back to the enclosing buffer's pool and clears the list. */
        private void returnToSegmentPool() {
            final ArrayList<MemorySegment> segments = this.recordBufferSegments;
            ResettableExternalBuffer.this.pool.returnAll(segments);
            segments.clear();
        }

        /**
         * Serializes one row into the buffer.
         *
         * @param rowData row to append
         * @return {@code true} if the row was written; {@code false} if serialization hit an
         *     {@link EOFException}
         * @throws IOException if serialization fails for any other I/O reason
         */
        public boolean write(RowData rowData) throws IOException {
            boolean written;
            try {
                this.serializer.serializeToPages(rowData, this.recordCollector);
                // Record the new write position only after the row was fully serialized.
                this.currentDataBufferOffset = this.recordCollector.getCurrentOffset();
                this.numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment();
                this.recordCount++;
                written = true;
            } catch (EOFException e) {
                written = false;
            }
            return written;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * @return the live list of segments backing this buffer (not a copy)
         */
        public ArrayList<MemorySegment> getRecordBufferSegments() {
            final ArrayList<MemorySegment> segments = this.recordBufferSegments;
            return segments;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * @return the output view's offset recorded after the last successful write, or 0
         *     after a reset
         */
        public long getCurrentDataBufferOffset() {
            final long offset = this.currentDataBufferOffset;
            return offset;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * @return the number of segments needed to hold the bytes written so far, i.e. the
         *     write offset divided by the segment size, rounded up
         */
        public int getNumRecordBuffers() {
            final int segmentBytes = ResettableExternalBuffer.this.segmentSize;
            final int fullSegments = (int) (this.currentDataBufferOffset / segmentBytes);
            final boolean hasPartialSegment = this.currentDataBufferOffset % segmentBytes != 0;
            return hasPartialSegment ? fullSegments + 1 : fullSegments;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * @return the output view's position within its current segment, as recorded after
         *     the last successful write
         */
        public int getNumBytesInLastBuffer() {
            final int bytes = this.numBytesInLastBuffer;
            return bytes;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Creates an iterator over this buffer's current contents.
         *
         * @param i index of the first row the iterator returns
         * @param j byte offset at which reading starts; must not be negative
         * @return a new iterator bound to the current record count
         * @throws IllegalArgumentException if {@code j} is negative
         */
        public InMemoryBufferIterator newIterator(int i, long j) {
            Preconditions.checkArgument(j >= 0, "`offset` can't be negative!");
            final RandomAccessInputView view = new RandomAccessInputView(
                    this.recordBufferSegments,
                    ResettableExternalBuffer.this.segmentSize,
                    this.numBytesInLastBuffer);
            return new InMemoryBufferIterator(this.recordCount, i, j, view);
        }
    }

    /**
     * Creates an empty buffer.
     *
     * @param iOManager I/O manager used to create spill channels
     * @param lazyMemorySegmentPool pool supplying the in-memory segments; its page size
     *     becomes this buffer's segment size
     * @param abstractRowDataSerializer serializer for incoming rows; duplicated when it is a
     *     {@link BinaryRowDataSerializer}, otherwise a binary serializer of the same arity is
     *     created for reading spilled data
     * @param z whether every row fits entirely in the fixed-length part, which enables direct
     *     offset computation when seeking
     */
    public ResettableExternalBuffer(IOManager iOManager, LazyMemorySegmentPool lazyMemorySegmentPool, AbstractRowDataSerializer abstractRowDataSerializer, boolean z) {
        this.ioManager = iOManager;
        this.pool = lazyMemorySegmentPool;

        if (abstractRowDataSerializer instanceof BinaryRowDataSerializer) {
            this.binaryRowSerializer = (BinaryRowDataSerializer) abstractRowDataSerializer.duplicate();
        } else {
            this.binaryRowSerializer = new BinaryRowDataSerializer(abstractRowDataSerializer.getArity());
        }

        this.segmentSize = lazyMemorySegmentPool.pageSize();

        this.spilledChannelIDs = new ArrayList<>();
        this.spillSize = 0L;
        this.spilledChannelRowOffsets = new ArrayList<>();

        this.numRows = 0;
        this.iterOpenedCount = 0;
        this.externalBufferVersion = 0;

        this.isRowAllInFixedPart = z;
        if (z) {
            this.rowLength = this.binaryRowSerializer.getSerializedRowFixedPartLength();
        } else {
            this.rowLength = -1L;
        }

        this.addCompleted = false;
        // Created last: the in-memory buffer reads pool and segmentSize during construction.
        this.inMemoryBuffer = new InMemoryBuffer(abstractRowDataSerializer);
    }

    /**
     * Discards all rows: deletes spill files, empties the in-memory buffer, re-opens the
     * buffer for {@link #add} and bumps the version so existing iterators become invalid.
     */
    @Override // org.apache.flink.table.runtime.util.ResettableRowBuffer
    public void reset() {
        this.clearChannels();
        this.inMemoryBuffer.reset();

        this.addCompleted = false;
        this.numRows = 0;
        // Iterators compare their snapshot against this counter in checkValidity().
        this.externalBufferVersion += 1;
    }

    /**
     * Appends a row, spilling the in-memory buffer to disk once when the row does not fit.
     *
     * @param rowData row to append
     * @throws IllegalStateException if {@link #complete()} was already called
     * @throws IOException if spilling fails, or if the row does not fit even into an empty
     *     in-memory buffer
     */
    @Override // org.apache.flink.table.runtime.util.ResettableRowBuffer
    public void add(RowData rowData) throws IOException {
        Preconditions.checkState(!this.addCompleted, "This buffer has add completed.");

        boolean stored = this.inMemoryBuffer.write(rowData);
        if (!stored) {
            // Nothing buffered yet means spilling cannot free any space for this row.
            if (this.inMemoryBuffer.getCurrentDataBufferOffset() == 0) {
                throwTooBigException(rowData);
            }
            spill();
            stored = this.inMemoryBuffer.write(rowData);
            if (!stored) {
                throwTooBigException(rowData);
            }
        }
        this.numRows += 1;
    }

    /**
     * Marks the end of the add phase. Afterwards {@link #add} is rejected and iterators may
     * be created, until {@link #reset()} is called.
     */
    @Override // org.apache.flink.table.runtime.util.ResettableRowBuffer
    public void complete() {
        final boolean finished = true;
        this.addCompleted = finished;
    }

    /**
     * Creates an iterator that starts at the first row.
     *
     * @return a new iterator over all rows
     * @throws IllegalStateException if {@link #complete()} has not been called
     */
    @Override // org.apache.flink.table.runtime.util.ResettableRowBuffer
    public BufferIterator newIterator() {
        final int firstRow = 0;
        return newIterator(firstRow);
    }

    /**
     * Creates an iterator that starts at row {@code i} and counts it as open.
     *
     * @param i zero-based index of the first row to return; must not be negative
     * @return a new iterator
     * @throws IllegalStateException if {@link #complete()} has not been called
     * @throws IllegalArgumentException if {@code i} is negative
     */
    @Override // org.apache.flink.table.runtime.util.ResettableRowBuffer
    public BufferIterator newIterator(int i) {
        Preconditions.checkState(this.addCompleted, "This buffer has not add completed.");
        Preconditions.checkArgument(i >= 0, "`beginRow` can't be negative!");
        // Count the iterator before constructing it, as its read buffers are accounted for
        // through this counter.
        this.iterOpenedCount += 1;
        final BufferIterator iterator = new BufferIterator(i);
        return iterator;
    }

    /**
     * Releases everything held by this buffer: spill files are deleted, in-memory segments
     * are returned, and the segment pool itself is closed.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.clearChannels();
        this.inMemoryBuffer.close();
        // The pool is closed last, after the segments were handed back to it.
        this.pool.close();
    }

    /**
     * Always throws, reporting the serialized size of the row and the value of
     * {@code memorySize()}.
     *
     * @param rowData the row that could not be stored
     * @throws IOException always; also if serializing the row for the message fails
     */
    private void throwTooBigException(RowData rowData) throws IOException {
        final int recordSize = InstantiationUtil.serializeToByteArray(this.inMemoryBuffer.serializer, rowData).length;
        final StringBuilder message = new StringBuilder();
        message.append("Record is too big, it can't be added to a empty InMemoryBuffer! Record size: ");
        message.append(recordSize);
        message.append(", Buffer: ");
        message.append(memorySize());
        throw new IOException(message.toString());
    }

    /**
     * Writes every in-memory segment to a new spill channel, records the channel together
     * with the row count at which it ends, then resets the in-memory buffer.
     *
     * @throws IOException if writing or closing the channel fails; the partially written
     *     spill file is deleted before the exception is rethrown
     */
    private void spill() throws IOException {
        FileIOChannel.ID channel = this.ioManager.createChannel();
        BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
        int numRecordBuffers = this.inMemoryBuffer.getNumRecordBuffers();
        List<MemorySegment> segments = this.inMemoryBuffer.getRecordBufferSegments();
        try {
            for (int i = 0; i < numRecordBuffers; i++) {
                writer.writeBlock(segments.get(i));
            }
            LOG.info("here spill the reset buffer data with {} bytes", writer.getSize());
            // Close inside the try so a failing close also removes the half-written file.
            writer.close();
        } catch (IOException e) {
            writer.closeAndDelete();
            throw e;
        }
        // Widen before multiplying: the int product overflows for spills of 2 GiB or more.
        this.spillSize += (long) numRecordBuffers * this.segmentSize;
        this.spilledChannelIDs.add(new ChannelWithMeta(channel, numRecordBuffers, this.inMemoryBuffer.getNumBytesInLastBuffer()));
        // numRows has not yet been incremented for the row that triggered the spill, so this
        // is the exclusive end row of the channel just written.
        this.spilledChannelRowOffsets.add(this.numRows);
        this.inMemoryBuffer.reset();
    }

    /**
     * @return the number of rows added since construction or the last {@link #reset()}
     */
    public int size() {
        final int rows = this.numRows;
        return rows;
    }

    /**
     * @return the pool's free page count multiplied by the segment size, in bytes; computed
     *     in {@code long} because the {@code int} product overflows at 2 GiB
     */
    private long memorySize() {
        return (long) this.pool.freePages() * this.segmentSize;
    }

    /**
     * @return {@code memorySize()} plus {@code READ_BUFFER} segments for every iterator that
     *     is currently open, in bytes
     */
    public long getUsedMemoryInBytes() {
        // Each open iterator allocates READ_BUFFER unpooled read segments. Widen to long
        // before multiplying so many iterators or large segments cannot overflow int.
        long iteratorReadBuffers = (long) this.iterOpenedCount * READ_BUFFER * this.segmentSize;
        return memorySize() + iteratorReadBuffers;
    }

    /**
     * @return the number of spill channels currently recorded
     */
    public int getNumSpillFiles() {
        final int channels = this.spilledChannelIDs.size();
        return channels;
    }

    /**
     * @return the accumulated size of all spills, counted in whole segments, in bytes
     */
    public long getSpillInBytes() {
        final long spilledBytes = this.spillSize;
        return spilledBytes;
    }

    /**
     * Deletes every spill file and forgets all spill bookkeeping. Deletion is best-effort: a
     * file that cannot be deleted is logged and skipped instead of failing the caller.
     */
    private void clearChannels() {
        for (ChannelWithMeta channel : this.spilledChannelIDs) {
            File file = new File(channel.getChannel().getPath());
            if (file.exists() && !file.delete()) {
                // Previously the failed delete was silently ignored, leaking the file unnoticed.
                LOG.warn("Failed to delete spill file {}", file);
            }
        }
        this.spilledChannelIDs.clear();
        this.spillSize = 0L;
        this.spilledChannelRowOffsets.clear();
    }

    /**
     * @return the live list of spilled channels (not a copy); exposed for tests
     */
    @VisibleForTesting
    List<ChannelWithMeta> getSpillChannels() {
        final List<ChannelWithMeta> channels = this.spilledChannelIDs;
        return channels;
    }

    /**
     * Compiler-generated accessor that post-decrements {@code iterOpenedCount} on the given
     * buffer.
     *
     * @return the counter value before the decrement
     */
    static /* synthetic */ int access$1110(ResettableExternalBuffer resettableExternalBuffer) {
        return resettableExternalBuffer.iterOpenedCount--;
    }
}
