package org.apache.flink.runtime.scheduler;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartitionTest;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Tests for {@link SsgNetworkMemoryCalculationUtils}.
 *
 * <p>Every test builds a three-vertex job graph ({@code source -> map -> sink}) and then inspects
 * either the network memory stored in the resource profiles of the involved
 * {@link SlotSharingGroup}s or the input channel numbers reported for a dynamic graph.
 */
public class SsgNetworkMemoryCalculationUtilsTest {

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorResource();

    private static final TestShuffleMaster SHUFFLE_MASTER = new TestShuffleMaster();

    private static final ResourceProfile DEFAULT_RESOURCE =
            ResourceProfile.fromResources(1.0d, 100);

    /**
     * {@link ShuffleMaster} stub with a deterministic memory formula, so that the expected
     * network memory of a slot sharing group can be written down as a sum of
     * {@link #computeRequiredShuffleMemoryBytes(int, int)} terms.
     */
    public static class TestShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {

        /** Weight applied to the input channel count in the memory formula. */
        private static final int INPUT_CHANNEL_WEIGHT = 10000;

        private TestShuffleMaster() {
        }

        /** Stubbed out: always returns {@code null}. */
        @Override
        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
                JobID jobID,
                PartitionDescriptor partitionDescriptor,
                ProducerDescriptor producerDescriptor) {
            return null;
        }

