/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Map;
import org.apache.beam.runners.core.AssignWindowsDoFn;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.SparkUnboundedSource;
import org.apache.beam.runners.spark.translation.BoundedDataset;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
import org.apache.beam.runners.spark.translation.MultiDoFnFunction;
import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

final class StreamingTransformTranslator {
    /**
     * Registry that maps a transform class to the evaluator used to translate it.
     * It is populated by the static initializer of this class and consulted by
     * {@code Translator#hasTranslation} and {@code Translator#translateUnbounded}.
     */
    private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps.newHashMap();

    /** Private constructor: this class is a holder of static members and is not instantiated. */
    private StreamingTransformTranslator() {
    }

    /**
     * Creates the evaluator for {@link ConsoleIO.Write.Unbound}.
     *
     * <p>The evaluator borrows the transform's dataset as an {@link UnboundedDataset}, maps its
     * stream through {@code WindowingHelpers.unwindowFunction()} and calls {@code print} on the
     * mapped stream with {@code transform.getNum()}.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Spark/Beam types are kept as they were
    private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
        return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
            @Override
            public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
                UnboundedDataset source = (UnboundedDataset) context.borrowDataset(transform);
                JavaDStream unwindowed = source.getDStream().map(WindowingHelpers.unwindowFunction());
                unwindowed.print(transform.getNum());
            }
        };
    }

    /**
     * Creates the evaluator for {@link Read.Unbounded}.
     *
     * <p>The evaluator passes the streaming context, the runtime context and the transform's
     * source to {@code SparkUnboundedSource.read}, wraps the result in an
     * {@link UnboundedDataset} and registers that dataset for the transform.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Spark/Beam types are kept as they were
    private static <T> TransformEvaluator<Read.Unbounded<T>> readUnbounded() {
        return new TransformEvaluator<Read.Unbounded<T>>() {
            @Override
            public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) {
                UnboundedDataset sourceDataset =
                    new UnboundedDataset(
                        SparkUnboundedSource.read(
                            context.getStreamingContext(),
                            context.getRuntimeContext(),
                            transform.getSource()));
                context.putDataset(transform, sourceDataset);
            }
        };
    }

    /**
     * Creates the evaluator for {@link CreateStream.QueuedValues}.
     *
     * <p>The evaluator reads the transform's queued values and the coder of the transform's
     * output collection, then hands both to
     * {@code EvaluationContext.putUnboundedDatasetFromQueue}.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Spark/Beam types are kept as they were
    private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
        return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
            @Override
            public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
                Iterable queued = transform.getQueuedValues();
                PCollection output = (PCollection) context.getOutput(transform);
                Coder elementCoder = output.getCoder();
                context.putUnboundedDatasetFromQueue(transform, queued, elementCoder);
            }
        };
    }

    /**
     * Creates the evaluator for {@link Flatten.FlattenPCollectionList}.
     *
     * <p>Each input collection is resolved to its dataset: an {@link UnboundedDataset}
     * contributes its DStream, any other dataset is treated as a {@link BoundedDataset} and
     * contributes its RDD. The collected streams are unioned into a single stream; if any RDDs
     * were collected, every batch of that stream is additionally unioned with them. The result
     * is registered for the transform as an {@link UnboundedDataset}.
     *
     * <p>The evaluator throws {@link IllegalStateException} when none of the inputs is
     * unbounded, because there is then no stream to union the inputs into.
     */
    private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
        return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
            @SuppressWarnings({"rawtypes", "unchecked"}) // dataset casts are unchecked by nature
            @Override
            public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
                PCollectionList<T> inputs = (PCollectionList<T>) context.getInput(transform);

                // Split the inputs into streaming and batch parts.
                final ArrayList<JavaRDD<WindowedValue<T>>> rdds = new ArrayList<>();
                ArrayList<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
                for (PCollection<T> input : inputs.getAll()) {
                    Dataset dataset = context.borrowDataset(input);
                    if (dataset instanceof UnboundedDataset) {
                        dStreams.add(((UnboundedDataset) dataset).getDStream());
                    } else {
                        rdds.add(((BoundedDataset) dataset).getRDD());
                    }
                }

                // Fail with a descriptive message instead of an IndexOutOfBoundsException from
                // remove(0) below when there is no stream to start the union from.
                Preconditions.checkState(
                    !dStreams.isEmpty(),
                    "Flatten in a streaming pipeline requires at least one unbounded input.");

                // Union all streams first; the head of the list seeds the union with the rest.
                JavaDStream<WindowedValue<T>> firstStream = dStreams.remove(0);
                JavaDStream<WindowedValue<T>> unified =
                    context.getStreamingContext().union(firstStream, dStreams);

