/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.stateful;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFn;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.AbstractIterator;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Table;
import org.apache.beam.runners.spark.stateful.SparkStateInternals;
import org.apache.beam.runners.spark.stateful.SparkTimerInternals;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.PairDStreamFunctions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Option;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.reflect.ClassTag;
import scala.runtime.AbstractFunction1;

public class SparkGroupAlsoByWindowViaWindowSet {
    private static final Logger LOG = LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class);

    public static <K, InputT, W extends BoundedWindow> JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream, final Coder<K> keyCoder, final Coder<WindowedValue<InputT>> wvCoder, final WindowingStrategy<?, W> windowingStrategy, final SparkRuntimeContext runtimeContext, final List<Integer> sourceIds) {
        final IterableCoder itrWvCoder = IterableCoder.of(wvCoder);
        Coder iCoder = ((WindowedValue.FullWindowedValueCoder)wvCoder).getValueCoder();
        Coder wCoder = ((WindowedValue.FullWindowedValueCoder)wvCoder).getWindowCoder();
        WindowedValue.FullWindowedValueCoder wvKvIterCoder = WindowedValue.FullWindowedValueCoder.of((Coder)KvCoder.of(keyCoder, (Coder)IterableCoder.of((Coder)iCoder)), (Coder)wCoder);
        final TimerInternals.TimerDataCoder timerDataCoder = TimerInternals.TimerDataCoder.of((Coder)windowingStrategy.getWindowFn().windowCoder());
        long checkpointDurationMillis = ((SparkPipelineOptions)runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class)).getCheckpointDurationMillis();
        DStream pairDStream = inputDStream.transformToPair(new Function<JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>>, JavaPairRDD<ByteArray, byte[]>>(){

            public JavaPairRDD<ByteArray, byte[]> call(JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> rdd) throws Exception {
                return rdd.mapPartitions(TranslationUtils.functionToFlatMapFunction(WindowingHelpers.unwindowFunction()), true).mapPartitionsToPair(TranslationUtils.toPairFlatMapFunction(), true).mapPartitionsToPair(TranslationUtils.pairFunctionToPairFlatMapFunction(CoderHelpers.toByteFunction(keyCoder, itrWvCoder)), true);
            }
        }).dstream();
        PairDStreamFunctions pairDStreamFunctions = DStream.toPairDStreamFunctions((DStream)pairDStream, (ClassTag)JavaSparkContext$.MODULE$.fakeClassTag(), (ClassTag)JavaSparkContext$.MODULE$.fakeClassTag(), null);
        int defaultNumPartitions = pairDStreamFunctions.defaultPartitioner$default$1();
        HashPartitioner partitioner = pairDStreamFunctions.defaultPartitioner(defaultNumPartitions);
        DStream firedStream = pairDStreamFunctions.updateStateByKey((Function1)new SerializableFunction1<Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>>, Iterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>>>((Coder)wvKvIterCoder){
            final /* synthetic */ Coder val$wvKvIterCoder;
            {
                this.val$wvKvIterCoder = coder3;
            }

            public Iterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> apply(final Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>> iter) {
                long closedWindowDropped;
                final SystemReduceFn reduceFn = SystemReduceFn.buffering((Coder)((WindowedValue.FullWindowedValueCoder)wvCoder).getValueCoder());
                final OutputWindowedValueHolder outputHolder = new OutputWindowedValueHolder();
                MetricsContainerImpl cellProvider = new MetricsContainerImpl("cellProvider");
                CounterCell droppedDueToClosedWindow = cellProvider.getCounter(MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, (String)"DroppedDueToClosedWindow"));
                final CounterCell droppedDueToLateness = cellProvider.getCounter(MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, (String)"DroppedDueToLateness"));
                AbstractIterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> outIter = new AbstractIterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>>(){

                    @Override
                    protected Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> computeNext() {
                        while (iter.hasNext()) {
                            SparkStateInternals stateInternals;
                            Tuple3 next = (Tuple3)iter.next();
                            ByteArray encodedKey = (ByteArray)next._1();
                            Object key = CoderHelpers.fromByteArray(encodedKey.getValue(), keyCoder);
                            Seq seq = (Seq)next._2();
                            Option prevStateAndTimersOpt = (Option)next._3();
                            SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources(sourceIds, GlobalWatermarkHolder.get());
                            if (prevStateAndTimersOpt.isEmpty()) {
                                stateInternals = SparkStateInternals.forKey(key);
                            } else {
                                StateAndTimers prevStateAndTimers = (StateAndTimers)((Tuple2)prevStateAndTimersOpt.get())._1();
                                stateInternals = SparkStateInternals.forKeyAndState(key, prevStateAndTimers.getState());
                                Collection<byte[]> serTimers = prevStateAndTimers.getTimers();
                                timerInternals.addTimers(SparkTimerInternals.deserializeTimers(serTimers, timerDataCoder));
                            }
                            ReduceFnRunner reduceFnRunner = new ReduceFnRunner(key, windowingStrategy, ExecutableTriggerStateMachine.create((TriggerStateMachine)TriggerStateMachines.stateMachineForTrigger((RunnerApi.Trigger)TriggerTranslation.toProto((Trigger)windowingStrategy.getTrigger()))), stateInternals, (TimerInternals)timerInternals, (OutputWindowedValue)outputHolder, (SideInputReader)new UnsupportedSideInputReader("GroupAlsoByWindow"), (ReduceFn)reduceFn, runtimeContext.getPipelineOptions());
                            outputHolder.clear();
                            if (!seq.isEmpty()) {
                                try {
                                    Iterable elementsIterable = (Iterable)CoderHelpers.fromByteArray((byte[])seq.head(), itrWvCoder);
                                    Iterable validElements = LateDataUtils.dropExpiredWindows(key, (Iterable)elementsIterable, (TimerInternals)timerInternals, (WindowingStrategy)windowingStrategy, (CounterCell)droppedDueToLateness);
                                    reduceFnRunner.processElements(validElements);
                                }
                                catch (Exception e) {
                                    throw new RuntimeException("Failed to process element with ReduceFnRunner", e);
                                }
                            } else if (stateInternals.getState().isEmpty()) continue;
                            try {
                                timerInternals.advanceWatermark();
                                reduceFnRunner.onTimers(timerInternals.getTimersReadyToProcess());
                            }
                            catch (Exception e) {
                                throw new RuntimeException("Failed to process ReduceFnRunner onTimer.", e);
                            }
                            reduceFnRunner.persist();
                            List outputs = outputHolder.get();
                            if (outputs.isEmpty() && stateInternals.getState().isEmpty()) continue;
                            StateAndTimers updated = new StateAndTimers(stateInternals.getState(), SparkTimerInternals.serializeTimers(timerInternals.getTimers(), timerDataCoder));
                            List<byte[]> serOutput = CoderHelpers.toByteArrays(outputs, val$wvKvIterCoder);
                            return new Tuple2((Object)encodedKey, (Object)new Tuple2((Object)updated, serOutput));
                        }
                        return (Tuple2)this.endOfData();
                    }
                };
                long lateDropped = droppedDueToLateness.getCumulative();
                if (lateDropped > 0L) {
                    LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped));
                    droppedDueToLateness.inc(-droppedDueToLateness.getCumulative().longValue());
                }
                if ((closedWindowDropped = droppedDueToClosedWindow.getCumulative().longValue()) > 0L) {
                    LOG.info(String.format("Dropped %d elements due to closed window.", closedWindowDropped));
                    droppedDueToClosedWindow.inc(-droppedDueToClosedWindow.getCumulative().longValue());
                }
                return JavaConversions.asScalaIterator((java.util.Iterator)outIter);
            }
        }, (Partitioner)partitioner, true, JavaSparkContext$.MODULE$.fakeClassTag());
        if (checkpointDurationMillis > 0L) {
            firedStream.checkpoint(new Duration(checkpointDurationMillis));
        }
        JavaPairDStream javaFiredStream = JavaPairDStream.fromPairDStream((DStream)firedStream, (ClassTag)JavaSparkContext$.MODULE$.fakeClassTag(), (ClassTag)JavaSparkContext$.MODULE$.fakeClassTag());
        return javaFiredStream.filter((Function)new Function<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>, Boolean>(){

            public Boolean call(Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> t2) throws Exception {
                return !((List)((Tuple2)t2._2())._2()).isEmpty();
            }
        }).flatMap(new FlatMapFunction<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>, WindowedValue<KV<K, Iterable<InputT>>>>((Coder)wvKvIterCoder){
            final /* synthetic */ Coder val$wvKvIterCoder;
            {
                this.val$wvKvIterCoder = coder;
            }

            public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> t2) throws Exception {
                return CoderHelpers.fromByteArrays((Collection)((Tuple2)t2._2())._2(), this.val$wvKvIterCoder);
            }
        });
    }

    private static class OutputWindowedValueHolder<K, V>
    implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private List<WindowedValue<KV<K, Iterable<V>>>> windowedValues = new ArrayList<WindowedValue<KV<K, Iterable<V>>>>();

        private OutputWindowedValueHolder() {
        }

        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.windowedValues.add(WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)pane));
        }

        private List<WindowedValue<KV<K, Iterable<V>>>> get() {
            return this.windowedValues;
        }

        private void clear() {
            this.windowedValues.clear();
        }

        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag, AdditionalOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            throw new UnsupportedOperationException("Tagged outputs are not allowed in GroupAlsoByWindow.");
        }
    }

    private static class StateAndTimers {
        private final Table<String, String, byte[]> state;
        private final Collection<byte[]> serTimers;

        private StateAndTimers(Table<String, String, byte[]> state, Collection<byte[]> timers) {
            this.state = state;
            this.serTimers = timers;
        }

        public Table<String, String, byte[]> getState() {
            return this.state;
        }

        public Collection<byte[]> getTimers() {
            return this.serTimers;
        }
    }

    private static abstract class SerializableFunction1<T1, T2>
    extends AbstractFunction1<T1, T2>
    implements Serializable {
        private SerializableFunction1() {
        }
    }
}

