package org.apache.flink.streaming.runtime.translators;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.class */
public class CacheTransformationTranslator<OUT, T extends CacheTransformation<OUT>> extends SimpleTransformationTranslator<OUT, T> {
    public static final String CACHE_CONSUMER_OPERATOR_NAME = "CacheRead";
    public static final String CACHE_PRODUCER_OPERATOR_NAME = "CacheWrite";

    /* loaded from: input_file:org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator$IdentityStreamOperator.class */
    public static class IdentityStreamOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 4517845269225218313L;

        @Override // org.apache.flink.streaming.api.operators.Input
        public void processElement(StreamRecord<T> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator$NoOpStreamOperator.class */
    public static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 4517845269225218313L;

        @Override // org.apache.flink.streaming.api.operators.Input
        public void processElement(StreamRecord<T> streamRecord) throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.api.graph.SimpleTransformationTranslator
    public Collection<Integer> translateForBatchInternal(T t, TransformationTranslator.Context context) {
        if (t.isCached()) {
            return consumeCache(t, context);
        }
        List<Transformation<?>> inputs = t.getInputs();
        Preconditions.checkState(inputs.size() == 1, "There could be only one transformation input to cache");
        Transformation<?> transformation = inputs.get(0);
        if (transformation instanceof PhysicalTransformation) {
            return physicalTransformationProduceCache(t, context, transformation);
        }
        if (transformation instanceof SideOutputTransformation) {
            return sideOutputTransformationProduceCache(t, context, (SideOutputTransformation) transformation);
        }
        throw new RuntimeException(String.format("Unsupported transformation %s", transformation.getClass()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.api.graph.SimpleTransformationTranslator
    public Collection<Integer> translateForStreamingInternal(T t, TransformationTranslator.Context context) {
        if (t.isCached()) {
            return consumeCache(t, context);
        }
        List<Transformation<?>> inputs = t.getInputs();
        Preconditions.checkState(inputs.size() == 1, "There could be only one transformation input to cache");
        return context.getStreamNodeIds(inputs.get(0));
    }

    private Collection<Integer> sideOutputTransformationProduceCache(T t, TransformationTranslator.Context context, SideOutputTransformation<?> sideOutputTransformation) {
        StreamGraph streamGraph = context.getStreamGraph();
        Transformation<?> transformation = sideOutputTransformation.getInputs().get(0);
        Collection<Integer> streamNodeIds = context.getStreamNodeIds(transformation);
        Preconditions.checkState(streamNodeIds.size() == 1, "We expect only one stream node for the input transform");
        Integer next = streamNodeIds.iterator().next();
        addCacheProduceNode(streamGraph, t, context, transformation);
        int newNodeId = Transformation.getNewNodeId();
        streamGraph.addVirtualSideOutputNode(next, Integer.valueOf(newNodeId), sideOutputTransformation.getOutputTag());
        streamGraph.addEdge(Integer.valueOf(newNodeId), Integer.valueOf(t.getId()), 0, new IntermediateDataSetID(t.getDatasetId()));
        return Collections.singletonList(Integer.valueOf(newNodeId));
    }

    private List<Integer> physicalTransformationProduceCache(T t, TransformationTranslator.Context context, Transformation<?> transformation) {
        StreamGraph streamGraph = context.getStreamGraph();
        Collection<Integer> streamNodeIds = context.getStreamNodeIds(transformation);
        Preconditions.checkState(streamNodeIds.size() == 1, "We expect only one stream node for the input transform");
        Integer next = streamNodeIds.iterator().next();
        addCacheProduceNode(streamGraph, t, context, transformation);
        streamGraph.addEdge(next, Integer.valueOf(t.getId()), 0, new IntermediateDataSetID(t.getDatasetId()));
        return Collections.singletonList(next);
    }

    private void addCacheProduceNode(StreamGraph streamGraph, T t, TransformationTranslator.Context context, Transformation<?> transformation) {
        SimpleOperatorFactory of = SimpleOperatorFactory.of(new NoOpStreamOperator());
        of.setChainingStrategy(ChainingStrategy.HEAD);
        streamGraph.addOperator(Integer.valueOf(t.getId()), context.getSlotSharingGroup(), t.getCoLocationGroupKey(), of, t.getInputs().get(0).getOutputType(), null, CACHE_PRODUCER_OPERATOR_NAME);
        streamGraph.setParallelism(Integer.valueOf(t.getId()), transformation.getParallelism(), transformation.isParallelismConfigured());
        streamGraph.setMaxParallelism(t.getId(), transformation.getMaxParallelism());
    }

    private List<Integer> consumeCache(T t, TransformationTranslator.Context context) {
        StreamGraph streamGraph = context.getStreamGraph();
        SimpleOperatorFactory of = SimpleOperatorFactory.of(new IdentityStreamOperator());
        TypeInformation<OUT> outputType = t.getTransformationToCache().getOutputType();
        streamGraph.addLegacySource(Integer.valueOf(t.getId()), context.getSlotSharingGroup(), t.getCoLocationGroupKey(), of, outputType, outputType, CACHE_CONSUMER_OPERATOR_NAME);
        streamGraph.setParallelism(Integer.valueOf(t.getId()), t.getTransformationToCache().getParallelism(), t.isParallelismConfigured());
        streamGraph.setMaxParallelism(t.getId(), t.getTransformationToCache().getMaxParallelism());
        streamGraph.getStreamNode(Integer.valueOf(t.getId())).setConsumeClusterDatasetId(new IntermediateDataSetID(t.getDatasetId()));
        return Collections.singletonList(Integer.valueOf(t.getId()));
    }
}
