package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.class */
public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
    private final TestInputChannel inputChannel;
    private final int bufferSize;
    private MutableObjectIterator<T> inputIterator;
    private DataOutputSerializer serializer;
    private final T reuse;

    public IteratorWrappingTestSingleInputGate(int i, int i2, MutableObjectIterator<T> mutableObjectIterator, Class<T> cls) throws IOException, InterruptedException {
        super(1, i2, false);
        this.inputChannel = new TestInputChannel(this.inputGate, 0);
        this.bufferSize = i;
        this.reuse = (T) InstantiationUtil.instantiate(cls);
        wrapIterator(mutableObjectIterator);
    }

    private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> mutableObjectIterator) throws IOException, InterruptedException {
        this.inputIterator = mutableObjectIterator;
        this.serializer = new DataOutputSerializer(128);
        this.inputChannel.addBufferAndAvailability(new TestInputChannel.BufferAndAvailabilityProvider() { // from class: org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate.1
            private boolean hasData;

            {
                this.hasData = IteratorWrappingTestSingleInputGate.this.inputIterator.next(IteratorWrappingTestSingleInputGate.this.reuse) != null;
            }

            @Override // org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider
            public Optional<InputChannel.BufferAndAvailability> getBufferAvailability() throws IOException {
                if (!this.hasData) {
                    IteratorWrappingTestSingleInputGate.this.inputChannel.setReleased();
                    return Optional.of(new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE, false), Buffer.DataType.NONE, 0, 0));
                }
                ByteBuffer serializeRecord = RecordWriter.serializeRecord(IteratorWrappingTestSingleInputGate.this.serializer, IteratorWrappingTestSingleInputGate.this.reuse);
                BufferBuilder createBufferBuilder = BufferBuilderTestUtils.createBufferBuilder(IteratorWrappingTestSingleInputGate.this.bufferSize);
                BufferConsumer createBufferConsumer = createBufferBuilder.createBufferConsumer();
                createBufferBuilder.appendAndCommit(serializeRecord);
                this.hasData = IteratorWrappingTestSingleInputGate.this.inputIterator.next(IteratorWrappingTestSingleInputGate.this.reuse) != null;
                return Optional.of(new InputChannel.BufferAndAvailability(createBufferConsumer.build(), this.hasData ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER, 0, 0));
            }
        });
        this.inputGate.setInputChannels(new InputChannel[]{this.inputChannel});
        return this;
    }

    public IteratorWrappingTestSingleInputGate<T> notifyNonEmpty() {
        this.inputGate.notifyChannelNonEmpty(this.inputChannel);
        return this;
    }
}
