package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectGroupByKey;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.base.Function;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.base.Predicate;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.collect.FluentIterable;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.class */
public class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
    private final EvaluationContext evaluationContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory$GroupAlsoByWindowEvaluator.class */
    /**
     * Evaluates a {@link DirectGroupByKey.DirectGroupAlsoByWindow} application for one input
     * bundle by feeding each {@link KeyedWorkItem} through a {@link ReduceFnRunner}.
     *
     * <p>Each call to {@link #processElement} creates a new keyed output bundle; all of them are
     * reported together from {@link #finishBundle}.
     *
     * @param <K> key type of the work items
     * @param <V> value type of the grouped elements
     */
    public static class GroupAlsoByWindowEvaluator<K, V> implements TransformEvaluator<KeyedWorkItem<K, V>> {
        private final EvaluationContext evaluationContext;
        private final AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> application;
        private final DirectExecutionContext.DirectStepContext stepContext;
        private final WindowingStrategy<?, BoundedWindow> windowingStrategy;
        private final StructuralKey<?> structuralKey;
        private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
        // One entry per processElement call; handed to the result in finishBundle.
        private final Collection<UncommittedBundle<?>> outputBundles = new ArrayList<>();
        // Nothing in this class adds to this builder, so finishBundle reports an empty list.
        private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements = ImmutableList.builder();
        // Declared but never incremented anywhere in this class.
        private final Counter droppedDueToClosedWindow = Metrics.counter(GroupAlsoByWindowEvaluator.class, "DroppedDueToClosedWindow");
        // Incremented by dropExpiredWindows for every exploded element whose window has expired.
        private final Counter droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class, "DroppedDueToLateness");

        /**
         * Creates an evaluator bound to the key of {@code committedBundle}.
         *
         * @param evaluationContext context used to obtain the step context, output bundles and
         *     pipeline options
         * @param committedBundle input bundle; supplies the structural key and the input coder
         * @param appliedPTransform the group-also-by-window application being evaluated
         */
        public GroupAlsoByWindowEvaluator(EvaluationContext evaluationContext, CommittedBundle<KeyedWorkItem<K, V>> committedBundle, AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> appliedPTransform) {
            this.evaluationContext = evaluationContext;
            this.application = appliedPTransform;
            this.structuralKey = committedBundle.getKey();

            // Fetch the transform once, fully typed, instead of repeating a raw cast at each use.
            DirectGroupByKey.DirectGroupAlsoByWindow<K, V> transform = appliedPTransform.getTransform();
            this.stepContext = evaluationContext.getExecutionContext(appliedPTransform, committedBundle.getKey()).getOrCreateStepContext(evaluationContext.getStepName(appliedPTransform), transform.getName());

            // The window type of the input strategy is not carried by the transform's signature,
            // so it is narrowed to BoundedWindow here; the warning is confined to this local.
            @SuppressWarnings("unchecked")
            WindowingStrategy<?, BoundedWindow> inputStrategy = (WindowingStrategy<?, BoundedWindow>) transform.getInputWindowingStrategy();
            this.windowingStrategy = inputStrategy;

            this.reduceFn = SystemReduceFn.buffering(transform.getValueCoder(committedBundle.getPCollection().getCoder()));
        }

        /**
         * Processes one keyed work item: creates an output bundle, runs the non-expired elements
         * and then the timers of the work item through a {@link ReduceFnRunner}, and persists the
         * runner's state.
         *
         * @param windowedValue the work item to process
         * @throws Exception if the reduce-fn runner fails
         */
        @Override
        public void processElement(WindowedValue<KeyedWorkItem<K, V>> windowedValue) throws Exception {
            KeyedWorkItem<K, V> workItem = windowedValue.getValue();
            K key = workItem.key();

            // The application is expected to have exactly one output; getOnlyElement enforces it.
            @SuppressWarnings("unchecked")
            PCollection<KV<K, Iterable<V>>> output = (PCollection<KV<K, Iterable<V>>>) Iterables.getOnlyElement(this.application.getOutputs().values());
            UncommittedBundle<KV<K, Iterable<V>>> bundle = this.evaluationContext.createKeyedBundle(this.structuralKey, output);
            this.outputBundles.add(bundle);

            // NOTE(review): m4stateInternals / m3timerInternals look like decompiler-renamed
            // accessors on DirectStepContext - confirm the names against that class.
            CopyOnAccessInMemoryStateInternals<?> stateInternals = this.stepContext.m4stateInternals();
            DirectTimerInternals timerInternals = this.stepContext.m3timerInternals();

            ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> runner = new ReduceFnRunner<K, V, Iterable<V>, BoundedWindow>(
                    key,
                    this.windowingStrategy,
                    ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(this.windowingStrategy.getTrigger()))),
                    stateInternals,
                    timerInternals,
                    new OutputWindowedValueToBundle<K, V>(bundle),
                    new UnsupportedSideInputReader("GroupAlsoByWindow"),
                    this.reduceFn,
                    this.evaluationContext.getPipelineOptions());

            // Elements in expired windows are filtered out before they reach the runner.
            runner.processElements(dropExpiredWindows(key, workItem.elementsIterable(), timerInternals));
            runner.onTimers(workItem.timersIterable());
            runner.persist();
        }

        /**
         * Commits the step state and builds the result for this bundle.
         *
         * @return a result holding the earliest watermark hold of the committed state, the
         *     committed state itself, all output bundles, the timer update and the unprocessed
         *     elements
         * @throws Exception declared by the {@link TransformEvaluator} contract
         */
        @Override
        public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
            CopyOnAccessInMemoryStateInternals<?> committedState = this.stepContext.commitState();
            // The explicit type witness is required: withHold has no argument from which the
            // input type could be inferred, so the chain would otherwise be typed as Object.
            return StepTransformResult.<KeyedWorkItem<K, V>>withHold(this.application, committedState.getEarliestWatermarkHold())
                    .withState(committedState)
                    .addOutput(this.outputBundles)
                    .withTimerUpdate(this.stepContext.getTimerUpdate())
                    .addUnprocessedElements(this.unprocessedElements.build())
                    .build();
        }

        /**
         * Explodes every element into one value per window and keeps only those whose window has
         * not expired. A window counts as expired when its max timestamp plus the allowed lateness
         * of the windowing strategy is before the current input watermark.
         *
         * <p>Each time the filter rejects an element it increments the lateness counter and emits
         * a {@link WindowTracing} debug message.
         *
         * @param k key of the work item, used only in the trace message
         * @param iterable the elements to filter
         * @param timerInternals source of the current input watermark
         * @return the single-window elements whose window is still open
         */
        public Iterable<WindowedValue<V>> dropExpiredWindows(final K k, Iterable<WindowedValue<V>> iterable, final TimerInternals timerInternals) {
            return FluentIterable.from(iterable).transformAndConcat(new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() {
                @Override
                public Iterable<WindowedValue<V>> apply(WindowedValue<V> element) {
                    // One value per window, so each window can be checked independently.
                    return element.explodeWindows();
                }
            }).filter(new Predicate<WindowedValue<V>>() {
                @Override
                public boolean apply(WindowedValue<V> element) {
                    // After explodeWindows every value carries exactly one window.
                    BoundedWindow window = Iterables.getOnlyElement(element.getWindows());
                    boolean expired = window.maxTimestamp().plus(GroupAlsoByWindowEvaluator.this.windowingStrategy.getAllowedLateness()).isBefore(timerInternals.currentInputWatermarkTime());
                    if (expired) {
                        GroupAlsoByWindowEvaluator.this.droppedDueToLateness.inc();
                        WindowTracing.debug("GroupAlsoByWindow: Dropping element at {} for key: {}; window: {} since it is too far behind inputWatermark: {}", element.getTimestamp(), k, window, timerInternals.currentInputWatermarkTime());
                    }
                    return !expired;
                }
            });
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory$OutputWindowedValueToBundle.class */
    private static class OutputWindowedValueToBundle<K, V> implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final UncommittedBundle<KV<K, Iterable<V>>> bundle;

        private OutputWindowedValueToBundle(UncommittedBundle<KV<K, Iterable<V>>> uncommittedBundle) {
            this.bundle = uncommittedBundle;
        }

        public void outputWindowedValue(KV<K, Iterable<V>> kv, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            this.bundle.add(WindowedValue.of(kv, instant, collection, paneInfo));
        }

        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs");
        }

        public /* bridge */ /* synthetic */ void outputWindowedValue(Object obj, Instant instant, Collection collection, PaneInfo paneInfo) {
            outputWindowedValue((KV) obj, instant, (Collection<? extends BoundedWindow>) collection, paneInfo);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory whose evaluators all share the given evaluation context.
     *
     * @param evaluationContext context handed to every evaluator this factory creates
     */
    public GroupAlsoByWindowEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates the evaluator for the given application and input bundle.
     *
     * <p>The wildcard-typed arguments cannot be passed to the fully parameterized
     * {@code createEvaluator} directly, so both are cast to their raw types and the resulting
     * unchecked conversion is suppressed on the single local that receives the evaluator.
     *
     * @param appliedPTransform the application to evaluate
     * @param committedBundle the input bundle for that application
     * @return an evaluator for {@code committedBundle}
     */
    @Override
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, CommittedBundle<?> committedBundle) {
        @SuppressWarnings({"cast", "unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator = createEvaluator((AppliedPTransform) appliedPTransform, (CommittedBundle) committedBundle);
        return evaluator;
    }

    /** Intentionally a no-op: this factory's only field is the evaluation context it was given. */
    @Override
    public void cleanup() {
        // Nothing to clean up.
    }

    /**
     * Builds a typed {@link GroupAlsoByWindowEvaluator} for the given application and bundle.
     *
     * @param appliedPTransform the group-also-by-window application
     * @param committedBundle the keyed input bundle
     * @return a new evaluator sharing this factory's evaluation context
     */
    private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(AppliedPTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupByKey.DirectGroupAlsoByWindow<K, V>> appliedPTransform, CommittedBundle<KeyedWorkItem<K, V>> committedBundle) {
        // Diamond instead of a raw constructor call, so K and V are checked by the compiler.
        return new GroupAlsoByWindowEvaluator<>(this.evaluationContext, committedBundle, appliedPTransform);
    }
}