        /** Stubbed out: does nothing. */
        @Override
        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        }

        /**
         * Sums all input channel numbers and all subpartition numbers of the given descriptor and
         * feeds the two totals into {@link #computeRequiredShuffleMemoryBytes(int, int)}.
         *
         * @param taskInputsOutputsDescriptor descriptor of the task's inputs and outputs
         * @return the memory size derived from the two totals
         */
        @Override
        public MemorySize computeShuffleMemorySizeForTask(
                TaskInputsOutputsDescriptor taskInputsOutputsDescriptor) {
            int totalInputChannels =
                    taskInputsOutputsDescriptor.getInputChannelNums().values().stream()
                            .mapToInt(Integer::intValue)
                            .sum();
            int totalSubpartitions =
                    taskInputsOutputsDescriptor.getSubpartitionNums().values().stream()
                            .mapToInt(Integer::intValue)
                            .sum();
            return new MemorySize(
                    computeRequiredShuffleMemoryBytes(totalInputChannels, totalSubpartitions));
        }

        /**
         * Memory formula of this stub: {@code numInputChannels * 10000 + numSubpartitions}.
         *
         * @param numInputChannels total number of input channels
         * @param numSubpartitions total number of subpartitions
         * @return the value that is wrapped into a {@link MemorySize}
         */
        static int computeRequiredShuffleMemoryBytes(int numInputChannels, int numSubpartitions) {
            return (numInputChannels * INPUT_CHANNEL_WEIGHT) + numSubpartitions;
        }
    }

    /**
     * Checks the network memory of both slot sharing groups for a pipelined and for a blocking
     * job graph. Only the expectation for the first group differs between the two cases.
     */
    @Test
    public void testGenerateEnrichedResourceProfile() throws Exception {
        MemorySize expectedSinkGroupMemory =
                new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(10, 0));

        testGenerateEnrichedResourceProfile(
                ResultPartitionType.PIPELINED,
                new MemorySize(
                        TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2)
                                + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 12)),
                expectedSinkGroupMemory);

        testGenerateEnrichedResourceProfile(
                ResultPartitionType.BLOCKING,
                new MemorySize(
                        TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2)
                                + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 6)),
                expectedSinkGroupMemory);
    }

    /**
     * Builds the execution graph with {@code source} and {@code map} sharing one slot sharing
     * group and {@code sink} using a second one, then compares the network memory of both groups.
     *
     * @param resultPartitionType partition type used for every edge of the job graph
     * @param memorySize expected network memory of the group shared by source and map
     * @param memorySize2 expected network memory of the group used by the sink
     */
    private void testGenerateEnrichedResourceProfile(
            ResultPartitionType resultPartitionType, MemorySize memorySize, MemorySize memorySize2)
            throws Exception {
        SlotSharingGroup sourceAndMapGroup = new SlotSharingGroup();
        sourceAndMapGroup.setResourceProfile(DEFAULT_RESOURCE);
        SlotSharingGroup sinkGroup = new SlotSharingGroup();
        sinkGroup.setResourceProfile(DEFAULT_RESOURCE);

        createExecutionGraphAndEnrichNetworkMemory(
                Arrays.asList(sourceAndMapGroup, sourceAndMapGroup, sinkGroup),
                resultPartitionType);

        Assert.assertEquals(
                memorySize, sourceAndMapGroup.getResourceProfile().getNetworkMemory());
        Assert.assertEquals(memorySize2, sinkGroup.getResourceProfile().getNetworkMemory());
    }

    /**
     * Slot sharing groups whose profile is {@link ResourceProfile#UNKNOWN} must still carry
     * {@link ResourceProfile#UNKNOWN} after the execution graph has been built.
     */
    @Test
    public void testGenerateUnknownResourceProfile() throws Exception {
        SlotSharingGroup sourceAndMapGroup = new SlotSharingGroup();
        sourceAndMapGroup.setResourceProfile(ResourceProfile.UNKNOWN);
        SlotSharingGroup sinkGroup = new SlotSharingGroup();
        sinkGroup.setResourceProfile(ResourceProfile.UNKNOWN);

        createExecutionGraphAndEnrichNetworkMemory(
                Arrays.asList(sourceAndMapGroup, sourceAndMapGroup, sinkGroup),
                ResultPartitionType.PIPELINED);

        Assert.assertEquals(ResourceProfile.UNKNOWN, sourceAndMapGroup.getResourceProfile());
        Assert.assertEquals(ResourceProfile.UNKNOWN, sinkGroup.getResourceProfile());
    }

    /**
     * Initializes the three job vertices of a dynamic graph one after another (setting the
     * parallelism of map and sink right before their initialization) and then checks the network
     * memory of the three slot sharing groups, one group per vertex.
     */
    @Test
    public void testGenerateEnrichedResourceProfileForDynamicGraph() throws Exception {
        List<SlotSharingGroup> slotSharingGroups =
                Arrays.asList(
                        new SlotSharingGroup(), new SlotSharingGroup(), new SlotSharingGroup());
        for (SlotSharingGroup group : slotSharingGroups) {
            group.setResourceProfile(DEFAULT_RESOURCE);
        }

        DefaultExecutionGraph executionGraph = createDynamicExecutionGraph(slotSharingGroups, 20);
        Iterator<ExecutionJobVertex> vertices =
                executionGraph.getVerticesTopologically().iterator();
        ExecutionJobVertex source = vertices.next();
        ExecutionJobVertex map = vertices.next();
        ExecutionJobVertex sink = vertices.next();

        // The statement order below is significant: each vertex is initialized only after its
        // upstream vertex has been initialized and its subpartition numbers were requested.
        executionGraph.initializeJobVertex(source, 0L);
        triggerComputeNumOfSubpartitions(source.getProducedDataSets()[0]);

        map.setParallelism(5);
        executionGraph.initializeJobVertex(map, 0L);
        triggerComputeNumOfSubpartitions(map.getProducedDataSets()[0]);

        sink.setParallelism(7);
        executionGraph.initializeJobVertex(sink, 0L);

        assertNetworkMemory(
                slotSharingGroups,
                Arrays.asList(
                        new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 5)),
                        new MemorySize(TestShuffleMaster.computeRequiredShuffleMemoryBytes(5, 20)),
                        new MemorySize(
                                TestShuffleMaster.computeRequiredShuffleMemoryBytes(30, 0))));
    }

    /**
     * Calls {@code getNumberOfSubpartitions()} on every partition of the given result and
     * discards the returned values.
     *
     * <p>NOTE(review): the call is presumably made for its side effect of computing and caching
     * the subpartition count - confirm against {@link IntermediateResultPartition}.
     */
    private void triggerComputeNumOfSubpartitions(IntermediateResult intermediateResult) {
        for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
            partition.getNumberOfSubpartitions();
        }
    }

    /**
     * Asserts that the i-th slot sharing group carries the i-th expected network memory.
     *
     * @param list slot sharing groups to inspect
     * @param list2 expected network memory per group, in the same order as {@code list}
     */
    private void assertNetworkMemory(List<SlotSharingGroup> list, List<MemorySize> list2) {
        // JUnit convention: expected value first, actual value second.
        Assert.assertEquals(list2.size(), list.size());
        for (int index = 0; index < list.size(); index++) {
            MemorySize actual = list.get(index).getResourceProfile().getNetworkMemory();
            MatcherAssert.assertThat(actual, CoreMatchers.is(list2.get(index)));
        }
    }

    @Test
    public void testGetMaxInputChannelNumForResultForAllToAll() throws Exception {
        testGetMaxInputChannelNumForResult(DistributionPattern.ALL_TO_ALL, 5, 20, 7, 15);
    }

    @Test
    public void testGetMaxInputChannelNumForResultForPointWise() throws Exception {
        testGetMaxInputChannelNumForResult(DistributionPattern.POINTWISE, 5, 20, 3, 8);
        testGetMaxInputChannelNumForResult(DistributionPattern.POINTWISE, 5, 20, 5, 4);
        testGetMaxInputChannelNumForResult(DistributionPattern.POINTWISE, 5, 20, 7, 4);
    }

    /**
     * Builds a two-vertex graph via {@link IntermediateResultPartitionTest}, initializes the
     * first vertex, sets the parallelism of the second vertex, initializes it as well and then
     * checks the single entry returned by
     * {@code SsgNetworkMemoryCalculationUtils.getMaxInputChannelNumsForDynamicGraph}.
     *
     * @param distributionPattern distribution pattern forwarded to the graph helper
     * @param producerParallelism first numeric argument of the graph helper (presumably the
     *     producer parallelism - confirm against the helper's signature)
     * @param consumerMaxParallelism third numeric argument of the graph helper (presumably the
     *     consumer max parallelism - confirm against the helper's signature)
     * @param decidedConsumerParallelism parallelism set on the second vertex before it is
     *     initialized
     * @param expectedNumChannels expected value stored under the id of the first vertex's result
     */
    private void testGetMaxInputChannelNumForResult(
            DistributionPattern distributionPattern,
            int producerParallelism,
            int consumerMaxParallelism,
            int decidedConsumerParallelism,
            int expectedNumChannels)
            throws Exception {
        DefaultExecutionGraph executionGraph =
                IntermediateResultPartitionTest.createExecutionGraph(
                        producerParallelism,
                        -1,
                        consumerMaxParallelism,
                        distributionPattern,
                        true,
                        EXECUTOR_RESOURCE.getExecutor());

        Iterator<ExecutionJobVertex> vertices =
                executionGraph.getVerticesTopologically().iterator();
        ExecutionJobVertex producer = vertices.next();
        ExecutionJobVertex consumer = vertices.next();

        executionGraph.initializeJobVertex(producer, 0L);
        IntermediateResult result = producer.getProducedDataSets()[0];
        triggerComputeNumOfSubpartitions(result);

        consumer.setParallelism(decidedConsumerParallelism);
        executionGraph.initializeJobVertex(consumer, 0L);

        Map<IntermediateDataSetID, Integer> maxInputChannelNums =
                SsgNetworkMemoryCalculationUtils.getMaxInputChannelNumsForDynamicGraph(consumer);

        MatcherAssert.assertThat(maxInputChannelNums.size(), CoreMatchers.is(1));
        MatcherAssert.assertThat(
                maxInputChannelNums.get(result.getId()), CoreMatchers.is(expectedNumChannels));
    }

    /**
     * Builds a dynamic execution graph for a blocking job graph whose source has parallelism 4
     * while map and sink are created with {@code -1}, i.e. without an explicit parallelism.
     *
     * @param list slot sharing groups for source, map and sink (in that order)
     * @param i forwarded as second argument to
     *     {@code AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph}
     *     (presumably the default max parallelism - confirm against that method)
     */
    private DefaultExecutionGraph createDynamicExecutionGraph(List<SlotSharingGroup> list, int i)
            throws Exception {
        JobGraph jobGraph =
                createJobGraph(list, Arrays.asList(4, -1, -1), ResultPartitionType.BLOCKING);
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setJobGraph(jobGraph)
                .setVertexParallelismStore(
                        AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(
                                jobGraph.getVertices(), i))
                .setShuffleMaster(SHUFFLE_MASTER)
                .buildDynamicGraph(EXECUTOR_RESOURCE.getExecutor());
    }

    /**
     * Builds a (non-dynamic) execution graph with parallelisms 4, 5 and 6 using the test shuffle
     * master. The graph itself is discarded; callers only inspect the slot sharing groups
     * afterwards.
     */
    private void createExecutionGraphAndEnrichNetworkMemory(
            List<SlotSharingGroup> list, ResultPartitionType resultPartitionType)
            throws Exception {
        JobGraph jobGraph = createJobGraph(list, Arrays.asList(4, 5, 6), resultPartitionType);
        TestingDefaultExecutionGraphBuilder.newBuilder()
                .setJobGraph(jobGraph)
                .setShuffleMaster(SHUFFLE_MASTER)
                .build(EXECUTOR_RESOURCE.getExecutor());
    }

    /**
     * Creates the job graph {@code source -(POINTWISE)-> map -(ALL_TO_ALL x2)-> sink}.
     *
     * <p>For {@link ResultPartitionType#BLOCKING} both map-to-sink connections are created with
     * one and the same {@link IntermediateDataSetID}; for every other type each connection is
     * created without an explicit id.
     *
     * @param list slot sharing groups for source, map and sink; must contain exactly 3 entries
     * @param list2 parallelisms for source, map and sink; must contain exactly 3 entries,
     *     non-positive values leave the vertex parallelism untouched
     * @param resultPartitionType partition type used for every connection
     * @return a batch job graph for blocking partition types, a streaming job graph otherwise
     */
    private static JobGraph createJobGraph(
            List<SlotSharingGroup> list,
            List<Integer> list2,
            ResultPartitionType resultPartitionType) {
        MatcherAssert.assertThat(list.size(), CoreMatchers.is(3));
        MatcherAssert.assertThat(list2.size(), CoreMatchers.is(3));

        JobVertex source = createVertex("source", list2.get(0), list.get(0));
        JobVertex map = createVertex("map", list2.get(1), list.get(1));
        JobVertex sink = createVertex("sink", list2.get(2), list.get(2));

        map.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, resultPartitionType);

        if (resultPartitionType == ResultPartitionType.BLOCKING) {
            // Both edges deliberately reuse the same data set id.
            IntermediateDataSetID sharedDataSetId = new IntermediateDataSetID();
            sink.connectNewDataSetAsInput(
                    map,
                    DistributionPattern.ALL_TO_ALL,
                    resultPartitionType,
                    sharedDataSetId,
                    false);
            sink.connectNewDataSetAsInput(
                    map,
                    DistributionPattern.ALL_TO_ALL,
                    resultPartitionType,
                    sharedDataSetId,
                    false);
        } else {
            sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, resultPartitionType);
            sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, resultPartitionType);
        }

        if (resultPartitionType.isBlockingOrBlockingPersistentResultPartition()) {
            return JobGraphTestUtils.batchJobGraph(source, map, sink);
        }
        return JobGraphTestUtils.streamingJobGraph(source, map, sink);
    }

    /** Creates a no-op vertex with the given name, optional parallelism and slot sharing group. */
    private static JobVertex createVertex(
            String name, int parallelism, SlotSharingGroup slotSharingGroup) {
        JobVertex vertex = new JobVertex(name);
        vertex.setInvokableClass(NoOpInvokable.class);
        trySetParallelism(vertex, parallelism);
        vertex.setSlotSharingGroup(slotSharingGroup);
        return vertex;
    }

    /** Sets the parallelism only when it is positive; other values leave the vertex untouched. */
    private static void trySetParallelism(JobVertex jobVertex, int i) {
        if (i > 0) {
            jobVertex.setParallelism(i);
        }
    }
}
