/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.ReifyTimestampsAndWindowsFunction;
import org.apache.beam.runners.spark.translation.SparkGlobalCombineFn;
import org.apache.beam.runners.spark.translation.SparkKeyedCombineFn;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

/**
 * Static helpers that express Beam grouping and combining transforms as Spark RDD operations.
 *
 * <p>Every helper follows the same shape: encode elements to {@code byte[]} with
 * {@link CoderHelpers}, run the Spark operation ({@code groupByKey}, {@code aggregate} or
 * {@code combineByKey}) on the encoded data, then decode the result with the same coders.
 *
 * <p>NOTE(review): several locals are deliberately left as raw types. Their precise generic
 * types depend on the signatures of project helpers ({@code CoderHelpers},
 * {@code TranslationUtils}, {@code WindowingHelpers}, ...) that are not visible from this file;
 * tighten them once those signatures are confirmed.
 */
public class GroupCombineFunctions {

    /**
     * Groups the values of a keyed RDD by key and then by window.
     *
     * <p>The input is passed through {@link ReifyTimestampsAndWindowsFunction}, unwrapped from its
     * {@link WindowedValue}, encoded to bytes, grouped with Spark's {@code groupByKey}, decoded
     * again and finally processed partition-by-partition by a
     * {@link GroupAlsoByWindowsViaOutputBufferDoFn} wrapped in a {@link DoFnFunction}.
     *
     * @param rdd               the windowed key/value input
     * @param accum             accumulator handed to the {@link DoFnFunction}
     * @param coder             supplies the key coder and the value coder
     * @param runtimeContext    runtime context handed to the {@link DoFnFunction}
     * @param windowingStrategy supplies the {@link WindowFn} and, through it, the window coder
     * @return the RDD produced by running the group-also-by-window function over the grouped data
     */
    public static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupByKey(
            JavaRDD<WindowedValue<KV<K, V>>> rdd,
            Accumulator<NamedAggregators> accum,
            KvCoder<K, V> coder,
            SparkRuntimeContext runtimeContext,
            WindowingStrategy<?, W> windowingStrategy) {
        Coder<K> keyCoder = coder.getKeyCoder();
        Coder<V> valueCoder = coder.getValueCoder();
        // Fetched once; used both for the window coder and for the DoFnFunction below.
        WindowFn windowFn = windowingStrategy.getWindowFn();
        WindowedValue.FullWindowedValueCoder wvCoder =
                WindowedValue.FullWindowedValueCoder.of(valueCoder, windowFn.windowCoder());

        // Keys and (windowed) values are turned into bytes before groupByKey and back afterwards.
        JavaRDD groupedByKey = rdd
                .map(new ReifyTimestampsAndWindowsFunction())
                .map(WindowingHelpers.unwindowFunction())
                .mapToPair(TranslationUtils.toPairFunction())
                .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))
                .groupByKey()
                .mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, wvCoder))
                .map(TranslationUtils.fromPairFunction())
                .map(WindowingHelpers.windowFunction());

        GroupAlsoByWindowsViaOutputBufferDoFn gabwDoFn = new GroupAlsoByWindowsViaOutputBufferDoFn(
                windowingStrategy,
                new TranslationUtils.InMemoryStateInternalsFactory(),
                SystemReduceFn.buffering(valueCoder));

        // NOTE(review): the fourth DoFnFunction argument is null - presumably "no side inputs";
        // confirm against the DoFnFunction constructor.
        return groupedByKey.mapPartitions(
                new DoFnFunction(accum, gabwDoFn, runtimeContext, null, (WindowFn<Object, ?>)windowFn));
    }

    /**
     * Applies a global combine to an RDD using Spark's {@code aggregate}.
     *
     * <p>An empty input is handled up front: when {@code hasDefault} is set the result is a
     * single-element RDD holding {@code combineFn.defaultValue()} (round-tripped through
     * {@code oCoder} and wrapped by {@code WindowingHelpers.windowFunction()}), otherwise an empty
     * RDD. For a non-empty input the elements are encoded, aggregated into an encoded iterable of
     * windowed accumulators, and the outputs extracted from that are parallelized again.
     *
     * @param rdd               the windowed input
     * @param combineFn         the combine function
     * @param iCoder            coder for input elements
     * @param oCoder            coder for output elements
     * @param runtimeContext    supplies the coder registry; also handed to the Spark combine fn
     * @param windowingStrategy supplies the window coder; also handed to the Spark combine fn
     * @param sideInputs        side inputs handed to the Spark combine fn
     * @param hasDefault        whether an empty input yields the default value rather than nothing
     * @return the combined output
     * @throws IllegalStateException if no accumulator coder can be provided
     */
    public static <InputT, AccumT, OutputT> JavaRDD<WindowedValue<OutputT>> combineGlobally(
            JavaRDD<WindowedValue<InputT>> rdd,
            CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn,
            Coder<InputT> iCoder,
            Coder<OutputT> oCoder,
            SparkRuntimeContext runtimeContext,
            WindowingStrategy<?, ?> windowingStrategy,
            Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs,
            boolean hasDefault) {
        // A single wrapper around the RDD's SparkContext serves both the empty and non-empty paths.
        JavaSparkContext jsc = new JavaSparkContext(rdd.context());

        if (rdd.isEmpty()) {
            if (!hasDefault) {
                return jsc.emptyRDD();
            }
            // Typed as OutputT (not Object) so the value and oCoder agree on the element type.
            OutputT defaultValue = combineFn.defaultValue();
            List encodedDefault = Lists.newArrayList(CoderHelpers.toByteArray(defaultValue, oCoder));
            return jsc.parallelize(encodedDefault)
                    .map(CoderHelpers.fromByteFunction(oCoder))
                    .map(WindowingHelpers.windowFunction());
        }

        Coder<AccumT> aCoder;
        try {
            aCoder = combineFn.getAccumulatorCoder(runtimeContext.getCoderRegistry(), iCoder);
        } catch (CannotProvideCoderException e) {
            throw new IllegalStateException("Could not determine coder for accumulator", e);
        }

        // One window coder shared by the input, accumulator and output coders.
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        final WindowedValue.FullWindowedValueCoder wviCoder =
                WindowedValue.FullWindowedValueCoder.of(iCoder, windowCoder);
        WindowedValue.FullWindowedValueCoder wvaCoder =
                WindowedValue.FullWindowedValueCoder.of(aCoder, windowCoder);
        WindowedValue.FullWindowedValueCoder wvoCoder =
                WindowedValue.FullWindowedValueCoder.of(oCoder, windowCoder);

        final SparkGlobalCombineFn<InputT, AccumT, OutputT> sparkCombineFn =
                new SparkGlobalCombineFn<InputT, AccumT, OutputT>(
                        combineFn, runtimeContext, sideInputs, windowingStrategy);
        final IterableCoder iterAccumCoder = IterableCoder.of(wvaCoder);

        // Folds one encoded input element into an encoded accumulator.
        Function2<byte[], byte[], byte[]> seqOp = new Function2<byte[], byte[], byte[]>() {
            @Override
            public byte[] call(byte[] ab, byte[] ib) throws Exception {
                Iterable a = (Iterable)CoderHelpers.fromByteArray(ab, iterAccumCoder);
                WindowedValue i = (WindowedValue)CoderHelpers.fromByteArray(ib, wviCoder);
                return CoderHelpers.toByteArray(sparkCombineFn.seqOp(a, i), iterAccumCoder);
            }
        };
        // Merges two encoded accumulators.
        Function2<byte[], byte[], byte[]> combOp = new Function2<byte[], byte[], byte[]>() {
            @Override
            public byte[] call(byte[] a1b, byte[] a2b) throws Exception {
                Iterable a1 = (Iterable)CoderHelpers.fromByteArray(a1b, iterAccumCoder);
                Iterable a2 = (Iterable)CoderHelpers.fromByteArray(a2b, iterAccumCoder);
                Iterable merged = sparkCombineFn.combOp(a1, a2);
                return CoderHelpers.toByteArray(merged, iterAccumCoder);
            }
        };

        JavaRDD inRddBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder));
        byte[] zero = CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder);
        byte[] acc = (byte[])inRddBytes.aggregate(zero, seqOp, combOp);

        Iterable<WindowedValue<OutputT>> output =
                sparkCombineFn.extractOutput((Iterable)CoderHelpers.fromByteArray(acc, iterAccumCoder));
        // The extracted outputs are encoded, parallelized, and decoded back into an RDD.
        return jsc.parallelize(CoderHelpers.toByteArrays(output, wvoCoder))
                .map(CoderHelpers.fromByteFunction(wvoCoder));
    }

    /**
     * Applies a per-key combine to a keyed RDD using Spark's {@code combineByKey}.
     *
     * <p>Each element is paired with its own key (so the full windowed key/value travels as the
     * pair's value), both sides are encoded to bytes, the encoded accumulators are built and merged
     * by {@code combineByKey}, and the outputs extracted from every key's accumulators are emitted
     * as windowed {@code KV<K, OutputT>} elements.
     *
     * @param rdd               the windowed key/value input
     * @param combineFn         the keyed combine function
     * @param inputCoder        supplies the key coder and the input value coder
     * @param runtimeContext    supplies the coder registry; also handed to the Spark combine fn
     * @param windowingStrategy supplies the window coder; also handed to the Spark combine fn
     * @param sideInputs        side inputs handed to the Spark combine fn
     * @return the combined output, one windowed key/value per extracted output
     * @throws IllegalStateException if no accumulator coder can be provided
     */
    public static <K, InputT, AccumT, OutputT> JavaRDD<WindowedValue<KV<K, OutputT>>> combinePerKey(
            JavaRDD<WindowedValue<KV<K, InputT>>> rdd,
            CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn,
            KvCoder<K, InputT> inputCoder,
            SparkRuntimeContext runtimeContext,
            WindowingStrategy<?, ?> windowingStrategy,
            Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs) {
        Coder<K> keyCoder = inputCoder.getKeyCoder();
        Coder<InputT> viCoder = inputCoder.getValueCoder();
        Coder<AccumT> vaCoder;
        try {
            vaCoder = combineFn.getAccumulatorCoder(runtimeContext.getCoderRegistry(), keyCoder, viCoder);
        } catch (CannotProvideCoderException e) {
            throw new IllegalStateException("Could not determine coder for accumulator", e);
        }

        // One window coder shared by the input and accumulator coders.
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        final WindowedValue.FullWindowedValueCoder wkviCoder =
                WindowedValue.FullWindowedValueCoder.of(KvCoder.of(keyCoder, viCoder), windowCoder);
        WindowedValue.FullWindowedValueCoder wkvaCoder =
                WindowedValue.FullWindowedValueCoder.of(KvCoder.of(keyCoder, vaCoder), windowCoder);

        // The key is duplicated: once as the pair key, once inside the windowed KV value.
        JavaPairRDD inRddDuplicatedKeyPair = rdd.flatMapToPair(
                new PairFlatMapFunction<WindowedValue<KV<K, InputT>>, K, WindowedValue<KV<K, InputT>>>() {
                    @Override
                    public Iterable<Tuple2<K, WindowedValue<KV<K, InputT>>>> call(WindowedValue<KV<K, InputT>> wkv) {
                        return Collections.singletonList(
                                new Tuple2<K, WindowedValue<KV<K, InputT>>>(wkv.getValue().getKey(), wkv));
                    }
                });

        final SparkKeyedCombineFn<K, InputT, AccumT, OutputT> sparkCombineFn =
                new SparkKeyedCombineFn<K, InputT, AccumT, OutputT>(
                        combineFn, runtimeContext, sideInputs, windowingStrategy);
        final IterableCoder iterAccumCoder = IterableCoder.of(wkvaCoder);

        // Builds the initial encoded accumulator from the first encoded input of a key.
        Function<byte[], byte[]> createCombiner = new Function<byte[], byte[]>() {
            @Override
            public byte[] call(byte[] input) {
                WindowedValue wkvi = (WindowedValue)CoderHelpers.fromByteArray(input, wkviCoder);
                return CoderHelpers.toByteArray(sparkCombineFn.createCombiner(wkvi), iterAccumCoder);
            }
        };
        // Folds another encoded input into an encoded accumulator.
        Function2<byte[], byte[], byte[]> mergeValue = new Function2<byte[], byte[], byte[]>() {
            @Override
            public byte[] call(byte[] acc, byte[] input) {
                Iterable wkvas = (Iterable)CoderHelpers.fromByteArray(acc, iterAccumCoder);
                WindowedValue wkvi = (WindowedValue)CoderHelpers.fromByteArray(input, wkviCoder);
                return CoderHelpers.toByteArray(sparkCombineFn.mergeValue(wkvi, wkvas), iterAccumCoder);
            }
        };
        // Merges two encoded accumulators of the same key.
        Function2<byte[], byte[], byte[]> mergeCombiners = new Function2<byte[], byte[], byte[]>() {
            @Override
            public byte[] call(byte[] acc1, byte[] acc2) {
                Iterable wkvas1 = (Iterable)CoderHelpers.fromByteArray(acc1, iterAccumCoder);
                Iterable wkvas2 = (Iterable)CoderHelpers.fromByteArray(acc2, iterAccumCoder);
                return CoderHelpers.toByteArray(sparkCombineFn.mergeCombiners(wkvas1, wkvas2), iterAccumCoder);
            }
        };

        JavaPairRDD inRddDuplicatedKeyPairBytes =
                inRddDuplicatedKeyPair.mapToPair(CoderHelpers.toByteFunction(keyCoder, wkviCoder));
        JavaPairRDD accumulatedBytes =
                inRddDuplicatedKeyPairBytes.combineByKey(createCombiner, mergeValue, mergeCombiners);

        // Decode the accumulators and expand each key's accumulators into its outputs.
        JavaPairRDD extracted = accumulatedBytes
                .mapToPair(CoderHelpers.fromByteFunction(keyCoder, iterAccumCoder))
                .flatMapValues(new Function<Iterable<WindowedValue<KV<K, AccumT>>>, Iterable<WindowedValue<OutputT>>>() {
                    @Override
                    public Iterable<WindowedValue<OutputT>> call(Iterable<WindowedValue<KV<K, AccumT>>> accums) {
                        return sparkCombineFn.extractOutput(accums);
                    }
                });

        // Move the key back inside the windowed value: KV<K, WindowedValue<O>> -> WindowedValue<KV<K, O>>.
        return extracted
                .map(TranslationUtils.fromPairFunction())
                .map(new Function<KV<K, WindowedValue<OutputT>>, WindowedValue<KV<K, OutputT>>>() {
                    @Override
                    public WindowedValue<KV<K, OutputT>> call(KV<K, WindowedValue<OutputT>> kwvo) throws Exception {
                        WindowedValue<OutputT> wvo = kwvo.getValue();
                        return wvo.withValue(KV.of(kwvo.getKey(), wvo.getValue()));
                    }
                });
    }
}

