/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.direct.AggregatorContainer;
import org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectGroupByKey;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.DirectTimerInternals;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.StructuralKey;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Function;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Predicate;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.FluentIterable;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

class GroupAlsoByWindowEvaluatorFactory
implements TransformEvaluatorFactory {
    /** Evaluation context that is handed to every evaluator this factory creates. */
    private final EvaluationContext evaluationContext;

    /**
     * Creates a factory bound to the given evaluation context.
     *
     * @param evaluationContext the context passed on to each created evaluator
     */
    GroupAlsoByWindowEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates the evaluator for one application of the group-also-by-window transform.
     *
     * <p>The interface signature only carries wildcards, while {@code createEvaluator} needs the
     * concrete key/value parameterization. The arguments are therefore passed as raw types and the
     * result is converted unchecked to {@code TransformEvaluator<InputT>}.
     *
     * @param application the applied transform to evaluate; presumably always a
     *     {@code DirectGroupAlsoByWindow} application, since that is what {@code createEvaluator}
     *     declares (not verified here)
     * @param inputBundle the committed input bundle the evaluator will consume
     * @return a new evaluator for {@code application} and {@code inputBundle}
     */
    @Override
    public <InputT> TransformEvaluator<InputT> forApplication(
            AppliedPTransform<?, ?, ?> application, DirectRunner.CommittedBundle<?> inputBundle) {
        // Raw casts are deliberate: the wildcard-typed arguments cannot be inferred against the
        // fully parameterized signature of createEvaluator. The warnings are confined to this
        // single declaration.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator =
                createEvaluator((AppliedPTransform) application, (DirectRunner.CommittedBundle) inputBundle);
        return evaluator;
    }

    /** Intentionally empty: this implementation performs no work on cleanup. */
    @Override
    public void cleanup() {
        // Nothing to do.
    }

    /**
     * Instantiates a {@link GroupAlsoByWindowEvaluator} for the given application and bundle,
     * sharing this factory's evaluation context.
     *
     * @param application the applied {@code DirectGroupAlsoByWindow} transform
     * @param inputBundle the committed bundle of keyed work items to process
     * @return a fresh evaluator for this application and bundle
     */
    private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
            AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application,
            DirectRunner.CommittedBundle<KeyedWorkItem<K, V>> inputBundle) {
        GroupAlsoByWindowEvaluator<K, V> evaluator =
                new GroupAlsoByWindowEvaluator<K, V>(evaluationContext, inputBundle, application);
        return evaluator;
    }

    private static class OutputWindowedValueToBundle<K, V>
    implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final DirectRunner.UncommittedBundle<KV<K, Iterable<V>>> bundle;

        private OutputWindowedValueToBundle(DirectRunner.UncommittedBundle<KV<K, Iterable<V>>> bundle) {
            this.bundle = bundle;
        }

        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.bundle.add(WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)pane));
        }

        public <SideOutputT> void sideOutputWindowedValue(TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs");
        }
    }

    /**
     * {@link TransformEvaluator} for a {@code DirectGroupAlsoByWindow} application.
     *
     * <p>Each {@link KeyedWorkItem} is run through a {@link ReduceFnRunner} backed by the step's
     * state and timer internals. Output is written to a new keyed bundle per processed element,
     * and all bundles are reported from {@link #finishBundle()}.
     *
     * @param <K> key type of the work items
     * @param <V> value type of the work items
     */
    private static class GroupAlsoByWindowEvaluator<K, V>
            implements TransformEvaluator<KeyedWorkItem<K, V>> {
        private final EvaluationContext evaluationContext;
        private final AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application;
        private final DirectExecutionContext.DirectStepContext stepContext;
        private final WindowingStrategy<?, BoundedWindow> windowingStrategy;
        /** Key of the input bundle; reused as the key of every output bundle. */
        private final StructuralKey<?> structuralKey;
        /** One entry per call to {@link #processElement}. */
        private final Collection<DirectRunner.UncommittedBundle<?>> outputBundles;
        /** Never added to in this class; reported as-is (empty) from {@link #finishBundle()}. */
        private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements;
        private final AggregatorContainer.Mutator aggregatorChanges;
        private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
        private final Aggregator<Long, Long> droppedDueToClosedWindow;
        private final Aggregator<Long, Long> droppedDueToLateness;

        /**
         * Sets up step context, windowing strategy, buffering reduce function and the two
         * "dropped" system aggregators for the given application and input bundle.
         *
         * @param evaluationContext context used to obtain execution context, bundles, aggregator
         *     mutator and pipeline options
         * @param inputBundle bundle whose key and element coder configure this evaluator
         * @param application the applied transform being evaluated
         */
        public GroupAlsoByWindowEvaluator(
                EvaluationContext evaluationContext,
                DirectRunner.CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
                AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application) {
            this.evaluationContext = evaluationContext;
            this.application = application;
            this.structuralKey = inputBundle.getKey();

            DirectGroupByKey.DirectGroupAlsoByWindow<K, V> transform = application.getTransform();

            this.stepContext = (DirectExecutionContext.DirectStepContext) evaluationContext
                    .getExecutionContext(application, inputBundle.getKey())
                    .getOrCreateStepContext(evaluationContext.getStepName(application), transform.getName());

            // Unchecked: the window type of the transform's strategy is narrowed to BoundedWindow,
            // exactly as the raw-typed original did implicitly.
            @SuppressWarnings("unchecked")
            WindowingStrategy<?, BoundedWindow> strategy =
                    (WindowingStrategy<?, BoundedWindow>) transform.getInputWindowingStrategy();
            this.windowingStrategy = strategy;

            this.outputBundles = new ArrayList<DirectRunner.UncommittedBundle<?>>();
            this.unprocessedElements = ImmutableList.builder();
            this.aggregatorChanges = evaluationContext.getAggregatorMutator();

            Coder<V> valueCoder = transform.getValueCoder(inputBundle.getPCollection().getCoder());
            this.reduceFn = SystemReduceFn.buffering(valueCoder);

            this.droppedDueToClosedWindow = this.aggregatorChanges.createSystemAggregator(
                    (ExecutionContext.StepContext) this.stepContext, "DroppedDueToClosedWindow", new Sum.SumLongFn());
            this.droppedDueToLateness = this.aggregatorChanges.createSystemAggregator(
                    (ExecutionContext.StepContext) this.stepContext, "DroppedDueToLateness", new Sum.SumLongFn());
        }

        /**
         * Processes one keyed work item.
         *
         * <p>Creates a new output bundle, feeds the item's non-expired elements and then its timers
         * to a fresh {@link ReduceFnRunner}, and finally persists the runner.
         *
         * @param element the windowed work item to process
         * @throws Exception if the runner fails while processing elements, firing timers or
         *     persisting
         */
        @Override
        public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception {
            KeyedWorkItem<K, V> workItem = element.getValue();
            K key = workItem.key();

            DirectRunner.UncommittedBundle<KV<K, Iterable<V>>> bundle =
                    evaluationContext.createKeyedBundle(structuralKey, application.getOutput());
            outputBundles.add(bundle);

            // Unchecked: the step context does not expose the key type of its state internals.
            // NOTE(review): presumably keyed by K for this bundle - confirm against DirectStepContext.
            @SuppressWarnings("unchecked")
            StateInternals<K> stateInternals = (StateInternals<K>) stepContext.stateInternals();
            DirectTimerInternals timerInternals = stepContext.timerInternals();

            ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
                    new ReduceFnRunner<K, V, Iterable<V>, BoundedWindow>(
                            key,
                            windowingStrategy,
                            ExecutableTriggerStateMachine.create(
                                    (TriggerStateMachine) TriggerStateMachines.stateMachineForTrigger(
                                            (Trigger) windowingStrategy.getTrigger())),
                            stateInternals,
                            timerInternals,
                            new OutputWindowedValueToBundle<K, V>(bundle),
                            unsupportedSideInputReader(),
                            droppedDueToClosedWindow,
                            reduceFn,
                            (PipelineOptions) evaluationContext.getPipelineOptions());

            // Elements first (minus those in expired windows), then timers, then persist.
            reduceFnRunner.processElements(dropExpiredWindows(key, workItem.elementsIterable(), timerInternals));
            reduceFnRunner.onTimers(workItem.timersIterable());
            reduceFnRunner.persist();
        }

        /**
         * Commits the step's state and assembles the result for this bundle: the earliest
         * watermark hold from the committed state, the state itself, every output bundle, timer
         * updates, aggregator changes and the (unmodified) unprocessed-elements list.
         *
         * @return the transform result for this bundle
         * @throws Exception declared by the interface; propagated from the calls made here
         */
        @Override
        public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
            CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
            // The explicit type witness keeps the builder chain typed to this evaluator's input.
            return StepTransformResult.<KeyedWorkItem<K, V>>withHold(application, state.getEarliestWatermarkHold())
                    .withState(state)
                    .addOutput(outputBundles)
                    .withTimerUpdate(stepContext.getTimerUpdate())
                    .withAggregatorChanges(aggregatorChanges)
                    .addUnprocessedElements(unprocessedElements.build())
                    .build();
        }

        /**
         * Explodes each element into one value per window and filters out those whose window has
         * expired.
         *
         * <p>A window counts as expired when its {@code maxTimestamp()} plus the strategy's allowed
         * lateness is strictly before the current input watermark. Each dropped value increments
         * the {@code DroppedDueToLateness} aggregator and emits a {@link WindowTracing} debug line.
         * The result is a Guava {@link FluentIterable} chain, so this work (including the side
         * effects) is expected to happen when the result is iterated, not when this method returns.
         *
         * @param key key of the work item, used only in the debug message
         * @param elements the work item's elements
         * @param timerInternals source of the current input watermark
         * @return the exploded elements whose windows have not expired
         */
        public Iterable<WindowedValue<V>> dropExpiredWindows(final K key, Iterable<WindowedValue<V>> elements, final TimerInternals timerInternals) {
            return FluentIterable.from(elements).transformAndConcat(new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() {

                @Override
                public Iterable<WindowedValue<V>> apply(WindowedValue<V> input) {
                    return input.explodeWindows();
                }
            }).filter(new Predicate<WindowedValue<V>>() {

                @Override
                public boolean apply(WindowedValue<V> input) {
                    // After explodeWindows() every value is expected to carry exactly one window.
                    BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
                    boolean expired = window.maxTimestamp()
                            .plus(windowingStrategy.getAllowedLateness())
                            .isBefore(timerInternals.currentInputWatermarkTime());
                    if (expired) {
                        droppedDueToLateness.addValue(1L);
                        WindowTracing.debug("GroupAlsoByWindow: Dropping element at {} for key: {}; window: {} since it is too far behind inputWatermark: {}", new Object[]{input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime()});
                    }
                    return !expired;
                }
            });
        }

        /** Returns a {@link SideInputReader} whose every method throws {@link UnsupportedOperationException}. */
        private static SideInputReader unsupportedSideInputReader() {
            return new SideInputReader() {

                @Override
                public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
                    throw new UnsupportedOperationException("GroupAlsoByWindow must not have side inputs");
                }

                @Override
                public <T> boolean contains(PCollectionView<T> view) {
                    throw new UnsupportedOperationException("GroupAlsoByWindow must not have side inputs");
                }

                @Override
                public boolean isEmpty() {
                    throw new UnsupportedOperationException("GroupAlsoByWindow must not have side inputs");
                }
            };
        }
    }
}

