package org.apache.beam.runners.spark.translation;

import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/GroupCombineFunctions.class */
public class GroupCombineFunctions {
    public static <K, V, W extends BoundedWindow> JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupByKey(JavaRDD<WindowedValue<KV<K, V>>> javaRDD, Accumulator<NamedAggregators> accumulator, KvCoder<K, V> kvCoder, SparkRuntimeContext sparkRuntimeContext, WindowingStrategy<?, W> windowingStrategy) {
        Coder keyCoder = kvCoder.getKeyCoder();
        Coder valueCoder = kvCoder.getValueCoder();
        WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(valueCoder, windowingStrategy.getWindowFn().windowCoder());
        return javaRDD.map(new ReifyTimestampsAndWindowsFunction()).map(WindowingHelpers.unwindowFunction()).mapToPair(TranslationUtils.toPairFunction()).mapToPair(CoderHelpers.toByteFunction(keyCoder, of)).groupByKey().mapToPair(CoderHelpers.fromByteFunctionIterable(keyCoder, of)).map(TranslationUtils.fromPairFunction()).map(WindowingHelpers.windowFunction()).mapPartitions(new DoFnFunction(accumulator, new GroupAlsoByWindowsViaOutputBufferDoFn(windowingStrategy, new TranslationUtils.InMemoryStateInternalsFactory(), SystemReduceFn.buffering(valueCoder)), sparkRuntimeContext, null, windowingStrategy.getWindowFn()));
    }

    /* JADX WARN: Type inference failed for: r1v28, types: [java.lang.Object[], byte[]] */
    public static <InputT, AccumT, OutputT> JavaRDD<WindowedValue<OutputT>> combineGlobally(JavaRDD<WindowedValue<InputT>> javaRDD, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext, Coder<InputT> coder, Coder<OutputT> coder2, SparkRuntimeContext sparkRuntimeContext, WindowingStrategy<?, ?> windowingStrategy, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> map, boolean z) {
        if (javaRDD.isEmpty()) {
            JavaSparkContext javaSparkContext = new JavaSparkContext(javaRDD.context());
            return z ? javaSparkContext.parallelize(Lists.newArrayList((Object[]) new byte[]{CoderHelpers.toByteArray(combineFnWithContext.defaultValue(), coder2)})).map(CoderHelpers.fromByteFunction(coder2)).map(WindowingHelpers.windowFunction()) : javaSparkContext.emptyRDD();
        }
        try {
            Coder accumulatorCoder = combineFnWithContext.getAccumulatorCoder(sparkRuntimeContext.getCoderRegistry(), coder);
            final WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(coder, windowingStrategy.getWindowFn().windowCoder());
            WindowedValue.FullWindowedValueCoder of2 = WindowedValue.FullWindowedValueCoder.of(accumulatorCoder, windowingStrategy.getWindowFn().windowCoder());
            WindowedValue.FullWindowedValueCoder of3 = WindowedValue.FullWindowedValueCoder.of(coder2, windowingStrategy.getWindowFn().windowCoder());
            final SparkGlobalCombineFn sparkGlobalCombineFn = new SparkGlobalCombineFn(combineFnWithContext, sparkRuntimeContext, map, windowingStrategy);
            final IterableCoder of4 = IterableCoder.of(of2);
            return new JavaSparkContext(javaRDD.context()).parallelize(CoderHelpers.toByteArrays(sparkGlobalCombineFn.extractOutput((Iterable) CoderHelpers.fromByteArray((byte[]) javaRDD.map(CoderHelpers.toByteFunction(of)).aggregate(CoderHelpers.toByteArray(sparkGlobalCombineFn.zeroValue(), of4), new Function2<byte[], byte[], byte[]>() { // from class: org.apache.beam.runners.spark.translation.GroupCombineFunctions.1
                public byte[] call(byte[] bArr, byte[] bArr2) throws Exception {
                    return CoderHelpers.toByteArray(sparkGlobalCombineFn.seqOp((Iterable) CoderHelpers.fromByteArray(bArr, of4), (WindowedValue) CoderHelpers.fromByteArray(bArr2, of)), of4);
                }
            }, new Function2<byte[], byte[], byte[]>() { // from class: org.apache.beam.runners.spark.translation.GroupCombineFunctions.2
                public byte[] call(byte[] bArr, byte[] bArr2) throws Exception {
                    return CoderHelpers.toByteArray(sparkGlobalCombineFn.combOp((Iterable) CoderHelpers.fromByteArray(bArr, of4), (Iterable) CoderHelpers.fromByteArray(bArr2, of4)), of4);
                }
            }), of4)), of3)).map(CoderHelpers.fromByteFunction(of3));
        } catch (CannotProvideCoderException e) {
            throw new IllegalStateException("Could not determine coder for accumulator", e);
        }
    }

