package org.apache.flink.runtime.io.network;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.class */
class NettyShuffleEnvironmentTest {
    private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory();
    private static FileChannelManager fileChannelManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest$TestMetricRegistry.class */
    public static class TestMetricRegistry extends NoOpMetricRegistry {
        private final Map<String, Metric> metrics;

        TestMetricRegistry(Map<String, Metric> map) {
            this.metrics = map;
        }

        public void register(Metric metric, String str, AbstractMetricGroup abstractMetricGroup) {
            this.metrics.put(abstractMetricGroup.getLogicalScope(CharacterFilter.NO_OP_FILTER) + "." + str, metric);
        }
    }

    NettyShuffleEnvironmentTest() {
    }

    @BeforeAll
    static void setUp() {
        fileChannelManager = new FileChannelManagerImpl(new String[]{tempDir}, "testing");
    }

    @AfterAll
    static void shutdown() throws Exception {
        fileChannelManager.close();
    }

    @Test
    void testRegisterTaskWithLimitedBuffers() throws Exception {
        testRegisterTaskWithLimitedBuffers(18 + (10 * ((Integer) NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue()).intValue()));
    }

    @Test
    void testRegisterTaskWithInsufficientBuffers() throws Exception {
        int intValue = (10 + (10 * ((Integer) NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue()).intValue())) - 1;
        Assertions.assertThatThrownBy(() -> {
            testRegisterTaskWithLimitedBuffers(intValue);
        }).isInstanceOf(IOException.class).hasMessageContaining("Insufficient number of network buffers");
    }

    @Test
    void testSlowIODoesNotBlockRelease() throws Exception {
        final BlockerSync blockerSync = new BlockerSync();
        new NettyShuffleEnvironmentBuilder().setResultPartitionManager(new ResultPartitionManager() { // from class: org.apache.flink.runtime.io.network.NettyShuffleEnvironmentTest.1
            public void releasePartition(ResultPartitionID resultPartitionID, Throwable th) {
                blockerSync.blockNonInterruptible();
                super.releasePartition(resultPartitionID, th);
            }
        }).setIoExecutor(Executors.newFixedThreadPool(1)).build().releasePartitionsLocally(Collections.singleton(new ResultPartitionID()));
        blockerSync.awaitBlocker();
        blockerSync.releaseBlocker();
    }

