package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorTestContext;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.class */
class StreamingJobGraphGeneratorTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest$CoordinatedTransformOperatorFactory.class */
    private static class CoordinatedTransformOperatorFactory extends AbstractStreamOperatorFactory<Integer> implements CoordinatedOperatorFactory<Integer>, OneInputStreamOperatorFactory<Integer, Integer> {
        private CoordinatedTransformOperatorFactory() {
        }

        public OperatorCoordinator.Provider getCoordinatorProvider(String str, OperatorID operatorID) {
            return new OperatorCoordinator.Provider() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.CoordinatedTransformOperatorFactory.1
                public OperatorID getOperatorId() {
                    return null;
                }

                public OperatorCoordinator create(OperatorCoordinator.Context context) {
                    return null;
                }
            };
        }

        public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> streamOperatorParameters) {
            return null;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest$NonSerializableCoordinatorProvider.class */
    private static class NonSerializableCoordinatorProvider implements OperatorCoordinator.Provider {
        private NonSerializableCoordinatorProvider() {
        }

        public OperatorID getOperatorId() {
            throw new UnsupportedOperationException();
        }

        public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
            throw new UnsupportedOperationException();
        }

        private void writeObject(ObjectOutputStream objectOutputStream) throws IOException {
            throw new IOException("This provider is not serializable.");
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest$SerializationTestOperator.class */
    private static class SerializationTestOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
        private SerializationTestOperator() {
        }

        public void processElement(StreamRecord<Integer> streamRecord) throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest$SerializationTestOperatorFactory.class */
    private static class SerializationTestOperatorFactory extends AbstractStreamOperatorFactory<Integer> implements CoordinatedOperatorFactory<Integer> {
        private final boolean isOperatorFactorySerializable;

        SerializationTestOperatorFactory(boolean z) {
            this.isOperatorFactorySerializable = z;
        }

        public OperatorCoordinator.Provider getCoordinatorProvider(String str, OperatorID operatorID) {
            return new NonSerializableCoordinatorProvider();
        }

        private void writeObject(ObjectOutputStream objectOutputStream) throws IOException {
            if (!this.isOperatorFactorySerializable) {
                throw new IOException("This operator factory is not serializable.");
            }
        }

        public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> streamOperatorParameters) {
            return new SerializationTestOperator();
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SerializationTestOperator.class;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest$TestingSingleOutputStreamOperator.class */
    private static class TestingSingleOutputStreamOperator<OUT> extends SingleOutputStreamOperator<OUT> {
        public TestingSingleOutputStreamOperator(StreamExecutionEnvironment streamExecutionEnvironment, Transformation<OUT> transformation) {
            super(streamExecutionEnvironment, transformation);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest$TestingStreamExecutionEnvironment.class */
    private static class TestingStreamExecutionEnvironment extends StreamExecutionEnvironment {
        Set<AbstractID> completedClusterDatasetIds;

        private TestingStreamExecutionEnvironment() {
            this.completedClusterDatasetIds = new HashSet();
        }

        public void addCompletedClusterDataset(AbstractID abstractID) {
            this.completedClusterDatasetIds.add(abstractID);
        }

        public Set<AbstractID> listCompletedClusterDatasets() {
            return new HashSet(this.completedClusterDatasetIds);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest$UnusedOperatorFactory.class */
    public static final class UnusedOperatorFactory extends AbstractStreamOperatorFactory<Long> {
        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> streamOperatorParameters) {
            throw new UnsupportedOperationException();
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest$YieldingTestOperatorFactory.class */
    private static class YieldingTestOperatorFactory<T> extends SimpleOperatorFactory<T> implements YieldingOperatorFactory<T>, OneInputStreamOperatorFactory<T, T> {
        private YieldingTestOperatorFactory() {
            super(new StreamMap(obj -> {
                return obj;
            }));
        }

        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -2104837527:
                    if (implMethodName.equals("lambda$new$e0defa2f$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest$YieldingTestOperatorFactory") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                        return obj -> {
                            return obj;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    StreamingJobGraphGeneratorTest() {
    }

    @Test
    void testParallelismOneNotChained() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new String[]{"a", "b", "c", "d", "e", "f"}).map(new MapFunction<String, Tuple2<String, String>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.1
            public Tuple2<String, String> map(String str) {
                return new Tuple2<>(str, str);
            }
        }).keyBy(new int[]{0}).map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.2
            public Tuple2<String, String> map(Tuple2<String, String> tuple2) {
                return tuple2;
            }
        }).addSink(new SinkFunction<Tuple2<String, String>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.3
            public void invoke(Tuple2<String, String> tuple2) {
            }
        });
        JobGraph jobGraph = executionEnvironment.getStreamGraph().getJobGraph();
        List verticesSortedTopologicallyFromSources = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getParallelism()).isEqualTo(1);
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(1)).getParallelism()).isEqualTo(1);
        JobVertex jobVertex = (JobVertex) verticesSortedTopologicallyFromSources.get(0);
        JobVertex jobVertex2 = (JobVertex) verticesSortedTopologicallyFromSources.get(1);
        Assertions.assertThat(((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
        Assertions.assertThat(((JobEdge) jobVertex2.getInputs().get(0)).getSource().getResultType()).isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    void testDisabledCheckpointing() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Integer[]{0}).print();
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        Assertions.assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled()).withFailMessage("Checkpointing enabled", new Object[0]).isFalse();
        JobGraph createJobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
        JobCheckpointingSettings checkpointingSettings = createJobGraph.getCheckpointingSettings();
        Assertions.assertThat(checkpointingSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval()).isEqualTo(Long.MAX_VALUE);
        Assertions.assertThat(checkpointingSettings.getCheckpointCoordinatorConfiguration().isExactlyOnce()).isFalse();
        Assertions.assertThat(new StreamConfig(((JobVertex) createJobGraph.getVerticesSortedTopologicallyFromSources().get(0)).getConfiguration()).getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
    }

    @Test
    void testEnabledUnalignedCheckAndDisabledCheckpointing() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Integer[]{0}).print();
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        Assertions.assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled()).withFailMessage("Checkpointing enabled", new Object[0]).isFalse();
        executionEnvironment.getCheckpointConfig().enableUnalignedCheckpoints(true);
        StreamConfig streamConfig = new StreamConfig(((JobVertex) StreamingJobGraphGenerator.createJobGraph(streamGraph).getVerticesSortedTopologicallyFromSources().get(0)).getConfiguration());
        Assertions.assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
        Assertions.assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
    }

    @Test
    void testUnalignedCheckAndAtLeastOnce() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Integer[]{0}).print();
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        executionEnvironment.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
        executionEnvironment.getCheckpointConfig().enableUnalignedCheckpoints(true);
        StreamConfig streamConfig = new StreamConfig(((JobVertex) StreamingJobGraphGenerator.createJobGraph(streamGraph).getVerticesSortedTopologicallyFromSources().get(0)).getConfiguration());
        Assertions.assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
        Assertions.assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
    }

    @Test
    void generatorForwardsSavepointRestoreSettings() {
        Assertions.assertThat(StreamingJobGraphGenerator.createJobGraph(new StreamGraph(new ExecutionConfig(), new CheckpointConfig(), SavepointRestoreSettings.forPath("hello"))).getSavepointRestoreSettings().getRestorePath()).isEqualTo("hello");
    }

    @Test
    void testChainStartEndSetting() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(2);
        executionEnvironment.fromElements(new Integer[]{1, 2, 3}).map(new MapFunction<Integer, Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.4
            public Integer map(Integer num) throws Exception {
                return num;
            }
        }).print();
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVerticesSortedTopologicallyFromSources();
        JobVertex jobVertex = (JobVertex) verticesSortedTopologicallyFromSources.get(0);
        JobVertex jobVertex2 = (JobVertex) verticesSortedTopologicallyFromSources.get(1);
        Assertions.assertThat(((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
        Assertions.assertThat(((JobEdge) jobVertex2.getInputs().get(0)).getSource().getResultType()).isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
        StreamConfig streamConfig = new StreamConfig(jobVertex.getConfiguration());
        StreamConfig streamConfig2 = new StreamConfig(jobVertex2.getConfiguration());
        StreamConfig streamConfig3 = (StreamConfig) streamConfig2.getTransitiveChainedTaskConfigs(getClass().getClassLoader()).values().iterator().next();
        Assertions.assertThat(streamConfig.isChainStart()).isTrue();
        Assertions.assertThat(streamConfig.isChainEnd()).isTrue();
        Assertions.assertThat(streamConfig2.isChainStart()).isTrue();
        Assertions.assertThat(streamConfig2.isChainEnd()).isFalse();
        Assertions.assertThat(streamConfig3.isChainStart()).isFalse();
        Assertions.assertThat(streamConfig3.isChainEnd()).isTrue();
    }

    @Test
    void testOperatorCoordinatorAddedToJobVertex() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        new TestingSingleOutputStreamOperator(executionEnvironment, new OneInputTransformation(executionEnvironment.fromSource(new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks(), "TestingSource").getTransformation(), "AnyName", new CoordinatedTransformOperatorFactory(), BasicTypeInfo.INT_TYPE_INFO, executionEnvironment.getParallelism())).print();
        Assertions.assertThat(StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVerticesAsArray()[0].getOperatorCoordinators()).hasSize(2);
    }

    @Test
    void testResourcesForChainedSourceSink() throws Exception {
        ResourceSpec build = ResourceSpec.newBuilder(0.1d, 100).build();
        ResourceSpec build2 = ResourceSpec.newBuilder(0.2d, 200).build();
        ResourceSpec build3 = ResourceSpec.newBuilder(0.3d, 300).build();
        ResourceSpec build4 = ResourceSpec.newBuilder(0.4d, 400).build();
        ResourceSpec build5 = ResourceSpec.newBuilder(0.5d, 500).build();
        Method setResourcesMethodAndSetAccessible = getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
        Method setResourcesMethodAndSetAccessible2 = getSetResourcesMethodAndSetAccessible(DataStreamSink.class);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new ParallelSourceFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.5
            public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
            }

            public void cancel() {
            }
        });
        setResourcesMethodAndSetAccessible.invoke(addSource, build);
        SingleOutputStreamOperator map = addSource.map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.6
            public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> tuple2) throws Exception {
                return tuple2;
            }
        });
        setResourcesMethodAndSetAccessible.invoke(map, build2);
        SingleOutputStreamOperator filter = map.filter(new FilterFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.7
            public boolean filter(Tuple2<Integer, Integer> tuple2) throws Exception {
                return false;
            }
        });
        setResourcesMethodAndSetAccessible.invoke(filter, build3);
        SingleOutputStreamOperator reduce = filter.keyBy(new int[]{0}).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.8
            public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) throws Exception {
                return new Tuple2<>(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
            }
        });
        setResourcesMethodAndSetAccessible.invoke(reduce, build4);
        setResourcesMethodAndSetAccessible2.invoke(reduce.addSink(new SinkFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.9
            public void invoke(Tuple2<Integer, Integer> tuple2) throws Exception {
            }
        }), build5);
        JobGraph createJobGraph = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph());
        JobVertex jobVertex = (JobVertex) createJobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex jobVertex2 = (JobVertex) createJobGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assertions.assertThat(jobVertex.getMinResources()).isEqualTo(build3.merge(build2).merge(build));
        Assertions.assertThat(jobVertex2.getPreferredResources()).isEqualTo(build4.merge(build5));
    }

    @Test
    void testResourcesForIteration() throws Exception {
        ResourceSpec build = ResourceSpec.newBuilder(0.1d, 100).build();
        ResourceSpec build2 = ResourceSpec.newBuilder(0.2d, 200).build();
        ResourceSpec build3 = ResourceSpec.newBuilder(0.3d, 300).build();
        ResourceSpec build4 = ResourceSpec.newBuilder(0.4d, 400).build();
        ResourceSpec build5 = ResourceSpec.newBuilder(0.5d, 500).build();
        Method setResourcesMethodAndSetAccessible = getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
        Method setResourcesMethodAndSetAccessible2 = getSetResourcesMethodAndSetAccessible(DataStreamSink.class);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator name = executionEnvironment.addSource(new ParallelSourceFunction<Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.10
            public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            }

            public void cancel() {
            }
        }).name("test_source");
        setResourcesMethodAndSetAccessible.invoke(name, build);
        IterativeStream iterate = name.iterate(3000L);
        setResourcesMethodAndSetAccessible.invoke(iterate, build2);
        SingleOutputStreamOperator name2 = iterate.flatMap(new FlatMapFunction<Integer, Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.11
            public void flatMap(Integer num, Collector<Integer> collector) throws Exception {
                collector.collect(num);
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Integer) obj, (Collector<Integer>) collector);
            }
        }).name("test_flatMap");
        setResourcesMethodAndSetAccessible.invoke(name2, build3);
        SingleOutputStreamOperator name3 = name2.filter(new FilterFunction<Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.12
            public boolean filter(Integer num) throws Exception {
                return false;
            }
        }).name("test_filter");
        setResourcesMethodAndSetAccessible.invoke(name3, build4);
        setResourcesMethodAndSetAccessible2.invoke(iterate.closeWith(name3).addSink(new SinkFunction<Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.13
            public void invoke(Integer num) throws Exception {
            }
        }).disableChaining().name("test_sink"), build5);
        for (JobVertex jobVertex : StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVertices()) {
            if (jobVertex.getName().contains("test_source")) {
                Assertions.assertThat(jobVertex.getMinResources()).isEqualTo(build);
            } else if (jobVertex.getName().contains("Iteration_Source")) {
                Assertions.assertThat(jobVertex.getPreferredResources()).isEqualTo(build2);
            } else if (jobVertex.getName().contains("test_flatMap")) {
                Assertions.assertThat(jobVertex.getMinResources()).isEqualTo(build3.merge(build4));
            } else if (jobVertex.getName().contains("Iteration_Tail")) {
                Assertions.assertThat(jobVertex.getPreferredResources()).isEqualTo(ResourceSpec.DEFAULT);
            } else if (jobVertex.getName().contains("test_sink")) {
                Assertions.assertThat(jobVertex.getMinResources()).isEqualTo(build5);
            }
        }
    }

    @Test
    void testInputOutputFormat() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator name = executionEnvironment.addSource(new InputFormatSourceFunction(new TypeSerializerInputFormat(TypeInformation.of(Long.class)), TypeInformation.of(Long.class)), TypeInformation.of(Long.class)).name("source");
        name.writeUsingOutputFormat(new DiscardingOutputFormat()).name("sink1");
        name.writeUsingOutputFormat(new DiscardingOutputFormat()).name("sink2");
        JobGraph createJobGraph = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph());
        Assertions.assertThat(createJobGraph.getNumberOfVertices()).isEqualTo(1);
        JobVertex jobVertex = (JobVertex) createJobGraph.getVertices().iterator().next();
        Assertions.assertThat(jobVertex).isInstanceOf(InputOutputFormatVertex.class);
        InputOutputFormatContainer inputOutputFormatContainer = new InputOutputFormatContainer(new TaskConfig(jobVertex.getConfiguration()), Thread.currentThread().getContextClassLoader());
        Map inputFormats = inputOutputFormatContainer.getInputFormats();
        Map outputFormats = inputOutputFormatContainer.getOutputFormats();
        Assertions.assertThat(inputFormats).hasSize(1);
        Assertions.assertThat(outputFormats).hasSize(2);
        HashMap hashMap = new HashMap();
        StreamConfig streamConfig = new StreamConfig(jobVertex.getConfiguration());
        hashMap.put(streamConfig.getOperatorName(), streamConfig.getOperatorID());
        for (StreamConfig streamConfig2 : streamConfig.getTransitiveChainedTaskConfigs(Thread.currentThread().getContextClassLoader()).values()) {
            hashMap.put(streamConfig2.getOperatorName(), streamConfig2.getOperatorID());
        }
        Assertions.assertThat((InputFormat) ((UserCodeWrapper) inputFormats.get(hashMap.get("Source: source"))).getUserCodeObject()).isInstanceOf(TypeSerializerInputFormat.class);
        Assertions.assertThat((OutputFormat) ((UserCodeWrapper) outputFormats.get(hashMap.get("Sink: sink1"))).getUserCodeObject()).isInstanceOf(DiscardingOutputFormat.class);
        Assertions.assertThat((OutputFormat) ((UserCodeWrapper) outputFormats.get(hashMap.get("Sink: sink2"))).getUserCodeObject()).isInstanceOf(DiscardingOutputFormat.class);
    }

    @Test
    void testCoordinatedOperator() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromSource(new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks(), "TestSource").addSink(new DiscardingSink());
        JobGraph createJobGraph = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph());
        Assertions.assertThat(createJobGraph.getNumberOfVertices()).isEqualTo(1);
        JobVertex jobVertex = createJobGraph.getVerticesAsArray()[0];
        Assertions.assertThat(jobVertex.getOperatorCoordinators()).hasSize(1);
        ClassLoader classLoader = getClass().getClassLoader();
        Assertions.assertThat(jobVertex.getInvokableClass(classLoader)).isEqualTo(SourceOperatorStreamTask.class);
        Assertions.assertThat(new StreamConfig(jobVertex.getConfiguration()).getStreamOperatorFactory(classLoader)).isInstanceOf(SourceOperatorFactory.class);
    }

    @Test
    void testExchangeModePipelined() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        new DataStream(executionEnvironment, new PartitionTransformation(new DataStream(executionEnvironment, new PartitionTransformation(executionEnvironment.fromElements(new Integer[]{1, 2, 3}).getTransformation(), new ForwardPartitioner(), StreamExchangeMode.PIPELINED)).map(num -> {
            return num;
        }).setParallelism(1).getTransformation(), new RescalePartitioner(), StreamExchangeMode.PIPELINED)).print().setParallelism(2);
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(2);
        Assertions.assertThat(((IntermediateDataSet) ((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getProducedDataSets().get(0)).getResultType()).isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    void testExchangeModeBatch() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.setBufferTimeout(-1L);
        new DataStream(executionEnvironment, new PartitionTransformation(new DataStream(executionEnvironment, new PartitionTransformation(executionEnvironment.fromElements(new Integer[]{1, 2, 3}).getTransformation(), new ForwardPartitioner(), StreamExchangeMode.BATCH)).map(num -> {
            return num;
        }).setParallelism(1).getTransformation(), new RescalePartitioner(), StreamExchangeMode.BATCH)).print().setParallelism(2);
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(3);
        JobVertex jobVertex = (JobVertex) verticesSortedTopologicallyFromSources.get(0);
        JobVertex jobVertex2 = (JobVertex) verticesSortedTopologicallyFromSources.get(1);
        Assertions.assertThat(((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo(ResultPartitionType.BLOCKING);
        Assertions.assertThat(((IntermediateDataSet) jobVertex2.getProducedDataSets().get(0)).getResultType()).isEqualTo(ResultPartitionType.BLOCKING);
    }

    @Test
    void testExchangeModeUndefined() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        new DataStream(executionEnvironment, new PartitionTransformation(new DataStream(executionEnvironment, new PartitionTransformation(executionEnvironment.fromElements(new Integer[]{1, 2, 3}).getTransformation(), new ForwardPartitioner(), StreamExchangeMode.UNDEFINED)).map(num -> {
            return num;
        }).setParallelism(1).getTransformation(), new RescalePartitioner(), StreamExchangeMode.UNDEFINED)).print().setParallelism(2);
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(2);
        Assertions.assertThat(((IntermediateDataSet) ((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getProducedDataSets().get(0)).getResultType()).isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    void testExchangeModeHybridFull() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        new DataStream(executionEnvironment, new PartitionTransformation(new DataStream(executionEnvironment, new PartitionTransformation(executionEnvironment.fromElements(new Integer[]{1, 2, 3}).getTransformation(), new ForwardPartitioner(), StreamExchangeMode.HYBRID_FULL)).map(num -> {
            return num;
        }).setParallelism(1).getTransformation(), new RescalePartitioner(), StreamExchangeMode.HYBRID_FULL)).print().setParallelism(2);
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(2);
        Assertions.assertThat(((IntermediateDataSet) ((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getProducedDataSets().get(0)).getResultType()).isEqualTo(ResultPartitionType.HYBRID_FULL);
    }

    @Test
    void testExchangeModeHybridSelective() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        new DataStream(executionEnvironment, new PartitionTransformation(new DataStream(executionEnvironment, new PartitionTransformation(executionEnvironment.fromElements(new Integer[]{1, 2, 3}).getTransformation(), new ForwardPartitioner(), StreamExchangeMode.HYBRID_SELECTIVE)).map(num -> {
            return num;
        }).setParallelism(1).getTransformation(), new RescalePartitioner(), StreamExchangeMode.HYBRID_SELECTIVE)).print().setParallelism(2);
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(2);
        Assertions.assertThat(((IntermediateDataSet) ((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getProducedDataSets().get(0)).getResultType()).isEqualTo(ResultPartitionType.HYBRID_SELECTIVE);
    }

    @Test
    void testStreamingJobTypeByDefault() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new String[]{"test"}).addSink(new DiscardingSink());
        Assertions.assertThat(StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getJobType()).isEqualTo(JobType.STREAMING);
    }

    @Test
    void testBatchJobType() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.fromElements(new String[]{"test"}).addSink(new DiscardingSink());
        Assertions.assertThat(StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getJobType()).isEqualTo(JobType.BATCH);
    }

    @Test
    void testPartitionTypesInBatchMode() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.setParallelism(4);
        executionEnvironment.disableOperatorChaining();
        executionEnvironment.fromElements(new Integer[]{1}).map(num -> {
            return num;
        }).setParallelism(1).rescale().map(num2 -> {
            return num2;
        }).rebalance().map(num3 -> {
            return num3;
        }).keyBy(num4 -> {
            return num4;
        }).map(num5 -> {
            return num5;
        }).addSink(new DiscardingSink());
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVerticesSortedTopologicallyFromSources();
        assertHasOutputPartitionType((JobVertex) verticesSortedTopologicallyFromSources.get(0), ResultPartitionType.BLOCKING);
        assertHasOutputPartitionType((JobVertex) verticesSortedTopologicallyFromSources.get(1), ResultPartitionType.BLOCKING);
        assertHasOutputPartitionType((JobVertex) verticesSortedTopologicallyFromSources.get(2), ResultPartitionType.BLOCKING);
        assertHasOutputPartitionType((JobVertex) verticesSortedTopologicallyFromSources.get(3), ResultPartitionType.BLOCKING);
        assertHasOutputPartitionType((JobVertex) verticesSortedTopologicallyFromSources.get(4), ResultPartitionType.BLOCKING);
    }

    private void assertHasOutputPartitionType(JobVertex jobVertex, ResultPartitionType resultPartitionType) {
        Assertions.assertThat(((IntermediateDataSet) jobVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo(resultPartitionType);
    }

    @Test
    void testNormalExchangeModeWithBufferTimeout() {
        testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode.PIPELINED);
    }

    private void testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode streamExchangeMode) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setBufferTimeout(100L);
        new DataStream(executionEnvironment, new PartitionTransformation(executionEnvironment.fromElements(new Integer[]{1, 2, 3}).getTransformation(), new RebalancePartitioner(), streamExchangeMode)).map(num -> {
            return num;
        }).print();
        StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph());
    }

    @Test
    void testDisablingBufferTimeoutWithPipelinedExchanges() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.setBufferTimeout(-1L);
        executionEnvironment.fromElements(new Integer[]{1, 2, 3}).map(num -> {
            return num;
        }).print();
        Iterator it = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()).getVertices().iterator();
        while (it.hasNext()) {
            Iterator it2 = new StreamConfig(((JobVertex) it.next()).getConfiguration()).getVertexNonChainedOutputs(getClass().getClassLoader()).iterator();
            while (it2.hasNext()) {
                Assertions.assertThat(((NonChainedOutput) it2.next()).getBufferTimeout()).isEqualTo(-1L);
            }
        }
    }

    @Test
    void testIteration() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        IterativeStream iterate = executionEnvironment.fromElements(new Integer[]{1, 2, 3}).name("source").iterate(3000L);
        iterate.name("iteration").setParallelism(2);
        iterate.closeWith(iterate.map(num -> {
            return Integer.valueOf(num.intValue() + 1);
        }).name("map").setParallelism(2).filter(num2 -> {
            return false;
        }).name("filter").setParallelism(2)).print();
        JobGraph createJobGraph = StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph());
        SlotSharingGroup slotSharingGroup = createJobGraph.getVerticesAsArray()[0].getSlotSharingGroup();
        Assertions.assertThat(slotSharingGroup).isNotNull();
        CoLocationGroup coLocationGroup = null;
        CoLocationGroup coLocationGroup2 = null;
        for (JobVertex jobVertex : createJobGraph.getVertices()) {
            Assertions.assertThat(jobVertex.getSlotSharingGroup()).isEqualTo(slotSharingGroup);
            if (jobVertex.getName().startsWith("IterationSource")) {
                coLocationGroup = jobVertex.getCoLocationGroup();
                Assertions.assertThat(coLocationGroup.getVertexIds()).contains(new JobVertexID[]{jobVertex.getID()});
            } else if (jobVertex.getName().startsWith("IterationSink")) {
                coLocationGroup2 = jobVertex.getCoLocationGroup();
                Assertions.assertThat(coLocationGroup2.getVertexIds()).contains(new JobVertexID[]{jobVertex.getID()});
            } else {
                Assertions.assertThat(jobVertex.getCoLocationGroup()).isNull();
            }
        }
        Assertions.assertThat(coLocationGroup).isNotNull();
        Assertions.assertThat(coLocationGroup2).isNotNull();
        Assertions.assertThat(coLocationGroup2).isEqualTo(coLocationGroup);
    }

    @Test
    void testDefaultJobType() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Assertions.assertThat(StreamingJobGraphGenerator.createJobGraph(new StreamGraphGenerator(Collections.emptyList(), executionEnvironment.getConfig(), executionEnvironment.getCheckpointConfig()).generate()).getJobType()).isEqualTo(JobType.STREAMING);
    }

    @Test
    void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1);
        createLocalEnvironment.fromElements(new Integer[]{1}).map(num -> {
            return num;
        }).transform("test", BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory());
        StreamGraph streamGraph = createLocalEnvironment.getStreamGraph();
        List list = (List) streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt((v0) -> {
            return v0.getId();
        })).collect(Collectors.toList());
        Assertions.assertThat(StreamingJobGraphGenerator.areOperatorsChainable((StreamNode) list.get(0), (StreamNode) list.get(1), streamGraph)).isTrue();
        Assertions.assertThat(StreamingJobGraphGenerator.areOperatorsChainable((StreamNode) list.get(1), (StreamNode) list.get(2), streamGraph)).isFalse();
    }

    @Test
    void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1);
        createLocalEnvironment.fromElements(new Integer[]{1}).disableChaining().map(num -> {
            return num;
        }).transform("test", BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory());
        StreamGraph streamGraph = createLocalEnvironment.getStreamGraph();
        List list = (List) streamGraph.getStreamNodes().stream().sorted(Comparator.comparingInt((v0) -> {
            return v0.getId();
        })).collect(Collectors.toList());
        Assertions.assertThat(StreamingJobGraphGenerator.areOperatorsChainable((StreamNode) list.get(0), (StreamNode) list.get(1), streamGraph)).isFalse();
        Assertions.assertThat(StreamingJobGraphGenerator.areOperatorsChainable((StreamNode) list.get(1), (StreamNode) list.get(2), streamGraph)).isTrue();
    }

    @Test
    void testYieldingOperatorProperlyChainedOnLegacySources() {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1);
        createLocalEnvironment.fromElements(new Integer[]{1}).map(num -> {
            return num;
        }).transform("test", BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory()).map(num2 -> {
            return num2;
        }).transform("test", BasicTypeInfo.INT_TYPE_INFO, new YieldingTestOperatorFactory()).map(num3 -> {
            return num3;
        }).addSink(new DiscardingSink());
        List verticesSortedTopologicallyFromSources = createLocalEnvironment.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(2);
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getOperatorIDs()).hasSize(2);
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(1)).getOperatorIDs()).hasSize(5);
    }

    @Test
    void testYieldingOperatorProperlyChainedOnNewSources() {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1);
        createLocalEnvironment.fromSource(new NumberSequenceSource(0L, 10L), WatermarkStrategy.noWatermarks(), "input").map(l -> {
            return l;
        }).transform("test", BasicTypeInfo.LONG_TYPE_INFO, new YieldingTestOperatorFactory()).addSink(new DiscardingSink());
        List verticesSortedTopologicallyFromSources = createLocalEnvironment.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(1);
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getOperatorIDs()).hasSize(4);
    }

    @Test
    void testDeterministicUnionOrder() {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1);
        JobVertex jobVertex = (JobVertex) Iterables.getLast(getUnionJobGraph(createLocalEnvironment).getVerticesSortedTopologicallyFromSources());
        List list = (List) jobVertex.getInputs().stream().map(jobEdge -> {
            return jobEdge.getSource().getProducer().getName();
        }).collect(Collectors.toList());
        for (int i = 0; i < 100; i++) {
            JobVertex jobVertex2 = (JobVertex) Iterables.getLast(getUnionJobGraph(createLocalEnvironment).getVerticesSortedTopologicallyFromSources());
            Assertions.assertThat(jobVertex).withFailMessage("Different runs should yield different vertexes", new Object[0]).isNotEqualTo(jobVertex2);
            Assertions.assertThat((List) jobVertex2.getInputs().stream().map(jobEdge2 -> {
                return jobEdge2.getSource().getProducer().getName();
            }).collect(Collectors.toList())).withFailMessage("Union inputs reordered", new Object[0]).isEqualTo(list);
        }
    }

    private JobGraph getUnionJobGraph(StreamExecutionEnvironment streamExecutionEnvironment) {
        createSource(streamExecutionEnvironment, 1).union(new DataStream[]{createSource(streamExecutionEnvironment, 2)}).union(new DataStream[]{createSource(streamExecutionEnvironment, 3)}).union(new DataStream[]{createSource(streamExecutionEnvironment, 4)}).addSink(new DiscardingSink());
        return StreamingJobGraphGenerator.createJobGraph(streamExecutionEnvironment.getStreamGraph());
    }

    private DataStream<Integer> createSource(StreamExecutionEnvironment streamExecutionEnvironment, int i) {
        return streamExecutionEnvironment.fromElements(new Integer[]{Integer.valueOf(i)}).name("source" + i).map(num -> {
            return num;
        }).name("map" + i);
    }

    @Test
    void testNotSupportInputSelectableOperatorIfCheckpointing() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(60000L);
        executionEnvironment.fromElements(new String[]{"1"}).connect(executionEnvironment.fromElements(new Integer[]{1})).transform("test", BasicTypeInfo.STRING_TYPE_INFO, new TestAnyModeReadingStreamOperator("test operator")).print();
        Assertions.assertThatThrownBy(() -> {
            StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph());
        }).isInstanceOf(UnsupportedOperationException.class);
    }

    @Test
    void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
        ResourceSpec resourceSpec = ResourceSpec.UNKNOWN;
        List<ResourceSpec> asList = Arrays.asList(resourceSpec, resourceSpec, resourceSpec, resourceSpec);
        Configuration configuration = new Configuration() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.14
            {
                set(TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS, new HashMap<String, String>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.14.1
                    {
                        put("DATAPROC", "6");
                        put("PYTHON", "4");
                    }
                });
            }
        };
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        arrayList.add(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
        arrayList2.add(Collections.emptySet());
        arrayList.add(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
        arrayList2.add(Collections.singleton(ManagedMemoryUseCase.PYTHON));
        arrayList.add(Collections.emptyMap());
        arrayList2.add(Collections.singleton(ManagedMemoryUseCase.PYTHON));
        arrayList.add(Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
        arrayList2.add(Collections.emptySet());
        JobGraph createJobGraphForManagedMemoryFractionTest = createJobGraphForManagedMemoryFractionTest(asList, arrayList, arrayList2);
        JobVertex jobVertex = (JobVertex) createJobGraphForManagedMemoryFractionTest.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertex jobVertex2 = (JobVertex) createJobGraphForManagedMemoryFractionTest.getVerticesSortedTopologicallyFromSources().get(1);
        JobVertex jobVertex3 = (JobVertex) createJobGraphForManagedMemoryFractionTest.getVerticesSortedTopologicallyFromSources().get(2);
        StreamConfig streamConfig = new StreamConfig(jobVertex.getConfiguration());
        verifyFractions(streamConfig, 0.3d, 0.0d, 0.0d, configuration);
        verifyFractions((StreamConfig) Iterables.getOnlyElement(streamConfig.getTransitiveChainedTaskConfigs(StreamingJobGraphGeneratorTest.class.getClassLoader()).values()), 0.3d, 0.4d, 0.0d, configuration);
        verifyFractions(new StreamConfig(jobVertex2.getConfiguration()), 0.0d, 0.4d, 0.0d, configuration);
        verifyFractions(new StreamConfig(jobVertex3.getConfiguration()), 1.0d, 0.0d, 0.0d, configuration);
    }

    private JobGraph createJobGraphForManagedMemoryFractionTest(List<ResourceSpec> list, List<Map<ManagedMemoryUseCase, Integer>> list2, List<Set<ManagedMemoryUseCase>> list3) throws Exception {
        Method setResourcesMethodAndSetAccessible = getSetResourcesMethodAndSetAccessible(SingleOutputStreamOperator.class);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new ParallelSourceFunction<Integer>() { // from class: org.apache.flink.streaming.api.graph.StreamingJobGraphGeneratorTest.15
            public void run(SourceFunction.SourceContext<Integer> sourceContext) {
            }

            public void cancel() {
            }
        });
        setResourcesMethodAndSetAccessible.invoke(addSource, list.get(0));
        SingleOutputStreamOperator map = addSource.map(num -> {
            return num;
        });
        setResourcesMethodAndSetAccessible.invoke(map, list.get(1));
        SingleOutputStreamOperator map2 = map.rebalance().map(num2 -> {
            return num2;
        });
        setResourcesMethodAndSetAccessible.invoke(map2, list.get(2));
        SingleOutputStreamOperator slotSharingGroup = map2.rebalance().map(num3 -> {
            return num3;
        }).slotSharingGroup("test");
        setResourcesMethodAndSetAccessible.invoke(slotSharingGroup, list.get(3));
        declareManagedMemoryUseCaseForTranformation(addSource.getTransformation(), list2.get(0), list3.get(0));
        declareManagedMemoryUseCaseForTranformation(map.getTransformation(), list2.get(1), list3.get(1));
        declareManagedMemoryUseCaseForTranformation(map2.getTransformation(), list2.get(2), list3.get(2));
        declareManagedMemoryUseCaseForTranformation(slotSharingGroup.getTransformation(), list2.get(3), list3.get(3));
        return StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph());
    }

    private void declareManagedMemoryUseCaseForTranformation(Transformation<?> transformation, Map<ManagedMemoryUseCase, Integer> map, Set<ManagedMemoryUseCase> set) {
        for (Map.Entry<ManagedMemoryUseCase, Integer> entry : map.entrySet()) {
            transformation.declareManagedMemoryUseCaseAtOperatorScope(entry.getKey(), entry.getValue().intValue());
        }
        Iterator<ManagedMemoryUseCase> it = set.iterator();
        while (it.hasNext()) {
            transformation.declareManagedMemoryUseCaseAtSlotScope(it.next());
        }
    }

    private void verifyFractions(StreamConfig streamConfig, double d, double d2, double d3, Configuration configuration) {
        Assertions.assertThat(streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND, configuration, ClassLoader.getSystemClassLoader())).isCloseTo(d3, Offset.offset(Double.valueOf(1.0E-6d)));
        Assertions.assertThat(streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.PYTHON, configuration, ClassLoader.getSystemClassLoader())).isCloseTo(d2, Offset.offset(Double.valueOf(1.0E-6d)));
        Assertions.assertThat(streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.OPERATOR, configuration, ClassLoader.getSystemClassLoader())).isCloseTo(d, Offset.offset(Double.valueOf(1.0E-6d)));
    }

    @Test
    void testSetNonDefaultSlotSharingInHybridMode() {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
        StreamGraph createStreamGraphForSlotSharingTest = createStreamGraphForSlotSharingTest(configuration);
        ((StreamNode) createStreamGraphForSlotSharingTest.getStreamNodes().stream().filter(streamNode -> {
            return "map1".equals(streamNode.getOperatorName());
        }).findFirst().get()).setSlotSharingGroup("testSlotSharingGroup");
        Assertions.assertThatThrownBy(() -> {
            StreamingJobGraphGenerator.createJobGraph(createStreamGraphForSlotSharingTest);
        }).isInstanceOf(IllegalStateException.class).hasMessage("hybrid shuffle mode currently does not support setting non-default slot sharing group.");
        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
        StreamGraph createStreamGraphForSlotSharingTest2 = createStreamGraphForSlotSharingTest(configuration);
        ((StreamNode) createStreamGraphForSlotSharingTest2.getStreamNodes().stream().filter(streamNode2 -> {
            return "map1".equals(streamNode2.getOperatorName());
        }).findFirst().get()).setSlotSharingGroup("testSlotSharingGroup");
        Assertions.assertThatThrownBy(() -> {
            StreamingJobGraphGenerator.createJobGraph(createStreamGraphForSlotSharingTest2);
        }).isInstanceOf(IllegalStateException.class).hasMessage("hybrid shuffle mode currently does not support setting non-default slot sharing group.");
    }

    @Test
    void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultEnabled() {
        StreamGraph createStreamGraphForSlotSharingTest = createStreamGraphForSlotSharingTest(new Configuration());
        ((StreamNode) createStreamGraphForSlotSharingTest.getStreamNodes().stream().filter(streamNode -> {
            return "map1".equals(streamNode.getOperatorName());
        }).findFirst().get()).setSlotSharingGroup("testSlotSharingGroup");
        createStreamGraphForSlotSharingTest.setAllVerticesInSameSlotSharingGroupByDefault(true);
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(createStreamGraphForSlotSharingTest).getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(4);
        List<JobVertex> expectedVerticesList = getExpectedVerticesList(verticesSortedTopologicallyFromSources);
        JobVertex jobVertex = expectedVerticesList.get(0);
        JobVertex jobVertex2 = expectedVerticesList.get(1);
        JobVertex jobVertex3 = expectedVerticesList.get(2);
        assertSameSlotSharingGroup(jobVertex, jobVertex2, expectedVerticesList.get(3));
        assertDistinctSharingGroups(jobVertex, jobVertex3);
    }

    @Test
    void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled() {
        StreamGraph createStreamGraphForSlotSharingTest = createStreamGraphForSlotSharingTest(new Configuration());
        createStreamGraphForSlotSharingTest.setAllVerticesInSameSlotSharingGroupByDefault(false);
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(createStreamGraphForSlotSharingTest).getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(4);
        List<JobVertex> expectedVerticesList = getExpectedVerticesList(verticesSortedTopologicallyFromSources);
        assertDistinctSharingGroups(expectedVerticesList.get(0), expectedVerticesList.get(1), expectedVerticesList.get(3), expectedVerticesList.get(2));
    }

    @Test
    void testSlotSharingResourceConfiguration() {
        ResourceProfile fromResources = ResourceProfile.fromResources(1.0d, 10);
        ResourceProfile fromResources2 = ResourceProfile.fromResources(2.0d, 20);
        ResourceProfile fromResources3 = ResourceProfile.fromResources(3.0d, 30);
        HashMap hashMap = new HashMap();
        hashMap.put("slot-a", fromResources);
        hashMap.put("slot-b", fromResources2);
        hashMap.put("default", fromResources3);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Integer[]{1, 2, 3}).name("slot-a").slotSharingGroup("slot-a").map(num -> {
            return Integer.valueOf(num.intValue() + 1);
        }).name("slot-b").slotSharingGroup("slot-b").map(num2 -> {
            return Integer.valueOf(num2.intValue() * num2.intValue());
        }).name("default").slotSharingGroup("default");
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        streamGraph.setSlotSharingGroupResource(hashMap);
        int i = 0;
        for (JobVertex jobVertex : StreamingJobGraphGenerator.createJobGraph(streamGraph).getVertices()) {
            i++;
            if (jobVertex.getName().contains("slot-a")) {
                Assertions.assertThat(jobVertex.getSlotSharingGroup().getResourceProfile()).isEqualTo(fromResources);
            } else if (jobVertex.getName().contains("slot-b")) {
                Assertions.assertThat(jobVertex.getSlotSharingGroup().getResourceProfile()).isEqualTo(fromResources2);
            } else if (jobVertex.getName().contains("default")) {
                Assertions.assertThat(jobVertex.getSlotSharingGroup().getResourceProfile()).isEqualTo(fromResources3);
            } else {
                Assertions.fail("");
            }
        }
        Assertions.assertThat(i).isEqualTo(3);
    }

    @Test
    void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
        ResourceProfile fromResources = ResourceProfile.fromResources(1.0d, 10);
        HashMap hashMap = new HashMap();
        hashMap.put("default", fromResources);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.fromElements(new Integer[]{1, 2, 3}).map(num -> {
            return Integer.valueOf(num.intValue() + 1);
        });
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        streamGraph.setSlotSharingGroupResource(hashMap);
        int i = 0;
        Iterator it = StreamingJobGraphGenerator.createJobGraph(streamGraph).getVertices().iterator();
        while (it.hasNext()) {
            i++;
            Assertions.assertThat(((JobVertex) it.next()).getSlotSharingGroup().getResourceProfile()).isEqualTo(fromResources);
        }
        Assertions.assertThat(i).isEqualTo(2);
    }

    @Test
    void testNamingOfChainedMultipleInputs() {
        String[] strArr = {"source-1", "source-2", "source-3"};
        JobVertex jobVertex = (JobVertex) createGraphWithMultipleInputs(true, strArr).getVerticesSortedTopologicallyFromSources().iterator().next();
        Assertions.assertThat(strArr).allMatch(str -> {
            return jobVertex.getOperatorPrettyName().contains(str);
        });
    }

    @Test
    void testNamingOfNonChainedMultipleInputs() {
        JobVertex jobVertex = (JobVertex) Iterables.find(createGraphWithMultipleInputs(false, "source-1", "source-2", "source-3").getVertices(), jobVertex2 -> {
            return jobVertex2.getInvokableClassName().equals(MultipleInputStreamTask.class.getName());
        });
        Assertions.assertThat(jobVertex.getName()).withFailMessage(jobVertex.getName(), new Object[0]).doesNotContain(new CharSequence[]{"source-1"});
        Assertions.assertThat(jobVertex.getOperatorPrettyName()).withFailMessage(jobVertex.getOperatorPrettyName(), new Object[0]).doesNotContain(new CharSequence[]{"source-1"});
    }

    public JobGraph createGraphWithMultipleInputs(boolean z, String... strArr) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        MultipleInputTransformation multipleInputTransformation = new MultipleInputTransformation("mit", new UnusedOperatorFactory(), Types.LONG, executionEnvironment.getParallelism());
        Stream map = Arrays.stream(strArr).map(str -> {
            return executionEnvironment.fromSource(new NumberSequenceSource(1L, 2L), WatermarkStrategy.noWatermarks(), str).getTransformation();
        });
        multipleInputTransformation.getClass();
        map.forEach(multipleInputTransformation::addInput);
        multipleInputTransformation.setChainingStrategy(z ? ChainingStrategy.HEAD_WITH_SOURCES : ChainingStrategy.NEVER);
        executionEnvironment.addOperator(multipleInputTransformation);
        return StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph());
    }

    @Test
    void testTreeDescription() {
        JobVertex[] verticesAsArray = createJobGraphWithDescription(StreamExecutionEnvironment.getExecutionEnvironment(), "test source").getVerticesAsArray();
        Assertions.assertThat(verticesAsArray).hasSize(1);
        Assertions.assertThat(verticesAsArray[0].getOperatorPrettyName()).isEqualTo("test source\n:- x + 1\n:  :- first print of map1\n:  +- second print of map1\n+- x + 2\n   :- first print of map2\n   +- second print of map2\n");
    }

    @Test
    void testTreeDescriptionWithChainedSource() {
        JobVertex[] verticesAsArray = createJobGraphWithDescription(StreamExecutionEnvironment.getExecutionEnvironment(), "test source 1", "test source 2").getVerticesAsArray();
        Assertions.assertThat(verticesAsArray).hasSize(1);
        Assertions.assertThat(verticesAsArray[0].getOperatorPrettyName()).isEqualTo("operator chained with source [test source 1, test source 2]\n:- x + 1\n:  :- first print of map1\n:  +- second print of map1\n+- x + 2\n   :- first print of map2\n   +- second print of map2\n");
    }

    @Test
    void testCascadingDescription() {
        Configuration configuration = new Configuration();
        configuration.set(PipelineOptions.VERTEX_DESCRIPTION_MODE, PipelineOptions.VertexDescriptionMode.CASCADING);
        JobVertex[] verticesAsArray = createJobGraphWithDescription(StreamExecutionEnvironment.getExecutionEnvironment(configuration), "test source").getVerticesAsArray();
        Assertions.assertThat(verticesAsArray).hasSize(1);
        Assertions.assertThat(verticesAsArray[0].getOperatorPrettyName()).isEqualTo("test source -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
    }

    @Test
    void testCascadingDescriptionWithChainedSource() {
        Configuration configuration = new Configuration();
        configuration.set(PipelineOptions.VERTEX_DESCRIPTION_MODE, PipelineOptions.VertexDescriptionMode.CASCADING);
        JobVertex[] verticesAsArray = createJobGraphWithDescription(StreamExecutionEnvironment.getExecutionEnvironment(configuration), "test source 1", "test source 2").getVerticesAsArray();
        Assertions.assertThat(verticesAsArray).hasSize(1);
        Assertions.assertThat(verticesAsArray[0].getOperatorPrettyName()).isEqualTo("operator chained with source [test source 1, test source 2] -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
    }

    @Test
    void testNamingWithoutIndex() {
        List verticesSortedTopologicallyFromSources = createStreamGraphForSlotSharingTest(new Configuration()).getJobGraph().getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(4);
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getName()).isEqualTo("Source: source1");
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(1)).getName()).isEqualTo("Source: source2");
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(2)).getName()).isEqualTo("map1");
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(3)).getName()).isEqualTo("map2");
    }

    @Test
    void testNamingWithIndex() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX, true);
        List verticesSortedTopologicallyFromSources = createStreamGraphForSlotSharingTest(configuration).getJobGraph().getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(4);
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(0)).getName()).isEqualTo("[vertex-0]Source: source1");
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(1)).getName()).isEqualTo("[vertex-1]Source: source2");
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(2)).getName()).isEqualTo("[vertex-2]map1");
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources.get(3)).getName()).isEqualTo("[vertex-3]map2");
    }

    @Test
    void testCacheJobGraph() throws Throwable {
        TestingStreamExecutionEnvironment testingStreamExecutionEnvironment = new TestingStreamExecutionEnvironment();
        testingStreamExecutionEnvironment.setParallelism(2);
        testingStreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        CachedDataStream cache = testingStreamExecutionEnvironment.fromElements(new Integer[]{1, 2, 3}).name("source").map(num -> {
            return Integer.valueOf(num.intValue() + 1);
        }).name("map-1").map(num2 -> {
            return Integer.valueOf(num2.intValue() + 1);
        }).name("map-2").cache();
        Assertions.assertThat(cache.getTransformation()).isInstanceOf(CacheTransformation.class);
        CacheTransformation transformation = cache.getTransformation();
        cache.print().name("print");
        List verticesSortedTopologicallyFromSources = testingStreamExecutionEnvironment.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(3);
        List inputs = ((JobVertex) verticesSortedTopologicallyFromSources.stream().filter(jobVertex -> {
            return "CacheWrite".equals(jobVertex.getName());
        }).findFirst().orElseThrow(() -> {
            return new RuntimeException("CacheWrite job vertex not found");
        })).getInputs();
        Assertions.assertThat(inputs).hasSize(1);
        Assertions.assertThat(((JobEdge) inputs.get(0)).getDistributionPattern()).isEqualTo(DistributionPattern.POINTWISE);
        Assertions.assertThat(((JobEdge) inputs.get(0)).getSource().getResultType()).isEqualTo(ResultPartitionType.BLOCKING_PERSISTENT);
        Assertions.assertThat(new AbstractID(((JobEdge) inputs.get(0)).getSourceId())).isEqualTo(transformation.getDatasetId());
        Assertions.assertThat(((JobEdge) inputs.get(0)).getSource().getProducer().getName()).isEqualTo("map-1 -> map-2 -> Sink: print");
        testingStreamExecutionEnvironment.addCompletedClusterDataset(transformation.getDatasetId());
        cache.print().name("print");
        List verticesSortedTopologicallyFromSources2 = testingStreamExecutionEnvironment.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources2).hasSize(1);
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources2.get(0)).getName()).isEqualTo("CacheRead -> Sink: print");
        Assertions.assertThat(((JobVertex) verticesSortedTopologicallyFromSources2.get(0)).getIntermediateDataSetIdsToConsume()).hasSize(1);
        Assertions.assertThat(new AbstractID((AbstractID) ((JobVertex) verticesSortedTopologicallyFromSources2.get(0)).getIntermediateDataSetIdsToConsume().get(0))).isEqualTo(transformation.getDatasetId());
    }

    @Test
    void testIntermediateDataSetReuse() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setBufferTimeout(-1L);
        DataStreamSource fromElements = executionEnvironment.fromElements(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
        fromElements.rebalance().addSink(new DiscardingSink()).setParallelism(2).name("sink1");
        fromElements.rebalance().addSink(new DiscardingSink()).setParallelism(2).name("sink2");
        fromElements.rebalance().addSink(new DiscardingSink()).setParallelism(3);
        fromElements.broadcast().addSink(new DiscardingSink()).setParallelism(2);
        fromElements.forward().addSink(new DiscardingSink()).setParallelism(1).disableChaining();
        fromElements.forward().addSink(new DiscardingSink()).setParallelism(1).disableChaining();
        SingleOutputStreamOperator parallelism = fromElements.forward().map(num -> {
            return num;
        }).setParallelism(1);
        parallelism.broadcast().addSink(new DiscardingSink()).setParallelism(2).name("sink3");
        parallelism.broadcast().addSink(new DiscardingSink()).setParallelism(2).name("sink4");
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.FORWARD_EDGES_PIPELINED);
        List verticesSortedTopologicallyFromSources = StreamingJobGraphGenerator.createJobGraph(streamGraph).getVerticesSortedTopologicallyFromSources();
        Assertions.assertThat(verticesSortedTopologicallyFromSources).hasSize(9);
        JobVertex jobVertex = (JobVertex) verticesSortedTopologicallyFromSources.get(0);
        List list = (List) jobVertex.getProducedDataSets().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        Assertions.assertThat(list).hasSize(6);
        JobVertex jobVertex2 = (JobVertex) Preconditions.checkNotNull(findJobVertexWithName(verticesSortedTopologicallyFromSources, "sink1"));
        JobVertex jobVertex3 = (JobVertex) Preconditions.checkNotNull(findJobVertexWithName(verticesSortedTopologicallyFromSources, "sink2"));
        JobVertex jobVertex4 = (JobVertex) Preconditions.checkNotNull(findJobVertexWithName(verticesSortedTopologicallyFromSources, "sink3"));
        JobVertex jobVertex5 = (JobVertex) Preconditions.checkNotNull(findJobVertexWithName(verticesSortedTopologicallyFromSources, "sink4"));
        Assertions.assertThat(((JobEdge) jobVertex3.getInputs().get(0)).getSource().getId()).isEqualTo(((JobEdge) jobVertex2.getInputs().get(0)).getSource().getId());
        Assertions.assertThat(((JobEdge) jobVertex5.getInputs().get(0)).getSource().getId()).isEqualTo(((JobEdge) jobVertex4.getInputs().get(0)).getSource().getId());
        Assertions.assertThat(((JobEdge) jobVertex4.getInputs().get(0)).getSource().getId()).isNotEqualTo(((JobEdge) jobVertex2.getInputs().get(0)).getSource().getId());
        StreamConfig streamConfig = new StreamConfig(jobVertex.getConfiguration());
        List list2 = (List) streamConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).stream().map((v0) -> {
            return v0.getDataSetId();
        }).collect(Collectors.toList());
        Assertions.assertThat(list2).hasSize(5);
        Assertions.assertThat(list2).doesNotContain(new IntermediateDataSetID[]{((JobEdge) jobVertex4.getInputs().get(0)).getSource().getId()});
        List list3 = (List) streamConfig.getVertexNonChainedOutputs(getClass().getClassLoader()).stream().map((v0) -> {
            return v0.getDataSetId();
        }).collect(Collectors.toList());
        Assertions.assertThat(list3).hasSize(6);
        Assertions.assertThat(list3).isEqualTo(list);
    }

    @Test
    void testStreamConfigSerializationException() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.addOperator(new OneInputTransformation(executionEnvironment.fromElements(new Integer[]{1, 2, 3}).getTransformation(), "serializationTestOperator", new SerializationTestOperatorFactory(false), Types.INT, 1));
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        Assertions.assertThatThrownBy(() -> {
            StreamingJobGraphGenerator.createJobGraph(streamGraph);
        }).hasRootCauseInstanceOf(IOException.class).hasRootCauseMessage("This operator factory is not serializable.");
    }

    @Test
    public void testCoordinatedSerializationException() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.addOperator(new OneInputTransformation(executionEnvironment.fromElements(new Integer[]{1, 2, 3}).getTransformation(), "serializationTestOperator", new SerializationTestOperatorFactory(true), Types.INT, 1));
        StreamGraph streamGraph = executionEnvironment.getStreamGraph();
        Assertions.assertThatThrownBy(() -> {
            StreamingJobGraphGenerator.createJobGraph(streamGraph);
        }).hasRootCauseInstanceOf(IOException.class).hasRootCauseMessage("This provider is not serializable.");
    }

    private static JobVertex findJobVertexWithName(List<JobVertex> list, String str) {
        for (JobVertex jobVertex : list) {
            if (jobVertex.getName().contains(str)) {
                return jobVertex;
            }
        }
        return null;
    }

    private JobGraph createJobGraphWithDescription(StreamExecutionEnvironment streamExecutionEnvironment, String... strArr) {
        SingleOutputStreamOperator dataStream;
        streamExecutionEnvironment.setParallelism(1);
        if (strArr.length == 1) {
            dataStream = streamExecutionEnvironment.fromElements(new Long[]{1L, 2L, 3L}).setDescription(strArr[0]);
        } else {
            MultipleInputTransformation multipleInputTransformation = new MultipleInputTransformation("mit", new UnusedOperatorFactory(), Types.LONG, streamExecutionEnvironment.getParallelism());
            multipleInputTransformation.setDescription("operator chained with source");
            multipleInputTransformation.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
            Stream map = Arrays.stream(strArr).map(str -> {
                return streamExecutionEnvironment.fromSource(new NumberSequenceSource(1L, 2L), WatermarkStrategy.noWatermarks(), str).setDescription(str).getTransformation();
            });
            multipleInputTransformation.getClass();
            map.forEach(multipleInputTransformation::addInput);
            dataStream = new DataStream(streamExecutionEnvironment, multipleInputTransformation);
        }
        SingleOutputStreamOperator description = dataStream.map(l -> {
            return Long.valueOf(l.longValue() + 1);
        }).setDescription("x + 1");
        SingleOutputStreamOperator description2 = dataStream.map(l2 -> {
            return Long.valueOf(l2.longValue() + 2);
        }).setDescription("x + 2");
        description.print().setDescription("first print of map1");
        description.print().setDescription("second print of map1");
        description2.print().setDescription("first print of map2");
        description2.print().setDescription("second print of map2");
        return StreamingJobGraphGenerator.createJobGraph(streamExecutionEnvironment.getStreamGraph());
    }

    private static List<JobVertex> getExpectedVerticesList(List<JobVertex> list) {
        ArrayList arrayList = new ArrayList();
        List asList = Arrays.asList("source1", "source2", "map1", "map2");
        for (int i = 0; i < asList.size(); i++) {
            for (JobVertex jobVertex : list) {
                if (jobVertex.getName().contains((CharSequence) asList.get(i))) {
                    arrayList.add(jobVertex);
                }
            }
        }
        return arrayList;
    }

    private StreamGraph createStreamGraphForSlotSharingTest(Configuration configuration) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        executionEnvironment.setBufferTimeout(-1L);
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.fromElements(new Integer[]{1, 2, 3}).name("source1").rebalance().map(num -> {
            return num;
        }).name("map1");
        new DataStream(executionEnvironment, new PartitionTransformation(executionEnvironment.fromElements(new Integer[]{4, 5, 6}).name("source2").getTransformation(), new RebalancePartitioner(), StreamExchangeMode.BATCH)).map(num2 -> {
            return num2;
        }).name("map2");
        return executionEnvironment.getStreamGraph();
    }

    private void assertSameSlotSharingGroup(JobVertex... jobVertexArr) {
        for (int i = 0; i < jobVertexArr.length - 1; i++) {
            Assertions.assertThat(jobVertexArr[i + 1].getSlotSharingGroup()).isEqualTo(jobVertexArr[i].getSlotSharingGroup());
        }
    }

    private void assertDistinctSharingGroups(JobVertex... jobVertexArr) {
        for (int i = 0; i < jobVertexArr.length - 1; i++) {
            for (int i2 = i + 1; i2 < jobVertexArr.length; i2++) {
                Assertions.assertThat(jobVertexArr[i].getSlotSharingGroup()).isNotEqualTo(jobVertexArr[i2].getSlotSharingGroup());
            }
        }
    }

    private static Method getSetResourcesMethodAndSetAccessible(Class<?> cls) throws NoSuchMethodException {
        Method declaredMethod = cls.getDeclaredMethod("setResources", ResourceSpec.class);
        declaredMethod.setAccessible(true);
        return declaredMethod;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1893260035:
                if (implMethodName.equals("lambda$testYieldingOperatorNotChainableToTaskChainedToLegacySource$e0defa2f$1")) {
                    z = 16;
                    break;
                }
                break;
            case -1792659149:
                if (implMethodName.equals("lambda$testExchangeModePipelined$4d2b82a6$1")) {
                    z = 13;
                    break;
                }
                break;
            case -1686966806:
                if (implMethodName.equals("lambda$createStreamGraphForSlotSharingTest$ce62fc71$1")) {
                    z = 14;
                    break;
                }
                break;
            case -1686966805:
                if (implMethodName.equals("lambda$createStreamGraphForSlotSharingTest$ce62fc71$2")) {
                    z = 15;
                    break;
                }
                break;
            case -1621000923:
                if (implMethodName.equals("lambda$testExchangeModeUndefined$4d2b82a6$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1606593980:
                if (implMethodName.equals("lambda$testIntermediateDataSetReuse$26af5f30$1")) {
                    z = 33;
                    break;
                }
                break;
            case -1101485073:
                if (implMethodName.equals("lambda$createJobGraphForManagedMemoryFractionTest$424b0425$1")) {
                    z = 31;
                    break;
                }
                break;
            case -1101426452:
                if (implMethodName.equals("lambda$createJobGraphForManagedMemoryFractionTest$424b0444$1")) {
                    z = 29;
                    break;
                }
                break;
            case -1101367831:
                if (implMethodName.equals("lambda$createJobGraphForManagedMemoryFractionTest$424b0463$1")) {
                    z = 27;
                    break;
                }
                break;
            case -997746729:
                if (implMethodName.equals("lambda$testYieldingOperatorProperlyChainedOnLegacySources$e0defa2f$1")) {
                    z = 12;
                    break;
                }
                break;
            case -997746728:
                if (implMethodName.equals("lambda$testYieldingOperatorProperlyChainedOnLegacySources$e0defa2f$2")) {
                    z = 19;
                    break;
                }
                break;
            case -997746727:
                if (implMethodName.equals("lambda$testYieldingOperatorProperlyChainedOnLegacySources$e0defa2f$3")) {
                    z = 18;
                    break;
                }
                break;
            case -842167942:
                if (implMethodName.equals("lambda$testYieldingOperatorProperlyChainedOnNewSources$e0defa2f$1")) {
                    z = 30;
                    break;
                }
                break;
            case -100805765:
                if (implMethodName.equals("lambda$testExchangeModeBatch$4d2b82a6$1")) {
                    z = 5;
                    break;
                }
                break;
            case -34694871:
                if (implMethodName.equals("lambda$testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup$e0defa2f$1")) {
                    z = 8;
                    break;
                }
                break;
            case 368083189:
                if (implMethodName.equals("lambda$testExchangeModeHybridSelective$4d2b82a6$1")) {
                    z = 4;
                    break;
                }
                break;
            case 674082445:
                if (implMethodName.equals("lambda$testDisablingBufferTimeoutWithPipelinedExchanges$e0defa2f$1")) {
                    z = 21;
                    break;
                }
                break;
            case 684973907:
                if (implMethodName.equals("lambda$testSlotSharingResourceConfiguration$e0defa2f$1")) {
                    z = false;
                    break;
                }
                break;
            case 684973908:
                if (implMethodName.equals("lambda$testSlotSharingResourceConfiguration$e0defa2f$2")) {
                    z = 7;
                    break;
                }
                break;
            case 729089612:
                if (implMethodName.equals("lambda$testPartitionTypesInBatchMode$3558be8e$1")) {
                    z = 32;
                    break;
                }
                break;
            case 743336388:
                if (implMethodName.equals("lambda$testCompatibleExchangeModeWithBufferTimeout$d2f1c1ca$1")) {
                    z = 9;
                    break;
                }
                break;
            case 815828205:
                if (implMethodName.equals("lambda$testYieldingOperatorChainableToTaskNotChainedToLegacySource$e0defa2f$1")) {
                    z = 11;
                    break;
                }
                break;
            case 859089798:
                if (implMethodName.equals("lambda$testCacheJobGraph$5c4176c$1")) {
                    z = 26;
                    break;
                }
                break;
            case 859089799:
                if (implMethodName.equals("lambda$testCacheJobGraph$5c4176c$2")) {
                    z = 28;
                    break;
                }
                break;
            case 1299039648:
                if (implMethodName.equals("lambda$testExchangeModeHybridFull$4d2b82a6$1")) {
                    z = 10;
                    break;
                }
                break;
            case 1463475771:
                if (implMethodName.equals("lambda$createSource$a01674f3$1")) {
                    z = true;
                    break;
                }
                break;
            case 1641998514:
                if (implMethodName.equals("lambda$testPartitionTypesInBatchMode$e0defa2f$1")) {
                    z = 24;
                    break;
                }
                break;
            case 1641998515:
                if (implMethodName.equals("lambda$testPartitionTypesInBatchMode$e0defa2f$2")) {
                    z = 25;
                    break;
                }
                break;
            case 1641998516:
                if (implMethodName.equals("lambda$testPartitionTypesInBatchMode$e0defa2f$3")) {
                    z = 22;
                    break;
                }
                break;
            case 1641998517:
                if (implMethodName.equals("lambda$testPartitionTypesInBatchMode$e0defa2f$4")) {
                    z = 23;
                    break;
                }
                break;
            case 1677173875:
                if (implMethodName.equals("lambda$createJobGraphWithDescription$73b57204$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1677830238:
                if (implMethodName.equals("lambda$createJobGraphWithDescription$73b571e5$1")) {
                    z = 20;
                    break;
                }
                break;
            case 1870433442:
                if (implMethodName.equals("lambda$testIteration$ac072c38$1")) {
                    z = 17;
                    break;
                }
                break;
            case 1924468850:
                if (implMethodName.equals("lambda$testIteration$79b5110$1")) {
                    z = 6;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return Integer.valueOf(num.intValue() + 1);
                    };
                }
                break;
            case SourceOperatorTestContext.SUBTASK_INDEX /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num2 -> {
                        return num2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num3 -> {
                        return num3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l2 -> {
                        return Long.valueOf(l2.longValue() + 2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num4 -> {
                        return num4;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num5 -> {
                        return num5;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num6 -> {
                        return Integer.valueOf(num6.intValue() + 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num22 -> {
                        return Integer.valueOf(num22.intValue() * num22.intValue());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num7 -> {
                        return Integer.valueOf(num7.intValue() + 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num8 -> {
                        return num8;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num9 -> {
                        return num9;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num10 -> {
                        return num10;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num11 -> {
                        return num11;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num12 -> {
                        return num12;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num13 -> {
                        return num13;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num23 -> {
                        return num23;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num14 -> {
                        return num14;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Z")) {
                    return num24 -> {
                        return false;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num32 -> {
                        return num32;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num25 -> {
                        return num25;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return Long.valueOf(l.longValue() + 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num15 -> {
                        return num15;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num33 -> {
                        return num33;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num52 -> {
                        return num52;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num16 -> {
                        return num16;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num26 -> {
                        return num26;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num17 -> {
                        return Integer.valueOf(num17.intValue() + 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num34 -> {
                        return num34;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num27 -> {
                        return Integer.valueOf(num27.intValue() + 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num28 -> {
                        return num28;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l3 -> {
                        return l3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num18 -> {
                        return num18;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num42 -> {
                        return num42;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num19 -> {
                        return num19;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
