package org.apache.beam.runners.spark.translation.streaming;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.SparkUnboundedSource;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Function;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Maps;
import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet;
import org.apache.beam.runners.spark.translation.BoundedDataset;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
import org.apache.beam.runners.spark.translation.SparkAssignWindowFn;
import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
import org.apache.beam.runners.spark.translation.SparkPCollectionView;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.class */
public final class StreamingTransformTranslator {
    private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps.newHashMap();

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator$Translator.class */
    public static class Translator implements SparkPipelineTranslator {
        private final SparkPipelineTranslator batchTranslator;

        public Translator(SparkPipelineTranslator sparkPipelineTranslator) {
            this.batchTranslator = sparkPipelineTranslator;
        }

        @Override // org.apache.beam.runners.spark.translation.SparkPipelineTranslator
        public boolean hasTranslation(Class<? extends PTransform<?, ?>> cls) {
            return StreamingTransformTranslator.EVALUATORS.containsKey(cls);
        }

        @Override // org.apache.beam.runners.spark.translation.SparkPipelineTranslator
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(Class<TransformT> cls) {
            TransformEvaluator<TransformT> translateBounded = this.batchTranslator.translateBounded(cls);
            Preconditions.checkState(translateBounded != null, "No TransformEvaluator registered for BOUNDED transform %s", cls);
            return translateBounded;
        }

        @Override // org.apache.beam.runners.spark.translation.SparkPipelineTranslator
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(Class<TransformT> cls) {
            TransformEvaluator<TransformT> transformEvaluator = (TransformEvaluator) StreamingTransformTranslator.EVALUATORS.get(cls);
            Preconditions.checkState(transformEvaluator != null, "No TransformEvaluator registered for UNBOUNDED transform %s", cls);
            return transformEvaluator;
        }
    }

    private StreamingTransformTranslator() {
    }

