package org.apache.beam.runners.spark.stateful;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.AbstractIterator;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Table;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.PairDStreamFunctions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.math.Ordering;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.class */
public class SparkGroupAlsoByWindowViaWindowSet {
    private static final Logger LOG = LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet$OutputWindowedValueHolder.class */
    public static class OutputWindowedValueHolder<K, V> implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private List<WindowedValue<KV<K, Iterable<V>>>> windowedValues;

        private OutputWindowedValueHolder() {
            this.windowedValues = new ArrayList();
        }

        public void outputWindowedValue(KV<K, Iterable<V>> kv, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            this.windowedValues.add(WindowedValue.of(kv, instant, collection, paneInfo));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<WindowedValue<KV<K, Iterable<V>>>> get() {
            return this.windowedValues;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void clear() {
            this.windowedValues.clear();
        }

        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            throw new UnsupportedOperationException("Tagged outputs are not allowed in GroupAlsoByWindow.");
        }

        public /* bridge */ /* synthetic */ void outputWindowedValue(Object obj, Instant instant, Collection collection, PaneInfo paneInfo) {
            outputWindowedValue((KV) obj, instant, (Collection<? extends BoundedWindow>) collection, paneInfo);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet$SerializableFunction1.class */
    private static abstract class SerializableFunction1<T1, T2> extends AbstractFunction1<T1, T2> implements Serializable {
        private SerializableFunction1() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet$StateAndTimers.class */
    public static class StateAndTimers {
        private final Table<String, String, byte[]> state;
        private final Collection<byte[]> serTimers;

        private StateAndTimers(Table<String, String, byte[]> table, Collection<byte[]> collection) {
            this.state = table;
            this.serTimers = collection;
        }

        public Table<String, String, byte[]> getState() {
            return this.state;
        }

        public Collection<byte[]> getTimers() {
            return this.serTimers;
        }
    }

    public static <K, InputT, W extends BoundedWindow> JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> javaDStream, final Coder<K> coder, final Coder<WindowedValue<InputT>> coder2, final WindowingStrategy<?, W> windowingStrategy, final SparkRuntimeContext sparkRuntimeContext, final List<Integer> list) {
        final long longValue = ((SparkPipelineOptions) sparkRuntimeContext.getPipelineOptions().as(SparkPipelineOptions.class)).getBatchIntervalMillis().longValue();
        final IterableCoder of = IterableCoder.of(coder2);
        Coder valueCoder = ((WindowedValue.FullWindowedValueCoder) coder2).getValueCoder();
        final WindowedValue.FullWindowedValueCoder of2 = WindowedValue.FullWindowedValueCoder.of(KvCoder.of(coder, IterableCoder.of(valueCoder)), ((WindowedValue.FullWindowedValueCoder) coder2).getWindowCoder());
        final TimerInternals.TimerDataCoder of3 = TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
        long longValue2 = ((SparkPipelineOptions) sparkRuntimeContext.getPipelineOptions().as(SparkPipelineOptions.class)).getCheckpointDurationMillis().longValue();
        PairDStreamFunctions pairDStreamFunctions = DStream.toPairDStreamFunctions(javaDStream.transformToPair(new Function<JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>>, JavaPairRDD<ByteArray, byte[]>>() { // from class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.1
            public JavaPairRDD<ByteArray, byte[]> call(JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> javaRDD) throws Exception {
                return javaRDD.mapPartitions(TranslationUtils.functionToFlatMapFunction(WindowingHelpers.unwindowFunction()), true).mapPartitionsToPair(TranslationUtils.toPairFlatMapFunction(), true).mapPartitionsToPair(TranslationUtils.pairFunctionToPairFlatMapFunction(CoderHelpers.toByteFunction(coder, of)), true);
            }
        }).dstream(), JavaSparkContext$.MODULE$.fakeClassTag(), JavaSparkContext$.MODULE$.fakeClassTag(), (Ordering) null);
        DStream updateStateByKey = pairDStreamFunctions.updateStateByKey(new SerializableFunction1<Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>>, Iterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>>>() { // from class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.2
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            public Iterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> apply(final Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>> iterator) {
                final SystemReduceFn buffering = SystemReduceFn.buffering(coder2.getValueCoder());
                final OutputWindowedValueHolder outputWindowedValueHolder = new OutputWindowedValueHolder();
                MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("cellProvider");
                CounterCell counter = metricsContainerImpl.getCounter(MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, "DroppedDueToClosedWindow"));
                final CounterCell counter2 = metricsContainerImpl.getCounter(MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, "DroppedDueToLateness"));
                AbstractIterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> abstractIterator = new AbstractIterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>>() { // from class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.2.1
                    /* JADX INFO: Access modifiers changed from: protected */
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // org.apache.beam.runners.spark.repackaged.com.google.common.collect.AbstractIterator
                    public Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> computeNext() {
                        SparkStateInternals forKeyAndState;
                        while (iterator.hasNext()) {
                            Tuple3 tuple3 = (Tuple3) iterator.next();
                            ByteArray byteArray = (ByteArray) tuple3._1();
                            Object fromByteArray = CoderHelpers.fromByteArray(byteArray.getValue(), coder);
                            Seq seq = (Seq) tuple3._2();
                            Option option = (Option) tuple3._3();
                            SparkTimerInternals forStreamFromSources = SparkTimerInternals.forStreamFromSources(list, GlobalWatermarkHolder.get(Long.valueOf(longValue)));
                            if (option.isEmpty()) {
                                forKeyAndState = SparkStateInternals.forKey(fromByteArray);
                            } else {
                                StateAndTimers stateAndTimers = (StateAndTimers) ((Tuple2) option.get())._1();
                                forKeyAndState = SparkStateInternals.forKeyAndState(fromByteArray, stateAndTimers.getState());
                                forStreamFromSources.addTimers(SparkTimerInternals.deserializeTimers(stateAndTimers.getTimers(), of3));
                            }
                            ReduceFnRunner reduceFnRunner = new ReduceFnRunner(fromByteArray, windowingStrategy, ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger()))), forKeyAndState, forStreamFromSources, outputWindowedValueHolder, new UnsupportedSideInputReader("GroupAlsoByWindow"), buffering, sparkRuntimeContext.getPipelineOptions());
                            outputWindowedValueHolder.clear();
                            try {
                                if (!seq.isEmpty()) {
                                    try {
                                        reduceFnRunner.processElements(LateDataUtils.dropExpiredWindows(fromByteArray, (Iterable) CoderHelpers.fromByteArray((byte[]) seq.head(), of), forStreamFromSources, windowingStrategy, counter2));
                                    } catch (Exception e) {
                                        throw new RuntimeException("Failed to process element with ReduceFnRunner", e);
                                    }
                                } else if (forKeyAndState.getState().isEmpty()) {
                                    continue;
                                }
                                forStreamFromSources.advanceWatermark();
                                reduceFnRunner.onTimers(forStreamFromSources.getTimersReadyToProcess());
                                reduceFnRunner.persist();
                                List list2 = outputWindowedValueHolder.get();
                                if (!list2.isEmpty() || !forKeyAndState.getState().isEmpty()) {
                                    return new Tuple2<>(byteArray, new Tuple2(new StateAndTimers(forKeyAndState.getState(), SparkTimerInternals.serializeTimers(forStreamFromSources.getTimers(), of3)), CoderHelpers.toByteArrays(list2, of2)));
                                }
                            } catch (Exception e2) {
                                throw new RuntimeException("Failed to process ReduceFnRunner onTimer.", e2);
                            }
                        }
                        return endOfData();
                    }
                };
                long longValue3 = counter2.getCumulative().longValue();
                if (longValue3 > 0) {
                    SparkGroupAlsoByWindowViaWindowSet.LOG.info(String.format("Dropped %d elements due to lateness.", Long.valueOf(longValue3)));
                    counter2.inc(-counter2.getCumulative().longValue());
                }
                long longValue4 = counter.getCumulative().longValue();
                if (longValue4 > 0) {
                    SparkGroupAlsoByWindowViaWindowSet.LOG.info(String.format("Dropped %d elements due to closed window.", Long.valueOf(longValue4)));
                    counter.inc(-counter.getCumulative().longValue());
                }
                return JavaConversions.asScalaIterator(abstractIterator);
            }
        }, pairDStreamFunctions.defaultPartitioner(pairDStreamFunctions.defaultPartitioner$default$1()), true, JavaSparkContext$.MODULE$.fakeClassTag());
        if (longValue2 > 0) {
            updateStateByKey.checkpoint(new Duration(longValue2));
        }
        return JavaPairDStream.fromPairDStream(updateStateByKey, JavaSparkContext$.MODULE$.fakeClassTag(), JavaSparkContext$.MODULE$.fakeClassTag()).filter(new Function<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>, Boolean>() { // from class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.4
            public Boolean call(Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> tuple2) throws Exception {
                return Boolean.valueOf(!((List) ((Tuple2) tuple2._2())._2()).isEmpty());
            }
        }).flatMap(new FlatMapFunction<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>, WindowedValue<KV<K, Iterable<InputT>>>>() { // from class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.3
            public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> tuple2) throws Exception {
                return CoderHelpers.fromByteArrays((Collection) ((Tuple2) tuple2._2())._2(), of2);
            }
        });
    }
}