    public static <K, InputT, AccumT, OutputT> JavaRDD<WindowedValue<KV<K, OutputT>>> combinePerKey(JavaRDD<WindowedValue<KV<K, InputT>>> javaRDD, CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext, KvCoder<K, InputT> kvCoder, SparkRuntimeContext sparkRuntimeContext, WindowingStrategy<?, ?> windowingStrategy, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> map) {
        Coder keyCoder = kvCoder.getKeyCoder();
        Coder valueCoder = kvCoder.getValueCoder();
        try {
            Coder accumulatorCoder = keyedCombineFnWithContext.getAccumulatorCoder(sparkRuntimeContext.getCoderRegistry(), keyCoder, valueCoder);
            final WindowedValue.FullWindowedValueCoder of = WindowedValue.FullWindowedValueCoder.of(KvCoder.of(keyCoder, valueCoder), windowingStrategy.getWindowFn().windowCoder());
            WindowedValue.FullWindowedValueCoder of2 = WindowedValue.FullWindowedValueCoder.of(KvCoder.of(keyCoder, accumulatorCoder), windowingStrategy.getWindowFn().windowCoder());
            JavaPairRDD flatMapToPair = javaRDD.flatMapToPair(new PairFlatMapFunction<WindowedValue<KV<K, InputT>>, K, WindowedValue<KV<K, InputT>>>() { // from class: org.apache.beam.runners.spark.translation.GroupCombineFunctions.3
                public Iterable<Tuple2<K, WindowedValue<KV<K, InputT>>>> call(WindowedValue<KV<K, InputT>> windowedValue) {
                    return Collections.singletonList(new Tuple2(((KV) windowedValue.getValue()).getKey(), windowedValue));
                }
            });
            final SparkKeyedCombineFn sparkKeyedCombineFn = new SparkKeyedCombineFn(keyedCombineFnWithContext, sparkRuntimeContext, map, windowingStrategy);
            final IterableCoder of3 = IterableCoder.of(of2);
            return flatMapToPair.mapToPair(CoderHelpers.toByteFunction(keyCoder, of)).combineByKey(new Function<byte[], byte[]>() { // from class: org.apache.beam.runners.spark.translation.GroupCombineFunctions.4
                public byte[] call(byte[] bArr) {
                    return CoderHelpers.toByteArray(sparkKeyedCombineFn.createCombiner((WindowedValue) CoderHelpers.fromByteArray(bArr, of)), of3);
                }
            }, new Function2<byte[], byte[], byte[]>() { // from class: org.apache.beam.runners.spark.translation.GroupCombineFunctions.5
                public byte[] call(byte[] bArr, byte[] bArr2) {
                    Iterable iterable = (Iterable) CoderHelpers.fromByteArray(bArr, of3);
                    return CoderHelpers.toByteArray(sparkKeyedCombineFn.mergeValue((WindowedValue) CoderHelpers.fromByteArray(bArr2, of), iterable), of3);
                }
            }, new Function2<byte[], byte[], byte[]>() { // from class: org.apache.beam.runners.spark.translation.GroupCombineFunctions.6
                public byte[] call(byte[] bArr, byte[] bArr2) {
                    return CoderHelpers.toByteArray(sparkKeyedCombineFn.mergeCombiners((Iterable) CoderHelpers.fromByteArray(bArr, of3), (Iterable) CoderHelpers.fromByteArray(bArr2, of3)), of3);
                }
            }).mapToPair(CoderHelpers.fromByteFunction(keyCoder, of3)).flatMapValues(new Function<Iterable<WindowedValue<KV<K, AccumT>>>, Iterable<WindowedValue<OutputT>>>() { // from class: org.apache.beam.runners.spark.translation.GroupCombineFunctions.7
                public Iterable<WindowedValue<OutputT>> call(Iterable<WindowedValue<KV<K, AccumT>>> iterable) {
                    return SparkKeyedCombineFn.this.extractOutput(iterable);
                }
            }).map(TranslationUtils.fromPairFunction()).map(new Function<KV<K, WindowedValue<OutputT>>, WindowedValue<KV<K, OutputT>>>() { // from class: org.apache.beam.runners.spark.translation.GroupCombineFunctions.8
                public WindowedValue<KV<K, OutputT>> call(KV<K, WindowedValue<OutputT>> kv) throws Exception {
                    return ((WindowedValue) kv.getValue()).withValue(KV.of(kv.getKey(), ((WindowedValue) kv.getValue()).getValue()));
                }
            });
        } catch (CannotProvideCoderException e) {
            throw new IllegalStateException("Could not determine coder for accumulator", e);
        }
    }
}
