package org.apache.beam.runners.spark.translation.streaming;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.SparkUnboundedSource;
import org.apache.beam.runners.spark.translation.BoundedDataset;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
import org.apache.beam.runners.spark.translation.SparkAssignWindowFn;
import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
import org.apache.beam.runners.spark.translation.SparkPCollectionView;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.spark.repackaged.com.google.common.collect.Maps;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.class */
final class StreamingTransformTranslator {
    /**
     * Registry from transform class to the evaluator that translates it. It is filled by the
     * static initializer of this class and read by {@code Translator.hasTranslation} and
     * {@code Translator.translateUnbounded}.
     */
    private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps.newHashMap();

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator$Translator.class */
    /**
     * A {@link SparkPipelineTranslator} that looks up unbounded transforms in the evaluator
     * registry of the enclosing class and hands bounded transforms to a wrapped translator.
     */
    public static class Translator implements SparkPipelineTranslator {
        /** Translator that is asked about every bounded transform. */
        private final SparkPipelineTranslator batchTranslator;

        /**
         * Creates a translator backed by the given batch translator.
         *
         * @param sparkPipelineTranslator translator used for bounded transforms and as a fallback
         *     in {@link #hasTranslation(Class)}
         */
        public Translator(SparkPipelineTranslator sparkPipelineTranslator) {
            batchTranslator = sparkPipelineTranslator;
        }

        /**
         * Reports whether the transform class is known either to the registry of the enclosing
         * class or to the wrapped batch translator.
         *
         * @param cls transform class to look up
         * @return {@code true} if either source has a translation for {@code cls}
         */
        @Override
        public boolean hasTranslation(Class<? extends PTransform<?, ?>> cls) {
            if (StreamingTransformTranslator.EVALUATORS.containsKey(cls)) {
                return true;
            }
            // Not registered here: the batch translator has the final say.
            return batchTranslator.hasTranslation(cls);
        }

        /**
         * Returns the evaluator the wrapped batch translator provides for a bounded transform.
         *
         * @param cls transform class to translate
         * @return the evaluator supplied by the batch translator, never {@code null}
         * @throws IllegalStateException if the batch translator returns {@code null}
         */
        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(Class<TransformT> cls) {
            final TransformEvaluator<TransformT> evaluator = batchTranslator.translateBounded(cls);
            Preconditions.checkState(evaluator != null, "No TransformEvaluator registered for BOUNDED transform %s", cls);
            return evaluator;
        }

        /**
         * Returns the evaluator registered in the enclosing class for an unbounded transform.
         *
         * @param cls transform class to translate
         * @return the registered evaluator, never {@code null}
         * @throws IllegalStateException if no evaluator is registered for {@code cls}
         */
        @Override
        @SuppressWarnings("unchecked") // The registry stores evaluators under a wildcard type.
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(Class<TransformT> cls) {
            final TransformEvaluator<TransformT> evaluator =
                    (TransformEvaluator<TransformT>) StreamingTransformTranslator.EVALUATORS.get(cls);
            Preconditions.checkState(evaluator != null, "No TransformEvaluator registered for UNBOUNDED transform %s", cls);
            return evaluator;
        }
    }

    /** Private constructor: this class only holds static members and is never instantiated here. */
    private StreamingTransformTranslator() {
    }