    @Test
    void testRegisteringDebloatingMetrics() throws IOException {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        TaskMetricGroup createTaskMetricGroup = createTaskMetricGroup(concurrentHashMap);
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true);
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().setDebloatConfig(BufferDebloatConfiguration.fromConfiguration(configuration)).build();
        build.createInputGates(build.createShuffleIOOwnerContext("test", new ExecutionAttemptID(), createTaskMetricGroup), (intermediateDataSetID, resultPartitionID, consumer) -> {
        }, Arrays.asList(new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, new ShuffleDescriptor[]{new NettyShuffleDescriptorBuilder().buildRemote()}), new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 1, new ShuffleDescriptor[]{new NettyShuffleDescriptorBuilder().buildRemote()})));
        for (int i = 0; i < 2; i++) {
            Assertions.assertThat((Integer) getDebloatingMetric(concurrentHashMap, i, "debloatedBufferSize").getValue()).isEqualTo(((MemorySize) TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());
            Assertions.assertThat((Long) getDebloatingMetric(concurrentHashMap, i, "estimatedTimeToConsumeBuffersMs").getValue()).isZero();
        }
    }

    @Test
    void testInputChannelMetricsOnlyRegisterOnce() throws IOException {
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().build();
        Throwable th = null;
        try {
            final HashMap hashMap = new HashMap();
            ShuffleIOOwnerContext createShuffleIOOwnerContext = build.createShuffleIOOwnerContext("faker owner", new ExecutionAttemptID(), new InterceptingTaskMetricGroup() { // from class: org.apache.flink.runtime.io.network.NettyShuffleEnvironmentTest.2
                /* JADX INFO: Access modifiers changed from: protected */
                @Override // org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup
                public void addMetric(String str, Metric metric) {
                    hashMap.compute(str, (str2, num) -> {
                        return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
                    });
                    super.addMetric(str, metric);
                }
            });
            ArrayList arrayList = new ArrayList();
            IntermediateDataSetID[] intermediateDataSetIDArr = new IntermediateDataSetID[3];
            for (int i = 0; i < 3; i++) {
                intermediateDataSetIDArr[i] = new IntermediateDataSetID();
                arrayList.add(new InputGateDeploymentDescriptor(intermediateDataSetIDArr[i], ResultPartitionType.PIPELINED, 0, new ShuffleDescriptor[]{NettyShuffleDescriptorBuilder.newBuilder().buildRemote()}));
            }
            build.createInputGates(createShuffleIOOwnerContext, (intermediateDataSetID, resultPartitionID, consumer) -> {
            }, arrayList);
            Assertions.assertThat(hashMap).allSatisfy((str, num) -> {
                Assertions.assertThat(num).isOne();
            });
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    private Metric getDebloatingMetric(Map<String, Metric> map, int i, String str) {
        return map.get("taskmanager.job.task.Shuffle.Netty.Input." + i + "." + str);
    }

    private void testRegisterTaskWithLimitedBuffers(int i) throws Exception {
        NettyShuffleEnvironment build = new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(i).build();
        ConnectionManager createDummyConnectionManager = InputChannelTestUtils.createDummyConnectionManager();
        int floatingNetworkBuffersPerGate = build.getConfiguration().floatingNetworkBuffersPerGate();
        int networkBuffersPerChannel = build.getConfiguration().networkBuffersPerChannel();
        int i2 = (2 * networkBuffersPerChannel) + floatingNetworkBuffersPerGate;
        int i3 = (4 * networkBuffersPerChannel) + floatingNetworkBuffersPerGate;
        ResultPartition createPartition = PartitionTestUtils.createPartition(build, ResultPartitionType.PIPELINED, 2);
        ResultPartition createPartition2 = PartitionTestUtils.createPartition(build, fileChannelManager, ResultPartitionType.BLOCKING, 2);
        ResultPartition createPartition3 = PartitionTestUtils.createPartition(build, ResultPartitionType.PIPELINED_BOUNDED, 2);
        ResultPartition createPartition4 = PartitionTestUtils.createPartition(build, ResultPartitionType.PIPELINED_BOUNDED, 4);
        ResultPartition[] resultPartitionArr = {createPartition, createPartition2, createPartition3, createPartition4};
        SingleInputGate createSingleInputGate = createSingleInputGate(build, ResultPartitionType.PIPELINED, 2);
        SingleInputGate createSingleInputGate2 = createSingleInputGate(build, ResultPartitionType.BLOCKING, 2);
        SingleInputGate createSingleInputGate3 = createSingleInputGate(build, ResultPartitionType.PIPELINED_BOUNDED, 2);
        SingleInputGate createSingleInputGate4 = createSingleInputGate(build, ResultPartitionType.PIPELINED_BOUNDED, 4);
        SingleInputGate[] singleInputGateArr = {createSingleInputGate, createSingleInputGate2, createSingleInputGate3, createSingleInputGate4};
        createSingleInputGate4.setInputChannels(new InputChannel[]{createRemoteInputChannel(createSingleInputGate4, 0, createPartition, createDummyConnectionManager), createRemoteInputChannel(createSingleInputGate4, 0, createPartition2, createDummyConnectionManager), createRemoteInputChannel(createSingleInputGate4, 0, createPartition3, createDummyConnectionManager), createRemoteInputChannel(createSingleInputGate4, 0, createPartition4, createDummyConnectionManager)});
        createSingleInputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel(createSingleInputGate, 1, createPartition, createDummyConnectionManager), createRemoteInputChannel(createSingleInputGate, 1, createPartition4, createDummyConnectionManager)});
        createSingleInputGate2.setInputChannels(new InputChannel[]{createRemoteInputChannel(createSingleInputGate2, 1, createPartition2, createDummyConnectionManager), createRemoteInputChannel(createSingleInputGate2, 2, createPartition4, createDummyConnectionManager)});
        createSingleInputGate3.setInputChannels(new InputChannel[]{createRemoteInputChannel(createSingleInputGate3, 1, createPartition3, createDummyConnectionManager), createRemoteInputChannel(createSingleInputGate3, 3, createPartition4, createDummyConnectionManager)});
        Task.setupPartitionsAndGates(resultPartitionArr, singleInputGateArr);
        Assertions.assertThat(createPartition.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(Integer.MAX_VALUE);
        Assertions.assertThat(createPartition2.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(Integer.MAX_VALUE);
        Assertions.assertThat(createPartition3.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(i2);
        Assertions.assertThat(createPartition4.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(i3);
        for (ResultPartition resultPartition : resultPartitionArr) {
            Assertions.assertThat(resultPartition.getBufferPool().getNumberOfRequiredMemorySegments()).isEqualTo(resultPartition.getNumberOfSubpartitions() + 1);
            Assertions.assertThat(resultPartition.getBufferPool().getNumBuffers()).isEqualTo(resultPartition.getNumberOfSubpartitions() + 1);
        }
        Assertions.assertThat(createSingleInputGate.getBufferPool().getNumberOfRequiredMemorySegments()).isOne();
        Assertions.assertThat(createSingleInputGate2.getBufferPool().getNumberOfRequiredMemorySegments()).isOne();
        Assertions.assertThat(createSingleInputGate3.getBufferPool().getNumberOfRequiredMemorySegments()).isOne();
        Assertions.assertThat(createSingleInputGate4.getBufferPool().getNumberOfRequiredMemorySegments()).isOne();
        Assertions.assertThat(createSingleInputGate.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingNetworkBuffersPerGate);
        Assertions.assertThat(createSingleInputGate2.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingNetworkBuffersPerGate);
        Assertions.assertThat(createSingleInputGate3.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingNetworkBuffersPerGate);
        Assertions.assertThat(createSingleInputGate4.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingNetworkBuffersPerGate);
        ((SingleInputGate) Mockito.verify(createSingleInputGate, Mockito.times(1))).setupChannels();
        ((SingleInputGate) Mockito.verify(createSingleInputGate2, Mockito.times(1))).setupChannels();
        ((SingleInputGate) Mockito.verify(createSingleInputGate3, Mockito.times(1))).setupChannels();
        ((SingleInputGate) Mockito.verify(createSingleInputGate4, Mockito.times(1))).setupChannels();
        for (ResultPartition resultPartition2 : resultPartitionArr) {
            resultPartition2.release();
        }
        for (SingleInputGate singleInputGate : singleInputGateArr) {
            singleInputGate.close();
        }
        build.close();
    }

    private SingleInputGate createSingleInputGate(NettyShuffleEnvironment nettyShuffleEnvironment, ResultPartitionType resultPartitionType, int i) {
        return (SingleInputGate) PowerMockito.spy(new SingleInputGateBuilder().setNumberOfChannels(i).setResultPartitionType(resultPartitionType).setupBufferPoolFactory(nettyShuffleEnvironment).build());
    }

    private static RemoteInputChannel createRemoteInputChannel(SingleInputGate singleInputGate, int i, ResultPartition resultPartition, ConnectionManager connectionManager) {
        return InputChannelBuilder.newBuilder().setChannelIndex(i).setPartitionId(resultPartition.getPartitionId()).setConnectionManager(connectionManager).buildRemoteChannel(singleInputGate);
    }

    private static TaskMetricGroup createTaskMetricGroup(Map<String, Metric> map) {
        return TaskManagerMetricGroup.createTaskManagerMetricGroup(new TestMetricRegistry(map), "localhost", ResourceID.generate()).addJob(new JobID(), "jobName").addTask(new JobVertexID(0L, 0L), new ExecutionAttemptID(), "test", 0, 0);
    }
}
