package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyConnectionReaderAvailabilityAndPriorityHelper;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionReaderTest.class */
class NettyConnectionReaderTest {
    private static final int INPUT_CHANNEL_INDEX = 0;
    private CompletableFuture<Tuple2<Integer, Boolean>> availableAndPriorityConsumer;
    private CompletableFuture<Tuple2<Integer, Integer>> prioritySequenceNumberConsumer;
    private CompletableFuture<Integer> requiredSegmentIdFuture;

    NettyConnectionReaderTest() {
    }

    @BeforeEach
    void before() {
        this.availableAndPriorityConsumer = new CompletableFuture<>();
        this.prioritySequenceNumberConsumer = new CompletableFuture<>();
        this.requiredSegmentIdFuture = new CompletableFuture<>();
    }

    @Test
    void testReadBufferOfNonPriorityDataType() {
        Optional readBuffer = createNettyConnectionReader(createInputChannelSupplier(1, false, this.requiredSegmentIdFuture), createAvailableAndPriorityConsumer(this.availableAndPriorityConsumer), createPrioritySequenceNumberConsumer(this.prioritySequenceNumberConsumer)).readBuffer(0);
        Assertions.assertThat(readBuffer).isPresent();
        Assertions.assertThat(((Buffer) readBuffer.get()).isBuffer()).isTrue();
        Assertions.assertThat(this.requiredSegmentIdFuture).isNotDone();
        Assertions.assertThat(this.availableAndPriorityConsumer).isNotDone();
        Assertions.assertThat(this.prioritySequenceNumberConsumer).isNotDone();
    }

    @Test
    void testReadBufferOfPriorityDataType() throws ExecutionException, InterruptedException {
        Optional readBuffer = createNettyConnectionReader(createInputChannelSupplier(2, true, this.requiredSegmentIdFuture), createAvailableAndPriorityConsumer(this.availableAndPriorityConsumer), createPrioritySequenceNumberConsumer(this.prioritySequenceNumberConsumer)).readBuffer(0);
        Assertions.assertThat(readBuffer).isPresent();
        Assertions.assertThat(((Buffer) readBuffer.get()).isBuffer()).isFalse();
        Assertions.assertThat(this.requiredSegmentIdFuture).isNotDone();
        Tuple2<Integer, Boolean> tuple2 = this.availableAndPriorityConsumer.get();
        Assertions.assertThat((Integer) tuple2.f0).isEqualTo(0);
        Assertions.assertThat((Boolean) tuple2.f1).isEqualTo(true);
        Tuple2<Integer, Integer> tuple22 = this.prioritySequenceNumberConsumer.get();
        Assertions.assertThat((Integer) tuple22.f0).isEqualTo(0);
        Assertions.assertThat((Integer) tuple22.f1).isEqualTo(0);
    }

    @Test
    void testReadEmptyBuffer() {
        Assertions.assertThat(createNettyConnectionReader(createInputChannelSupplier(0, false, this.requiredSegmentIdFuture), (num, bool) -> {
            this.availableAndPriorityConsumer.complete(Tuple2.of(num, bool));
        }, (num2, num3) -> {
            this.prioritySequenceNumberConsumer.complete(Tuple2.of(num2, num3));
        }).readBuffer(0)).isNotPresent();
        Assertions.assertThat(this.requiredSegmentIdFuture).isNotDone();
        Assertions.assertThat(this.availableAndPriorityConsumer).isNotDone();
        Assertions.assertThat(this.prioritySequenceNumberConsumer).isNotDone();
    }

    @Test
    void testReadDifferentSegments() throws ExecutionException, InterruptedException {
        NettyConnectionReader createNettyConnectionReader = createNettyConnectionReader(createInputChannelSupplier(0, false, this.requiredSegmentIdFuture), createAvailableAndPriorityConsumer(this.availableAndPriorityConsumer), createPrioritySequenceNumberConsumer(this.prioritySequenceNumberConsumer));
        createNettyConnectionReader.readBuffer(0);
        Assertions.assertThat(this.requiredSegmentIdFuture).isNotDone();
        createNettyConnectionReader.readBuffer(1);
        Assertions.assertThat(this.requiredSegmentIdFuture.get()).isEqualTo(1);
    }

    private static BiConsumer<Integer, Boolean> createAvailableAndPriorityConsumer(CompletableFuture<Tuple2<Integer, Boolean>> completableFuture) {
        return (num, bool) -> {
            completableFuture.complete(Tuple2.of(num, bool));
        };
    }

    private static BiConsumer<Integer, Integer> createPrioritySequenceNumberConsumer(CompletableFuture<Tuple2<Integer, Integer>> completableFuture) {
        return (num, num2) -> {
            completableFuture.complete(Tuple2.of(num, num2));
        };
    }

    private static Supplier<InputChannel> createInputChannelSupplier(int i, boolean z, CompletableFuture<Integer> completableFuture) {
        TestInputChannel testInputChannel = new TestInputChannel(new SingleInputGateBuilder().build(), 0, completableFuture);
        int i2 = 0;
        while (i2 < i) {
            try {
                testInputChannel.read(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(0), FreeingBufferRecycler.INSTANCE, z ? Buffer.DataType.PRIORITIZED_EVENT_BUFFER : Buffer.DataType.DATA_BUFFER), i2 == i - 1 ? Buffer.DataType.NONE : z ? Buffer.DataType.PRIORITIZED_EVENT_BUFFER : Buffer.DataType.DATA_BUFFER);
                i2++;
            } catch (IOException | InterruptedException e) {
                ExceptionUtils.rethrow(e, "Failed to create test input channel.");
            }
        }
        return () -> {
            return testInputChannel;
        };
    }

    private static NettyConnectionReader createNettyConnectionReader(Supplier<InputChannel> supplier, BiConsumer<Integer, Boolean> biConsumer, BiConsumer<Integer, Integer> biConsumer2) {
        return new NettyConnectionReaderImpl(0, supplier, new TestingNettyConnectionReaderAvailabilityAndPriorityHelper.Builder().setAvailableAndPriorityConsumer(biConsumer).setPrioritySequenceNumberConsumer(biConsumer2).build());
    }
}
