/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.PassthroughTransformEvaluator;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TaggedPValue;
import org.joda.time.Instant;

/**
 * {@link TransformEvaluatorFactory} that produces evaluators for {@link Window.Assign}
 * applications in the direct runner.
 *
 * <p>When the applied transform carries a {@link WindowFn}, each input element is re-assigned to
 * the windows chosen by that function. When the {@link WindowFn} is {@code null}, elements are
 * forwarded unchanged through a {@link PassthroughTransformEvaluator}.
 */
class WindowEvaluatorFactory implements TransformEvaluatorFactory {
    private final EvaluationContext evaluationContext;

    /**
     * @param evaluationContext context used to create the output bundle of each evaluator
     */
    WindowEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates an evaluator for the given application.
     *
     * @param application the applied transform; it is treated as a {@link Window.Assign}
     *     application without any runtime check
     * @param inputBundle the input bundle; not used by this factory
     * @return an evaluator that writes into a freshly created output bundle
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, @Nullable DirectRunner.CommittedBundle<?> inputBundle) throws Exception {
        // The interface only exposes a wildcard-typed application; the raw cast narrows it to the
        // shape this factory handles. NOTE(review): callers are presumably responsible for only
        // routing Window.Assign applications here - confirm against the evaluator registry.
        return createTransformEvaluator((AppliedPTransform) application);
    }

    /**
     * Builds the evaluator for a {@link Window.Assign} application: a window-assigning evaluator
     * when a {@link WindowFn} is present, a pass-through evaluator otherwise.
     */
    @SuppressWarnings("unchecked")
    private <InputT> TransformEvaluator<InputT> createTransformEvaluator(AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>> transform) {
        WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
        // The transform is expected to have exactly one output; getOnlyElement fails otherwise.
        TaggedPValue onlyOutput = Iterables.getOnlyElement(transform.getOutputs());
        // Unchecked: the single output of this transform is declared as PCollection<InputT>.
        DirectRunner.UncommittedBundle<InputT> outputBundle = this.evaluationContext.createBundle((PCollection<InputT>) onlyOutput.getValue());
        if (fn == null) {
            // No WindowFn to apply, so there is nothing to re-assign: forward elements as-is.
            return PassthroughTransformEvaluator.create(transform, outputBundle);
        }
        return new WindowIntoEvaluator<InputT>(transform, fn, outputBundle);
    }

    /** No resources are held by this factory, so there is nothing to clean up. */
    @Override
    public void cleanup() {
    }

    /**
     * {@link WindowFn.AssignContext} backed by a single {@link WindowedValue}.
     *
     * <p>{@link #window()} requires the wrapped value to be in exactly one window.
     */
    private static class DirectAssignContext<InputT, W extends BoundedWindow>
            extends WindowFn<InputT, W>.AssignContext {
        private final WindowedValue<InputT> value;

        /**
         * @param fn the {@link WindowFn} this context belongs to (the enclosing instance of the
         *     inner {@code AssignContext} superclass)
         * @param value the element being assigned to windows
         */
        public DirectAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
            // AssignContext is an inner class of WindowFn, so the superclass constructor must be
            // invoked with the WindowFn as its qualifying (outer) instance.
            fn.super();
            this.value = value;
        }

        @Override
        public InputT element() {
            return this.value.getValue();
        }

        @Override
        public Instant timestamp() {
            return this.value.getTimestamp();
        }

        @Override
        public BoundedWindow window() {
            // Fails if the value is in zero or several windows; processElement only hands over
            // values obtained from explodeWindows().
            return Iterables.getOnlyElement(this.value.getWindows());
        }
    }

    /**
     * {@link TransformEvaluator} that re-windows every input element with a {@link WindowFn} and
     * adds the result to a single output bundle.
     */
    private static class WindowIntoEvaluator<InputT>
            implements TransformEvaluator<InputT> {
        private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>> transform;
        private final WindowFn<InputT, ?> windowFn;
        private final DirectRunner.UncommittedBundle<InputT> outputBundle;

        /**
         * @param transform the application being evaluated; reported in the transform result
         * @param windowFn the function used to assign windows to each element
         * @param outputBundle the bundle that receives the re-windowed elements
         */
        @SuppressWarnings("unchecked")
        public WindowIntoEvaluator(AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>> transform, WindowFn<? super InputT, ?> windowFn, DirectRunner.UncommittedBundle<InputT> outputBundle) {
            this.outputBundle = outputBundle;
            this.transform = transform;
            // Contravariant narrowing: a WindowFn accepting a supertype of InputT is only ever
            // handed InputT elements by this evaluator.
            this.windowFn = (WindowFn<InputT, ?>) windowFn;
        }

        /**
         * Splits the incoming value into one value per window, assigns new windows to each of
         * them, and adds the result to the output bundle with the original value, timestamp and
         * pane.
         */
        @Override
        public void processElement(WindowedValue<InputT> compressedElement) throws Exception {
            for (WindowedValue<InputT> element : compressedElement.explodeWindows()) {
                Collection<? extends BoundedWindow> windows = this.assignWindows(this.windowFn, element);
                this.outputBundle.add(WindowedValue.of(element.getValue(), element.getTimestamp(), windows, element.getPane()));
            }
        }

        /**
         * Captures the window type {@code W} of the wildcard-typed {@link WindowFn} so that a
         * matching {@link DirectAssignContext} can be created for it.
         */
        private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception {
            WindowFn<InputT, W>.AssignContext assignContext = new DirectAssignContext<InputT, W>(windowFn, element);
            return windowFn.assignWindows(assignContext);
        }

        /** Reports the output bundle as the only output of this step, without a watermark hold. */
        @Override
        public TransformResult<InputT> finishBundle() throws Exception {
            return StepTransformResult.<InputT>withoutHold(this.transform).addOutput(this.outputBundle).build();
        }
    }
}

