/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.direct.AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ParDoEvaluatorFactory;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.StructuralKey;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;

final class StatefulParDoEvaluatorFactory<K, InputT, OutputT>
implements TransformEvaluatorFactory {
    private final LoadingCache<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable> cleanupRegistry;
    private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory;

    StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) {
        this.delegateFactory = new ParDoEvaluatorFactory(evaluationContext);
        this.cleanupRegistry = CacheBuilder.newBuilder().weakValues().build(new CleanupSchedulingLoader(evaluationContext));
    }

    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> application, DirectRunner.CommittedBundle<?> inputBundle) throws Exception {
        TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> evaluator = this.createEvaluator(application, inputBundle);
        return evaluator;
    }

    @Override
    public void cleanup() throws Exception {
        this.delegateFactory.cleanup();
    }

    private TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> createEvaluator(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> application, DirectRunner.CommittedBundle<KeyedWorkItem<K, KV<K, InputT>>> inputBundle) throws Exception {
        DoFn doFn = ((ParDoMultiOverrideFactory.StatefulParDo)application.getTransform()).getUnderlyingParDo().getFn();
        DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
        if (signature.stateDeclarations().size() > 0) {
            for (WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> element : inputBundle.getElements()) {
                for (BoundedWindow window : element.getWindows()) {
                    this.cleanupRegistry.get(AppliedPTransformOutputKeyAndWindow.create(application, inputBundle.getKey(), window));
                }
            }
        }
        DoFnLifecycleManagerRemovingTransformEvaluator<? extends KeyedWorkItem<K, KV<K, InputT>>> delegateEvaluator = this.delegateFactory.createEvaluator(application, inputBundle.getKey(), doFn, ((ParDoMultiOverrideFactory.StatefulParDo)application.getTransform()).getUnderlyingParDo().getSideInputs(), ((ParDoMultiOverrideFactory.StatefulParDo)application.getTransform()).getUnderlyingParDo().getMainOutputTag(), ((ParDoMultiOverrideFactory.StatefulParDo)application.getTransform()).getUnderlyingParDo().getSideOutputTags().getAll());
        return new StatefulParDoEvaluator(delegateEvaluator);
    }

    private static class StatefulParDoEvaluator<K, InputT>
    implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
        private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;

        public StatefulParDoEvaluator(DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator) {
            this.delegateEvaluator = delegateEvaluator;
        }

        @Override
        public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult) throws Exception {
            BoundedWindow window = (BoundedWindow)Iterables.getOnlyElement(gbkResult.getWindows());
            for (WindowedValue windowedValue : ((KeyedWorkItem)gbkResult.getValue()).elementsIterable()) {
                this.delegateEvaluator.processElement(windowedValue);
            }
            for (TimerInternals.TimerData timer : ((KeyedWorkItem)gbkResult.getValue()).timersIterable()) {
                this.delegateEvaluator.onTimer(timer, window);
            }
        }

        @Override
        public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {
            TransformResult<KV<K, InputT>> delegateResult = this.delegateEvaluator.finishBundle();
            StepTransformResult.Builder regroupedResult = StepTransformResult.withHold(delegateResult.getTransform(), delegateResult.getWatermarkHold()).withTimerUpdate(delegateResult.getTimerUpdate()).withState(delegateResult.getState()).withAggregatorChanges(delegateResult.getAggregatorChanges()).withMetricUpdates(delegateResult.getLogicalMetricUpdates()).addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));
            Iterator<WindowedValue<KV<K, InputT>>> iterator = delegateResult.getUnprocessedElements().iterator();
            while (iterator.hasNext()) {
                WindowedValue<KV<K, InputT>> untypedUnprocessed;
                WindowedValue<KV<K, InputT>> windowedKv = untypedUnprocessed = iterator.next();
                WindowedValue pushedBack = windowedKv.withValue((Object)KeyedWorkItems.elementsWorkItem((Object)((KV)windowedKv.getValue()).getKey(), Collections.singleton(windowedKv)));
                regroupedResult.addUnprocessedElements(pushedBack);
            }
            return regroupedResult.build();
        }
    }

    static abstract class AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> {
        AppliedPTransformOutputKeyAndWindow() {
        }

        abstract AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> getTransform();

        abstract StructuralKey<K> getKey();

        abstract BoundedWindow getWindow();

        static <K, InputT, OutputT> AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> create(AppliedPTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, ParDoMultiOverrideFactory.StatefulParDo<K, InputT, OutputT>> transform, StructuralKey<K> key, BoundedWindow w) {
            return new AutoValue_StatefulParDoEvaluatorFactory_AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>(transform, key, w);
        }
    }

    private class CleanupSchedulingLoader
    extends CacheLoader<AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT>, Runnable> {
        private final EvaluationContext evaluationContext;

        public CleanupSchedulingLoader(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        @Override
        public Runnable load(final AppliedPTransformOutputKeyAndWindow<K, InputT, OutputT> transformOutputWindow) {
            String stepName = this.evaluationContext.getStepName(transformOutputWindow.getTransform());
            HashMap<TupleTag, PCollection> taggedValues = new HashMap<TupleTag, PCollection>();
            for (TaggedPValue pv : transformOutputWindow.getTransform().getOutputs()) {
                taggedValues.put(pv.getTag(), (PCollection)pv.getValue());
            }
            PCollection pc = (PCollection)taggedValues.get(((ParDoMultiOverrideFactory.StatefulParDo)transformOutputWindow.getTransform().getTransform()).getUnderlyingParDo().getMainOutputTag());
            WindowingStrategy windowingStrategy = pc.getWindowingStrategy();
            BoundedWindow window = transformOutputWindow.getWindow();
            final DoFn doFn = ((ParDoMultiOverrideFactory.StatefulParDo)transformOutputWindow.getTransform().getTransform()).getUnderlyingParDo().getFn();
            final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
            final DirectExecutionContext.DirectStepContext stepContext = (DirectExecutionContext.DirectStepContext)this.evaluationContext.getExecutionContext(transformOutputWindow.getTransform(), transformOutputWindow.getKey()).getOrCreateStepContext(stepName, stepName);
            final StateNamespace namespace = StateNamespaces.window((Coder)windowingStrategy.getWindowFn().windowCoder(), (BoundedWindow)window);
            Runnable cleanup = new Runnable(){

                @Override
                public void run() {
                    for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) {
                        StateTag tag;
                        try {
                            tag = StateTags.tagForSpec((String)stateDecl.id(), (StateSpec)((StateSpec)stateDecl.field().get(doFn)));
                        }
                        catch (IllegalAccessException e) {
                            throw new RuntimeException(String.format("Error accessing %s for %s", StateSpec.class.getName(), doFn.getClass().getName()), e);
                        }
                        stepContext.stateInternals().state(namespace, tag).clear();
                    }
                    StatefulParDoEvaluatorFactory.this.cleanupRegistry.invalidate(transformOutputWindow);
                }
            };
            this.evaluationContext.scheduleAfterWindowExpiration(transformOutputWindow.getTransform(), window, windowingStrategy, cleanup);
            return cleanup;
        }
    }
}