    private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
        return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.1
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(ConsoleIO.Write.Unbound<T> unbound, EvaluationContext evaluationContext) {
                ((UnboundedDataset) evaluationContext.borrowDataset(unbound)).getDStream().map(WindowingHelpers.unwindowFunction()).print(unbound.getNum());
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return ".print(...)";
            }
        };
    }

    private static <T> TransformEvaluator<Read.Unbounded<T>> readUnbounded() {
        return new TransformEvaluator<Read.Unbounded<T>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.2
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Read.Unbounded<T> unbounded, EvaluationContext evaluationContext) {
                evaluationContext.putDataset(unbounded, SparkUnboundedSource.read(evaluationContext.getStreamingContext(), evaluationContext.getSerializableOptions(), unbounded.getSource(), evaluationContext.getCurrentTransform().getFullName()));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "streamingContext.<readFrom(<source>)>()";
            }
        };
    }

    private static <T> TransformEvaluator<CreateStream<T>> createFromQueue() {
        return new TransformEvaluator<CreateStream<T>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.3
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(CreateStream<T> createStream, EvaluationContext evaluationContext) {
                JavaInputDStream<WindowedValue<T>> buildInputStream = buildInputStream(buildRdds(createStream.getBatches(), evaluationContext.getStreamingContext(), evaluationContext.getOutput(createStream).getCoder()), createStream, evaluationContext);
                UnboundedDataset unboundedDataset = new UnboundedDataset(buildInputStream, Collections.singletonList(Integer.valueOf(buildInputStream.inputDStream().id())));
                GlobalWatermarkHolder.addAll(ImmutableMap.of(unboundedDataset.getStreamSources().get(0), createStream.getTimes()));
                evaluationContext.putDataset(createStream, unboundedDataset);
            }

            private Queue<JavaRDD<WindowedValue<T>>> buildRdds(Queue<Iterable<TimestampedValue<T>>> queue, JavaStreamingContext javaStreamingContext, Coder<T> coder) {
                WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
                LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
                Iterator<Iterable<TimestampedValue<T>>> it = queue.iterator();
                while (it.hasNext()) {
                    linkedBlockingQueue.offer(javaStreamingContext.sparkContext().parallelize(CoderHelpers.toByteArrays(Iterables.transform(it.next(), new Function<TimestampedValue<T>, WindowedValue<T>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.3.1
                        @Override // org.apache.beam.runners.spark.repackaged.com.google.common.base.Function
                        public WindowedValue<T> apply(@Nonnull TimestampedValue<T> timestampedValue) {
                            return WindowedValue.of(timestampedValue.getValue(), timestampedValue.getTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
                        }
                    }), of)).map(CoderHelpers.fromByteFunction(of)));
                }
                return linkedBlockingQueue;
            }

            private JavaInputDStream<WindowedValue<T>> buildInputStream(Queue<JavaRDD<WindowedValue<T>>> queue, CreateStream<T> createStream, EvaluationContext evaluationContext) {
                return createStream.isForceWatermarkSync() ? new JavaInputDStream<>(new WatermarkSyncedDStream(queue, Long.valueOf(createStream.getBatchDuration()), evaluationContext.getStreamingContext().ssc()), JavaSparkContext$.MODULE$.fakeClassTag()) : evaluationContext.getStreamingContext().queueStream(queue, true);
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "streamingContext.queueStream(...)";
            }
        };
    }

    private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.PCollections<T>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.4
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Flatten.PCollections<T> pCollections, EvaluationContext evaluationContext) {
                Map<TupleTag<?>, PValue> inputs = evaluationContext.getInputs(pCollections);
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                Iterator<PValue> it = inputs.values().iterator();
                while (it.hasNext()) {
                    PCollection pCollection = (PValue) it.next();
                    Preconditions.checkArgument(pCollection instanceof PCollection, "Flatten had non-PCollection value in input: %s of type %s", pCollection, pCollection.getClass().getSimpleName());
                    Dataset borrowDataset = evaluationContext.borrowDataset((PValue) pCollection);
                    if (borrowDataset instanceof UnboundedDataset) {
                        UnboundedDataset unboundedDataset = (UnboundedDataset) borrowDataset;
                        arrayList2.addAll(unboundedDataset.getStreamSources());
                        arrayList.add(unboundedDataset.getDStream());
                    } else {
                        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
                        linkedBlockingQueue.offer(((BoundedDataset) borrowDataset).getRDD());
                        arrayList.add(evaluationContext.getStreamingContext().queueStream(linkedBlockingQueue));
                    }
                }
                evaluationContext.putDataset(pCollections, new UnboundedDataset(evaluationContext.getStreamingContext().union((JavaDStream) arrayList.remove(0), arrayList), arrayList2));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "streamingContext.union(...)";
            }
        };
    }

    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>> window() {
        return new TransformEvaluator<Window.Assign<T>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.5
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(final Window.Assign<T> assign, EvaluationContext evaluationContext) {
                UnboundedDataset unboundedDataset = (UnboundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) assign);
                JavaDStream<WindowedValue<T>> dStream = unboundedDataset.getDStream();
                evaluationContext.putDataset(assign, new UnboundedDataset(TranslationUtils.skipAssignWindows(assign, evaluationContext) ? dStream : dStream.transform(new org.apache.spark.api.java.function.Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.5.1
                    public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> javaRDD) throws Exception {
                        return javaRDD.map(new SparkAssignWindowFn(assign.getWindowFn()));
                    }
                }), unboundedDataset.getStreamSources()));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "map(new <windowFn>())";
            }
        };
    }

    private static <K, V, W extends BoundedWindow> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
        return new TransformEvaluator<GroupByKey<K, V>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.6
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(GroupByKey<K, V> groupByKey, EvaluationContext evaluationContext) {
                UnboundedDataset unboundedDataset = (UnboundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) groupByKey);
                List<Integer> streamSources = unboundedDataset.getStreamSources();
                JavaDStream dStream = unboundedDataset.getDStream();
                final KvCoder coder = evaluationContext.getInput(groupByKey).getCoder();
                WindowingStrategy windowingStrategy = evaluationContext.getInput(groupByKey).getWindowingStrategy();
                final WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowingStrategy.getWindowFn().windowCoder());
                evaluationContext.putDataset(groupByKey, new UnboundedDataset(SparkGroupAlsoByWindowViaWindowSet.groupAlsoByWindow(dStream.transform(new org.apache.spark.api.java.function.Function<JavaRDD<WindowedValue<KV<K, V>>>, JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.6.1
                    public JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> call(JavaRDD<WindowedValue<KV<K, V>>> javaRDD) throws Exception {
                        return GroupCombineFunctions.groupByKeyOnly(javaRDD, coder.getKeyCoder(), of);
                    }
                }), coder.getKeyCoder(), of, windowingStrategy, evaluationContext.getSerializableOptions(), streamSources, evaluationContext.getCurrentTransform().getFullName()), streamSources));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "groupByKey()";
            }
        };
    }

    private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> combineGrouped() {
        return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.7
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(final Combine.GroupedValues<K, InputT, OutputT> groupedValues, EvaluationContext evaluationContext) {
                final WindowingStrategy windowingStrategy = evaluationContext.getInput(groupedValues).getWindowingStrategy();
                final CombineWithContext.CombineFnWithContext fnWithContext = CombineFnUtil.toFnWithContext(groupedValues.getFn());
                UnboundedDataset unboundedDataset = (UnboundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) groupedValues);
                JavaDStream dStream = unboundedDataset.getDStream();
                final SerializablePipelineOptions serializableOptions = evaluationContext.getSerializableOptions();
                final SparkPCollectionView pViews = evaluationContext.getPViews();
                evaluationContext.putDataset(groupedValues, new UnboundedDataset(dStream.transform(new org.apache.spark.api.java.function.Function<JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>>, JavaRDD<WindowedValue<KV<K, OutputT>>>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.7.1
                    public JavaRDD<WindowedValue<KV<K, OutputT>>> call(JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> javaRDD) throws Exception {
                        return javaRDD.map(new TranslationUtils.CombineGroupedValues(new SparkKeyedCombineFn(fnWithContext, serializableOptions, TranslationUtils.getSideInputs(groupedValues.getSideInputs(), new JavaSparkContext(javaRDD.context()), pViews), windowingStrategy)));
                    }
                }), unboundedDataset.getStreamSources()));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "map(new <fn>())";
            }
        };
    }

    private static <InputT, OutputT> TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>> parDo() {
        return new TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.8
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(final ParDo.MultiOutput<InputT, OutputT> multiOutput, EvaluationContext evaluationContext) {
                final DoFn fn = multiOutput.getFn();
                TranslationUtils.rejectSplittable(fn);
                TranslationUtils.rejectStateAndTimers(fn);
                final SerializablePipelineOptions serializableOptions = evaluationContext.getSerializableOptions();
                final SparkPCollectionView pViews = evaluationContext.getPViews();
                final WindowingStrategy windowingStrategy = evaluationContext.getInput(multiOutput).getWindowingStrategy();
                UnboundedDataset unboundedDataset = (UnboundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) multiOutput);
                JavaDStream dStream = unboundedDataset.getDStream();
                final String fullName = evaluationContext.getCurrentTransform().getFullName();
                JavaPairDStream transformToPair = dStream.transformToPair(new org.apache.spark.api.java.function.Function<JavaRDD<WindowedValue<InputT>>, JavaPairRDD<TupleTag<?>, WindowedValue<?>>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.8.1
                    public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(JavaRDD<WindowedValue<InputT>> javaRDD) throws Exception {
                        return javaRDD.mapPartitionsToPair(new MultiDoFnFunction(MetricsAccumulator.getInstance(), fullName, fn, serializableOptions, multiOutput.getMainOutputTag(), multiOutput.getAdditionalOutputTags().getAll(), TranslationUtils.getSideInputs(multiOutput.getSideInputs(), JavaSparkContext.fromSparkContext(javaRDD.context()), pViews), windowingStrategy, false));
                    }
                });
                Map<TupleTag<?>, PValue> outputs = evaluationContext.getOutputs(multiOutput);
                if (outputs.size() > 1) {
                    Map<TupleTag<?>, Coder<WindowedValue<?>>> tupleTagCoders = TranslationUtils.getTupleTagCoders(outputs);
                    transformToPair = transformToPair.mapToPair(TranslationUtils.getTupleTagEncodeFunction(tupleTagCoders)).cache().mapToPair(TranslationUtils.getTupleTagDecodeFunction(tupleTagCoders));
                }
                for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
                    evaluationContext.putDataset(entry.getValue(), (Dataset) new UnboundedDataset(TranslationUtils.dStreamValues(transformToPair.filter(new TranslationUtils.TupleTagFilter(entry.getKey()))), unboundedDataset.getStreamSources()), false);
                }
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "mapPartitions(new <fn>())";
            }
        };
    }

    private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() {
        return new TransformEvaluator<Reshuffle<K, V>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.9
            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public void evaluate(Reshuffle<K, V> reshuffle, EvaluationContext evaluationContext) {
                UnboundedDataset unboundedDataset = (UnboundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) reshuffle);
                List<Integer> streamSources = unboundedDataset.getStreamSources();
                JavaDStream dStream = unboundedDataset.getDStream();
                final KvCoder coder = evaluationContext.getInput(reshuffle).getCoder();
                final WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), evaluationContext.getInput(reshuffle).getWindowingStrategy().getWindowFn().windowCoder());
                evaluationContext.putDataset(reshuffle, new UnboundedDataset(dStream.transform(new org.apache.spark.api.java.function.Function<JavaRDD<WindowedValue<KV<K, V>>>, JavaRDD<WindowedValue<KV<K, V>>>>() { // from class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator.9.1
                    public JavaRDD<WindowedValue<KV<K, V>>> call(JavaRDD<WindowedValue<KV<K, V>>> javaRDD) throws Exception {
                        return GroupCombineFunctions.reshuffle(javaRDD, coder.getKeyCoder(), of);
                    }
                }), streamSources));
            }

            @Override // org.apache.beam.runners.spark.translation.TransformEvaluator
            public String toNativeString() {
                return "repartition(...)";
            }
        };
    }

    static {
        EVALUATORS.put(Read.Unbounded.class, readUnbounded());
        EVALUATORS.put(GroupByKey.class, groupByKey());
        EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
        EVALUATORS.put(ParDo.MultiOutput.class, parDo());
        EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
        EVALUATORS.put(CreateStream.class, createFromQueue());
        EVALUATORS.put(Window.Assign.class, window());
        EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
        EVALUATORS.put(Reshuffle.class, reshuffle());
    }
}