    /**
     * Builds the evaluator for {@link ConsoleIO.Write.Unbound}. It borrows the transform's
     * dataset, maps its stream through {@code WindowingHelpers.unwindowFunction()} and calls
     * {@code print} on the result with the transform's {@code getNum()} value.
     */
    private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
        return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
            @Override
            public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
                final UnboundedDataset source = (UnboundedDataset) context.borrowDataset(transform);
                final JavaDStream unwindowed = source.getDStream().map(WindowingHelpers.unwindowFunction());
                unwindowed.print(transform.getNum());
            }
        };
    }

    /**
     * Builds the evaluator for {@link Read.Unbounded}. It asks {@link SparkUnboundedSource} to
     * read the transform's source and stores the result as the transform's
     * {@link UnboundedDataset}.
     */
    private static <T> TransformEvaluator<Read.Unbounded<T>> readUnbounded() {
        return new TransformEvaluator<Read.Unbounded<T>>() {
            @Override
            public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) {
                final Dataset dataset = (Dataset) new UnboundedDataset(
                        SparkUnboundedSource.read(
                                context.getStreamingContext(),
                                context.getRuntimeContext(),
                                transform.getSource()));
                context.putDataset((PTransform<?, ? extends PValue>) transform, dataset);
            }
        };
    }

    /**
     * Builds the evaluator for {@link CreateStream.QueuedValues}. It registers an unbounded
     * dataset from the transform's queued values, using the coder of the transform's output.
     */
    private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
        return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
            @Override
            public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
                context.putUnboundedDatasetFromQueue(
                        transform,
                        transform.getQueuedValues(),
                        context.getOutput(transform).getCoder());
            }
        };
    }

    /**
     * Builds the evaluator for {@link Flatten.FlattenPCollectionList}.
     *
     * <p>The streams of all inputs backed by an {@link UnboundedDataset} are unioned into a
     * single stream. If some inputs are backed by a {@link BoundedDataset}, their RDDs are
     * additionally unioned into every batch RDD of that stream. The result is stored as the
     * transform's {@link UnboundedDataset}.
     */
    private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
            /**
             * @throws IllegalArgumentException if an input value is not a {@link PCollection}
             * @throws IllegalStateException if none of the inputs is backed by an unbounded dataset
             */
            @Override
            @SuppressWarnings({"unchecked", "rawtypes"}) // Datasets are borrowed as raw types.
            public void evaluate(Flatten.FlattenPCollectionList<T> flattenPCollectionList, EvaluationContext evaluationContext) {
                List<TaggedPValue> inputs = evaluationContext.getInputs(flattenPCollectionList);
                final List<JavaRDD<WindowedValue<T>>> boundedRdds = new ArrayList<JavaRDD<WindowedValue<T>>>();
                final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<JavaDStream<WindowedValue<T>>>();
                for (TaggedPValue input : inputs) {
                    Preconditions.checkArgument(input.getValue() instanceof PCollection, "Flatten had non-PCollection value in input: %s of type %s", input.getValue(), input.getValue().getClass().getSimpleName());
                    Dataset dataset = evaluationContext.borrowDataset((PValue) input.getValue());
                    if (dataset instanceof UnboundedDataset) {
                        dStreams.add(((UnboundedDataset) dataset).getDStream());
                    } else {
                        boundedRdds.add(((BoundedDataset) dataset).getRDD());
                    }
                }

                // The union below needs a first stream; fail with a clear message instead of the
                // IndexOutOfBoundsException that remove(0) would raise on an empty list.
                Preconditions.checkState(!dStreams.isEmpty(), "Flatten requires at least one unbounded input, but none of its %s inputs is backed by an unbounded dataset", inputs.size());
                JavaDStream<WindowedValue<T>> unified = evaluationContext.getStreamingContext().union(dStreams.remove(0), dStreams);

                if (!boundedRdds.isEmpty()) {
                    // Merge the bounded RDDs into every batch of the unified stream.
                    unified = unified.transform(new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
                        @Override
                        public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> javaRDD) throws Exception {
                            return new JavaSparkContext(javaRDD.context()).union(javaRDD, boundedRdds);
                        }
                    });
                }
                evaluationContext.putDataset((PTransform<?, ? extends PValue>) flattenPCollectionList, (Dataset) new UnboundedDataset(unified));
            }
        };
    }

    /**
     * Builds the evaluator for {@link Window.Bound}.
     *
     * <p>The transform's window function selects the window and slide durations applied to the
     * input stream: {@link FixedWindows} uses its size for both, {@link SlidingWindows} uses its
     * size and period. Unless {@code TranslationUtils.skipAssignWindows} says otherwise, each
     * batch RDD of the windowed stream is then mapped through {@link SparkAssignWindowFn}.
     */
    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
        return new TransformEvaluator<Window.Bound<T>>() {
            /**
             * @throws UnsupportedOperationException if the window function is neither
             *     {@link FixedWindows} nor {@link SlidingWindows} (including {@code null})
             */
            @Override
            @SuppressWarnings({"unchecked", "rawtypes"}) // Datasets and window fns are used raw.
            public void evaluate(final Window.Bound<T> bound, EvaluationContext evaluationContext) {
                // Keep the general WindowFn type: narrowing it to FixedWindows would make the
                // SlidingWindows branch below impossible to reach.
                final WindowFn<?, ?> windowFn = bound.getWindowFn();
                JavaDStream<WindowedValue<T>> dStream = ((UnboundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) bound)).getDStream();

                final Duration windowDuration;
                final Duration slideDuration;
                if (windowFn instanceof FixedWindows) {
                    windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize().getMillis());
                    // Fixed windows do not overlap, so the slide equals the window length.
                    slideDuration = windowDuration;
                } else if (windowFn instanceof SlidingWindows) {
                    SlidingWindows slidingWindows = (SlidingWindows) windowFn;
                    windowDuration = Durations.milliseconds(slidingWindows.getSize().getMillis());
                    slideDuration = Durations.milliseconds(slidingWindows.getPeriod().getMillis());
                } else {
                    // Null-safe so a missing window function is reported the same way.
                    String windowFnName = windowFn == null ? "null" : windowFn.getClass().getCanonicalName();
                    throw new UnsupportedOperationException(String.format("WindowFn %s is not supported.", windowFnName));
                }

                JavaDStream<WindowedValue<T>> windowed = dStream.window(windowDuration, slideDuration);
                if (TranslationUtils.skipAssignWindows(bound, evaluationContext)) {
                    evaluationContext.putDataset((PTransform<?, ? extends PValue>) bound, (Dataset) new UnboundedDataset(windowed));
                } else {
                    evaluationContext.putDataset((PTransform<?, ? extends PValue>) bound, (Dataset) new UnboundedDataset(windowed.transform(new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
                        @Override
                        public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> javaRDD) throws Exception {
                            return javaRDD.map(new SparkAssignWindowFn(bound.getWindowFn()));
                        }
                    })));
                }
            }
        };
    }

    /**
     * Builds the evaluator for {@link GroupByKey}. Every batch RDD of the input stream is passed
     * to {@code GroupCombineFunctions.groupByKey} together with the input's coder and windowing
     * strategy, and the resulting stream becomes the transform's {@link UnboundedDataset}.
     */
    private static <K, V> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
        return new TransformEvaluator<GroupByKey<K, V>>() {
            @Override
            public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
                final JavaDStream inputStream = ((UnboundedDataset) context.borrowDataset((PTransform<? extends PValue, ?>) transform)).getDStream();
                final KvCoder inputCoder = context.getInput(transform).getCoder();
                final SparkRuntimeContext runtime = context.getRuntimeContext();
                final WindowingStrategy strategy = context.getInput(transform).getWindowingStrategy();

                // Applied by Spark to each batch RDD of the stream.
                final Function<JavaRDD<WindowedValue<KV<K, V>>>, JavaRDD<WindowedValue<KV<K, Iterable<V>>>>> groupBatch =
                        new Function<JavaRDD<WindowedValue<KV<K, V>>>, JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() {
                            @Override
                            public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call(JavaRDD<WindowedValue<KV<K, V>>> batch) throws Exception {
                                final JavaSparkContext sparkContext = new JavaSparkContext(batch.context());
                                return GroupCombineFunctions.groupByKey(batch, SparkAggregators.getNamedAggregators(sparkContext), inputCoder, runtime, strategy);
                            }
                        };

                context.putDataset((PTransform<?, ? extends PValue>) transform, (Dataset) new UnboundedDataset(inputStream.transform(groupBatch)));
            }
        };
    }

    /**
     * Builds the evaluator for {@link Combine.GroupedValues}. Every batch RDD of the input stream
     * is mapped through {@code TranslationUtils.CombineGroupedValues}, which wraps a
     * {@link SparkKeyedCombineFn} built from the transform's combine function and side inputs.
     */
    private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> combineGrouped() {
        return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
            @Override
            public void evaluate(final Combine.GroupedValues<K, InputT, OutputT> transform, EvaluationContext context) {
                final WindowingStrategy strategy = context.getInput(transform).getWindowingStrategy();
                final CombineWithContext.KeyedCombineFnWithContext combineFn = CombineFnUtil.toFnWithContext(transform.getFn());
                final JavaDStream inputStream = ((UnboundedDataset) context.borrowDataset((PTransform<? extends PValue, ?>) transform)).getDStream();
                final SparkRuntimeContext runtime = context.getRuntimeContext();
                final SparkPCollectionView views = context.getPViews();

                // Applied by Spark to each batch RDD of the stream.
                final Function<JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>>, JavaRDD<WindowedValue<KV<K, OutputT>>>> combineBatch =
                        new Function<JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>>, JavaRDD<WindowedValue<KV<K, OutputT>>>>() {
                            @Override
                            public JavaRDD<WindowedValue<KV<K, OutputT>>> call(JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> batch) throws Exception {
                                final JavaSparkContext sparkContext = new JavaSparkContext(batch.context());
                                final SparkKeyedCombineFn sparkCombineFn = new SparkKeyedCombineFn(combineFn, runtime, TranslationUtils.getSideInputs(transform.getSideInputs(), sparkContext, views), strategy);
                                return batch.map(new TranslationUtils.CombineGroupedValues(sparkCombineFn));
                            }
                        };

                context.putDataset((PTransform<?, ? extends PValue>) transform, (Dataset) new UnboundedDataset(inputStream.transform(combineBatch)));
            }
        };
    }

    /**
     * Builds the evaluator for {@link Combine.Globally}. Every batch RDD of the input stream is
     * passed to {@code GroupCombineFunctions.combineGlobally} along with the input and output
     * coders, the windowing strategy, the side inputs and the transform's insert-default flag.
     */
    private static <InputT, AccumT, OutputT> TransformEvaluator<Combine.Globally<InputT, OutputT>> combineGlobally() {
        return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
            @Override
            public void evaluate(final Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
                PCollection input = context.getInput(transform);
                final Coder inputCoder = context.getInput(transform).getCoder();
                final Coder outputCoder = context.getOutput(transform).getCoder();
                final CombineWithContext.CombineFnWithContext combineFn = CombineFnUtil.toFnWithContext(transform.getFn());
                final WindowingStrategy strategy = input.getWindowingStrategy();
                final SparkRuntimeContext runtime = context.getRuntimeContext();
                final boolean insertDefault = transform.isInsertDefault();
                final SparkPCollectionView views = context.getPViews();
                final JavaDStream inputStream = ((UnboundedDataset) context.borrowDataset((PTransform<? extends PValue, ?>) transform)).getDStream();

                // Applied by Spark to each batch RDD of the stream.
                final Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>> combineBatch =
                        new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>() {
                            @Override
                            public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> batch) throws Exception {
                                final JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(batch.context());
                                return GroupCombineFunctions.combineGlobally(batch, combineFn, inputCoder, outputCoder, runtime, strategy, TranslationUtils.getSideInputs(transform.getSideInputs(), sparkContext, views), insertDefault);
                            }
                        };

                context.putDataset((PTransform<?, ? extends PValue>) transform, (Dataset) new UnboundedDataset(inputStream.transform(combineBatch)));
            }
        };
    }

    /**
     * Builds the evaluator for {@link Combine.PerKey}. Every batch RDD of the input stream is
     * passed to {@code GroupCombineFunctions.combinePerKey} along with the input coder, the
     * windowing strategy and the transform's side inputs.
     */
    private static <K, InputT, AccumT, OutputT> TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
        return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
            @Override
            public void evaluate(final Combine.PerKey<K, InputT, OutputT> transform, EvaluationContext context) {
                PCollection input = context.getInput(transform);
                final KvCoder inputCoder = context.getInput(transform).getCoder();
                final CombineWithContext.KeyedCombineFnWithContext combineFn = CombineFnUtil.toFnWithContext(transform.getFn());
                final WindowingStrategy strategy = input.getWindowingStrategy();
                final SparkRuntimeContext runtime = context.getRuntimeContext();
                final SparkPCollectionView views = context.getPViews();
                final JavaDStream inputStream = ((UnboundedDataset) context.borrowDataset((PTransform<? extends PValue, ?>) transform)).getDStream();

                // Applied by Spark to each batch RDD of the stream.
                final Function<JavaRDD<WindowedValue<KV<K, InputT>>>, JavaRDD<WindowedValue<KV<K, OutputT>>>> combineBatch =
                        new Function<JavaRDD<WindowedValue<KV<K, InputT>>>, JavaRDD<WindowedValue<KV<K, OutputT>>>>() {
                            @Override
                            public JavaRDD<WindowedValue<KV<K, OutputT>>> call(JavaRDD<WindowedValue<KV<K, InputT>>> batch) throws Exception {
                                final JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(batch.context());
                                return GroupCombineFunctions.combinePerKey(batch, combineFn, inputCoder, runtime, strategy, TranslationUtils.getSideInputs(transform.getSideInputs(), sparkContext, views));
                            }
                        };

                context.putDataset((PTransform<?, ? extends PValue>) transform, (Dataset) new UnboundedDataset(inputStream.transform(combineBatch)));
            }
        };
    }

    /**
     * Builds the evaluator for {@link ParDo.Bound}. After
     * {@code TranslationUtils.rejectStateAndTimers} has checked the {@link DoFn}, every batch RDD
     * of the input stream is run through a {@link DoFnFunction} via {@code mapPartitions}.
     */
    private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
        return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
            @Override
            public void evaluate(final ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
                final DoFn doFn = transform.getFn();
                TranslationUtils.rejectStateAndTimers(doFn);
                final SparkRuntimeContext runtime = context.getRuntimeContext();
                final WindowingStrategy strategy = context.getInput(transform).getWindowingStrategy();
                final SparkPCollectionView views = context.getPViews();
                final JavaDStream inputStream = ((UnboundedDataset) context.borrowDataset((PTransform<? extends PValue, ?>) transform)).getDStream();

                // Applied by Spark to each batch RDD of the stream.
                final Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>> processBatch =
                        new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>() {
                            @Override
                            public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> batch) throws Exception {
                                final JavaSparkContext sparkContext = new JavaSparkContext(batch.context());
                                final DoFnFunction perPartition = new DoFnFunction(SparkAggregators.getNamedAggregators(sparkContext), doFn, runtime, TranslationUtils.getSideInputs(transform.getSideInputs(), sparkContext, views), strategy);
                                return batch.mapPartitions(perPartition);
                            }
                        };

                context.putDataset((PTransform<?, ? extends PValue>) transform, (Dataset) new UnboundedDataset(inputStream.transform(processBatch)));
            }
        };
    }

    /**
     * Builds the evaluator for {@link ParDo.BoundMulti}.
     *
     * <p>After {@code TranslationUtils.rejectStateAndTimers} has checked the {@link DoFn}, every
     * batch RDD of the input stream is run through a {@link MultiDoFnFunction}, producing pairs
     * keyed by output {@link TupleTag}. {@code cache()} is called on that pair stream, and for
     * each output of the transform the stream is filtered by the output's tag and its values are
     * stored as that output's {@link UnboundedDataset}.
     */
    private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>> multiDo() {
        return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
            @Override
            public void evaluate(final ParDo.BoundMulti<InputT, OutputT> boundMulti, EvaluationContext evaluationContext) {
                final DoFn fn = boundMulti.getFn();
                TranslationUtils.rejectStateAndTimers(fn);
                final SparkRuntimeContext runtimeContext = evaluationContext.getRuntimeContext();
                final SparkPCollectionView pViews = evaluationContext.getPViews();
                final WindowingStrategy windowingStrategy = evaluationContext.getInput(boundMulti).getWindowingStrategy();
                JavaPairDStream taggedOutputs = ((UnboundedDataset) evaluationContext.borrowDataset((PTransform<? extends PValue, ?>) boundMulti)).getDStream().transformToPair(new Function<JavaRDD<WindowedValue<InputT>>, JavaPairRDD<TupleTag<?>, WindowedValue<?>>>() {
                    @Override
                    public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(JavaRDD<WindowedValue<InputT>> javaRDD) throws Exception {
                        // One wrapper serves both the aggregators and the side inputs, the same
                        // way the single-output ParDo evaluator does.
                        final JavaSparkContext javaSparkContext = new JavaSparkContext(javaRDD.context());
                        return javaRDD.mapPartitionsToPair(new MultiDoFnFunction(SparkAggregators.getNamedAggregators(javaSparkContext), fn, runtimeContext, boundMulti.getMainOutputTag(), TranslationUtils.getSideInputs(boundMulti.getSideInputs(), javaSparkContext, pViews), windowingStrategy));
                    }
                }).cache();
                // Every output reads from the same cached pair stream, filtered by its own tag.
                for (TaggedPValue output : evaluationContext.getOutputs(boundMulti)) {
                    evaluationContext.putDataset(output.getValue(), new UnboundedDataset(TranslationUtils.dStreamValues(taggedOutputs.filter(new TranslationUtils.TupleTagFilter(output.getTag())))));
                }
            }
        };
    }

    // Registers one evaluator per supported transform class. The registry is a hash map with
    // distinct keys, so the grouping below is purely for readability.
    static {
        // Stream sources.
        EVALUATORS.put(Read.Unbounded.class, readUnbounded());
        EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());

        // Element-wise processing.
        EVALUATORS.put(ParDo.Bound.class, parDo());
        EVALUATORS.put(ParDo.BoundMulti.class, multiDo());

        // Windowing and merging of collections.
        EVALUATORS.put(Window.Bound.class, window());
        EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());

        // Grouping and combining.
        EVALUATORS.put(GroupByKey.class, groupByKey());
        EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
        EVALUATORS.put(Combine.Globally.class, combineGlobally());
        EVALUATORS.put(Combine.PerKey.class, combinePerKey());

        // Console output.
        EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
    }
}