                // Then fold the bounded RDDs, if any, into every batch of the unified stream.
                if (!rdds.isEmpty()) {
                    unified = unified.transform(
                        new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
                            @Override
                            public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> streamRdd)
                                    throws Exception {
                                return new JavaSparkContext(streamRdd.context()).union(streamRdd, rdds);
                            }
                        });
                }
                context.putDataset(transform, new UnboundedDataset(unified));
            }
        };
    }

    /**
     * Creates the evaluator for {@link Window.Bound}.
     *
     * <p>The transform's {@link WindowFn} is mapped onto a Spark window over the input stream:
     * {@link FixedWindows} uses its size as both window and slide duration, and
     * {@link SlidingWindows} uses its size as window duration and its period as slide duration.
     * Any other {@code WindowFn} is rejected with {@link UnsupportedOperationException}.
     *
     * <p>If {@code TranslationUtils.skipAssignWindows} returns true, the windowed stream is
     * registered as is. Otherwise every batch is additionally run through an
     * {@link AssignWindowsDoFn} built from the {@code WindowFn} before the stream is registered.
     */
    private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
        return new TransformEvaluator<Window.Bound<T>>() {
            @SuppressWarnings({"rawtypes", "unchecked"}) // dataset and DoFn plumbing is unchecked
            @Override
            public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
                WindowFn windowFn = transform.getWindowFn();
                JavaDStream<WindowedValue<T>> dStream =
                    ((UnboundedDataset) context.borrowDataset(transform)).getDStream();

                // Translate the Beam window definition into Spark window/slide durations.
                final Duration windowDuration;
                final Duration slideDuration;
                if (windowFn instanceof FixedWindows) {
                    windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize().getMillis());
                    slideDuration = windowDuration;
                } else if (windowFn instanceof SlidingWindows) {
                    SlidingWindows slidingWindows = (SlidingWindows) windowFn;
                    windowDuration = Durations.milliseconds(slidingWindows.getSize().getMillis());
                    slideDuration = Durations.milliseconds(slidingWindows.getPeriod().getMillis());
                } else {
                    throw new UnsupportedOperationException(
                        String.format("WindowFn %s is not supported.", windowFn.getClass().getCanonicalName()));
                }
                JavaDStream<WindowedValue<T>> windowedDStream = dStream.window(windowDuration, slideDuration);

                if (TranslationUtils.skipAssignWindows(transform, context)) {
                    context.putDataset(transform, new UnboundedDataset(windowedDStream));
                    return;
                }

                // Captured directly by the anonymous function below; both must be final for that.
                final OldDoFn addWindowsDoFn = new AssignWindowsDoFn(windowFn);
                final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
                JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform(
                    new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
                        @Override
                        public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
                            Accumulator<NamedAggregators> accum =
                                SparkAggregators.getNamedAggregators(new JavaSparkContext(rdd.context()));
                            // No side inputs and no window function are passed for window assignment.
                            return rdd.mapPartitions(
                                new DoFnFunction(accum, addWindowsDoFn, runtimeContext, null, null));
                        }
                    });
                context.putDataset(transform, new UnboundedDataset(outStream));
            }
        };
    }

    /**
     * Creates the evaluator for {@link GroupByKey}.
     *
     * <p>For every batch of the input stream the evaluator obtains the named aggregators
     * accumulator for the batch's Spark context and delegates to
     * {@code GroupCombineFunctions.groupByKey} with the input coder, the runtime context and
     * the input windowing strategy. The transformed stream is registered for the transform.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Spark/Beam types are kept as they were
    private static <K, V> TransformEvaluator<GroupByKey<K, V>> groupByKey() {
        return new TransformEvaluator<GroupByKey<K, V>>() {
            @Override
            public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
                UnboundedDataset inputDataset = (UnboundedDataset) context.borrowDataset(transform);
                JavaDStream inputStream = inputDataset.getDStream();
                PCollection coderSource = (PCollection) context.getInput(transform);
                final KvCoder kvCoder = (KvCoder) coderSource.getCoder();
                final SparkRuntimeContext rtContext = context.getRuntimeContext();
                PCollection strategySource = (PCollection) context.getInput(transform);
                final WindowingStrategy strategy = strategySource.getWindowingStrategy();

                // Applied to each batch (RDD) of the stream.
                Function<JavaRDD<WindowedValue<KV<K, V>>>, JavaRDD<WindowedValue<KV<K, Iterable<V>>>>> groupBatch =
                    new Function<JavaRDD<WindowedValue<KV<K, V>>>, JavaRDD<WindowedValue<KV<K, Iterable<V>>>>>() {
                        @Override
                        public JavaRDD<WindowedValue<KV<K, Iterable<V>>>> call(
                                JavaRDD<WindowedValue<KV<K, V>>> batch) throws Exception {
                            JavaSparkContext batchContext = new JavaSparkContext(batch.context());
                            Accumulator<NamedAggregators> aggregators =
                                SparkAggregators.getNamedAggregators(batchContext);
                            return GroupCombineFunctions.groupByKey(batch, aggregators, kvCoder, rtContext, strategy);
                        }
                    };
                context.putDataset(transform, new UnboundedDataset(inputStream.transform(groupBatch)));
            }
        };
    }

    /**
     * Creates the evaluator for {@link Combine.GroupedValues}.
     *
     * <p>The transform's combine function is converted with
     * {@code CombineFnUtil.toFnWithContext} and wrapped in a {@link SparkKeyedCombineFn}
     * together with the runtime context, the transform's side inputs and the input windowing
     * strategy. The input stream is then mapped with
     * {@code TranslationUtils.CombineGroupedValues} and registered for the transform.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Spark/Beam types are kept as they were
    private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>> combineGrouped() {
        return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
            @Override
            public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform, EvaluationContext context) {
                PCollection groupedInput = (PCollection) context.getInput(transform);
                WindowingStrategy strategy = groupedInput.getWindowingStrategy();
                // The raw cast selects the per-key overload of toFnWithContext.
                CombineWithContext.KeyedCombineFnWithContext keyedFn =
                    CombineFnUtil.toFnWithContext((CombineFnBase.PerKeyCombineFn) transform.getFn());
                UnboundedDataset inputDataset = (UnboundedDataset) context.borrowDataset(transform);
                JavaDStream inputStream = inputDataset.getDStream();

                SparkRuntimeContext rtContext = context.getRuntimeContext();
                Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputMap =
                    TranslationUtils.getSideInputs(transform.getSideInputs(), context);
                SparkKeyedCombineFn sparkCombineFn =
                    new SparkKeyedCombineFn(keyedFn, rtContext, sideInputMap, strategy);

                JavaDStream combined = inputStream.map(new TranslationUtils.CombineGroupedValues(sparkCombineFn));
                context.putDataset(transform, new UnboundedDataset(combined));
            }
        };
    }

    /**
     * Creates the evaluator for {@link Combine.Globally}.
     *
     * <p>Every batch of the input stream is passed to
     * {@code GroupCombineFunctions.combineGlobally} along with the converted combine function,
     * the input and output coders, the runtime context, the input windowing strategy, the
     * transform's side inputs and the transform's {@code isInsertDefault()} flag. The
     * transformed stream is registered for the transform.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Spark/Beam types are kept as they were
    private static <InputT, AccumT, OutputT> TransformEvaluator<Combine.Globally<InputT, OutputT>> combineGlobally() {
        return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
            @Override
            public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
                PCollection inputCollection = (PCollection) context.getInput(transform);
                final Coder inCoder = ((PCollection) context.getInput(transform)).getCoder();
                final Coder outCoder = ((PCollection) context.getOutput(transform)).getCoder();
                // The raw cast selects the global overload of toFnWithContext.
                final CombineWithContext.CombineFnWithContext globalFn =
                    CombineFnUtil.toFnWithContext((CombineFnBase.GlobalCombineFn) transform.getFn());
                final WindowingStrategy strategy = inputCollection.getWindowingStrategy();
                final SparkRuntimeContext rtContext = context.getRuntimeContext();
                final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputMap =
                    TranslationUtils.getSideInputs(transform.getSideInputs(), context);
                final boolean insertDefault = transform.isInsertDefault();
                UnboundedDataset inputDataset = (UnboundedDataset) context.borrowDataset(transform);
                JavaDStream inputStream = inputDataset.getDStream();

                // Applied to each batch (RDD) of the stream.
                Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>> combineBatch =
                    new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>() {
                        @Override
                        public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> batch)
                                throws Exception {
                            return GroupCombineFunctions.combineGlobally(
                                batch, globalFn, inCoder, outCoder, rtContext, strategy, sideInputMap, insertDefault);
                        }
                    };
                context.putDataset(transform, new UnboundedDataset(inputStream.transform(combineBatch)));
            }
        };
    }

    /**
     * Creates the evaluator for {@link Combine.PerKey}.
     *
     * <p>Every batch of the input stream is passed to
     * {@code GroupCombineFunctions.combinePerKey} along with the converted keyed combine
     * function, the input {@link KvCoder}, the runtime context, the input windowing strategy
     * and the transform's side inputs. The transformed stream is registered for the transform.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Spark/Beam types are kept as they were
    private static <K, InputT, AccumT, OutputT> TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
        return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
            @Override
            public void evaluate(Combine.PerKey<K, InputT, OutputT> transform, EvaluationContext context) {
                PCollection inputCollection = (PCollection) context.getInput(transform);
                final KvCoder kvCoder = (KvCoder) ((PCollection) context.getInput(transform)).getCoder();
                // The raw cast selects the per-key overload of toFnWithContext.
                final CombineWithContext.KeyedCombineFnWithContext keyedFn =
                    CombineFnUtil.toFnWithContext((CombineFnBase.PerKeyCombineFn) transform.getFn());
                final WindowingStrategy strategy = inputCollection.getWindowingStrategy();
                final SparkRuntimeContext rtContext = context.getRuntimeContext();
                final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputMap =
                    TranslationUtils.getSideInputs(transform.getSideInputs(), context);
                UnboundedDataset inputDataset = (UnboundedDataset) context.borrowDataset(transform);
                JavaDStream inputStream = inputDataset.getDStream();

                // Applied to each batch (RDD) of the stream.
                Function<JavaRDD<WindowedValue<KV<K, InputT>>>, JavaRDD<WindowedValue<KV<K, OutputT>>>> combineBatch =
                    new Function<JavaRDD<WindowedValue<KV<K, InputT>>>, JavaRDD<WindowedValue<KV<K, OutputT>>>>() {
                        @Override
                        public JavaRDD<WindowedValue<KV<K, OutputT>>> call(
                                JavaRDD<WindowedValue<KV<K, InputT>>> batch) throws Exception {
                            return GroupCombineFunctions.combinePerKey(
                                batch, keyedFn, kvCoder, rtContext, strategy, sideInputMap);
                        }
                    };
                context.putDataset(transform, new UnboundedDataset(inputStream.transform(combineBatch)));
            }
        };
    }

    /**
     * Creates the evaluator for {@link ParDo.Bound}.
     *
     * <p>The transform's new-style fn is first passed to
     * {@code TranslationUtils.rejectStateAndTimers}. Every batch of the input stream is then
     * processed with {@code mapPartitions} using a {@link DoFnFunction} built from the batch's
     * named aggregators accumulator, {@code transform.getFn()}, the runtime context, the side
     * inputs and the input window function. The transformed stream is registered for the
     * transform.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Spark/Beam types are kept as they were
    private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
        return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
            @Override
            public void evaluate(final ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
                TranslationUtils.rejectStateAndTimers(transform.getNewFn());
                final SparkRuntimeContext rtContext = context.getRuntimeContext();
                final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputMap =
                    TranslationUtils.getSideInputs(transform.getSideInputs(), context);
                PCollection mainInput = (PCollection) context.getInput(transform);
                final WindowFn inputWindowFn = mainInput.getWindowingStrategy().getWindowFn();
                UnboundedDataset inputDataset = (UnboundedDataset) context.borrowDataset(transform);
                JavaDStream inputStream = inputDataset.getDStream();

                // Applied to each batch (RDD) of the stream.
                Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>> processBatch =
                    new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>() {
                        @Override
                        public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> batch)
                                throws Exception {
                            JavaSparkContext batchContext = new JavaSparkContext(batch.context());
                            Accumulator<NamedAggregators> aggregators =
                                SparkAggregators.getNamedAggregators(batchContext);
                            return batch.mapPartitions(
                                new DoFnFunction(aggregators, transform.getFn(), rtContext, sideInputMap, inputWindowFn));
                        }
                    };
                context.putDataset(transform, new UnboundedDataset(inputStream.transform(processBatch)));
            }
        };
    }

    /**
     * Creates the evaluator for {@link ParDo.BoundMulti}.
     *
     * <p>The transform's new-style fn is first passed to
     * {@code TranslationUtils.rejectStateAndTimers}. Every batch of the input stream is then
     * processed with {@code mapPartitionsToPair} using a {@link MultiDoFnFunction}, producing a
     * pair stream keyed by {@link TupleTag}, on which {@code cache()} is called. For each entry
     * of the transform's output tuple, the pair stream is filtered with
     * {@code TranslationUtils.TupleTagFilter} for that entry's tag, reduced to its values with
     * {@code TranslationUtils.dStreamValues}, and registered for the entry's output value.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Spark/Beam types are kept as they were
    private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>> multiDo() {
        return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
            @Override
            public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
                TranslationUtils.rejectStateAndTimers(transform.getNewFn());
                final SparkRuntimeContext rtContext = context.getRuntimeContext();
                final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputMap =
                    TranslationUtils.getSideInputs(transform.getSideInputs(), context);
                PCollection mainInput = (PCollection) context.getInput(transform);
                final WindowFn inputWindowFn = mainInput.getWindowingStrategy().getWindowFn();
                UnboundedDataset inputDataset = (UnboundedDataset) context.borrowDataset(transform);
                JavaDStream inputStream = inputDataset.getDStream();

                // Applied to each batch (RDD) of the stream; emits (tag, value) pairs.
                Function<JavaRDD<WindowedValue<InputT>>, JavaPairRDD<TupleTag<?>, WindowedValue<?>>> tagBatch =
                    new Function<JavaRDD<WindowedValue<InputT>>, JavaPairRDD<TupleTag<?>, WindowedValue<?>>>() {
                        @Override
                        public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(JavaRDD<WindowedValue<InputT>> batch)
                                throws Exception {
                            JavaSparkContext batchContext = new JavaSparkContext(batch.context());
                            Accumulator<NamedAggregators> aggregators =
                                SparkAggregators.getNamedAggregators(batchContext);
                            return batch.mapPartitionsToPair(
                                new MultiDoFnFunction(
                                    aggregators,
                                    transform.getFn(),
                                    rtContext,
                                    transform.getMainOutputTag(),
                                    sideInputMap,
                                    inputWindowFn));
                        }
                    };
                JavaPairDStream taggedOutputs = inputStream.transformToPair(tagBatch).cache();

                // Register one dataset per declared output, selected by its tag.
                PCollectionTuple outputs = (PCollectionTuple) context.getOutput(transform);
                for (Map.Entry output : outputs.getAll().entrySet()) {
                    TupleTag tag = (TupleTag) output.getKey();
                    JavaPairDStream onlyThisTag = taggedOutputs.filter(new TranslationUtils.TupleTagFilter(tag));
                    JavaDStream tagValues = TranslationUtils.dStreamValues(onlyThisTag);
                    context.putDataset((PValue) output.getValue(), new UnboundedDataset(tagValues));
                }
            }
        };
    }

    // Registers one evaluator per transform class that has a streaming translation.
    static {
        EVALUATORS.put(Read.Unbounded.class, readUnbounded());
        EVALUATORS.put(GroupByKey.class, groupByKey());
        EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
        EVALUATORS.put(Combine.Globally.class, combineGlobally());
        EVALUATORS.put(Combine.PerKey.class, combinePerKey());
        EVALUATORS.put(ParDo.Bound.class, parDo());
        EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
        EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
        EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
        EVALUATORS.put(Window.Bound.class, window());
        EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
    }

    /**
     * {@link SparkPipelineTranslator} that resolves unbounded transforms from the evaluator
     * registry of the enclosing class and delegates bounded transforms to a wrapped batch
     * translator.
     */
    public static class Translator implements SparkPipelineTranslator {
        /** Translator consulted for bounded transforms and as a fallback in {@link #hasTranslation}. */
        private final SparkPipelineTranslator batchTranslator;

        Translator(SparkPipelineTranslator batchTranslator) {
            this.batchTranslator = batchTranslator;
        }

        /**
         * Returns true if {@code clazz} is registered in the streaming registry or the batch
         * translator reports a translation for it.
         */
        @Override
        public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
            if (EVALUATORS.containsKey(clazz)) {
                return true;
            }
            return batchTranslator.hasTranslation(clazz);
        }

        /**
         * Returns the batch translator's evaluator for {@code clazz}.
         *
         * @throws IllegalStateException if the batch translator returns {@code null}
         */
        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateBounded(Class<TransformT> clazz) {
            TransformEvaluator<TransformT> evaluator = batchTranslator.translateBounded(clazz);
            Preconditions.checkState(
                evaluator != null, "No TransformEvaluator registered for BOUNDED transform %s", clazz);
            return evaluator;
        }

        /**
         * Returns the evaluator registered for {@code clazz} in the streaming registry.
         *
         * @throws IllegalStateException if no evaluator is registered for {@code clazz}
         */
        @SuppressWarnings("unchecked") // the registry stores evaluators with a wildcard type
        @Override
        public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translateUnbounded(Class<TransformT> clazz) {
            TransformEvaluator<TransformT> evaluator = (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
            Preconditions.checkState(
                evaluator != null, "No TransformEvaluator registered for UNBOUNDED transform %s", clazz);
            return evaluator;
        }
    }
}

