/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.Collection;
import java.util.concurrent.Executors;
import org.apache.beam.runners.core.ElementAndRestriction;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.SplittableProcessElementInvoker;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.DoFnLifecycleManager;
import org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ParDoEvaluator;
import org.apache.beam.runners.direct.ParDoEvaluatorFactory;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.joda.time.Instant;

class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
implements TransformEvaluatorFactory {
    private final ParDoEvaluatorFactory<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> delegateFactory;
    private final EvaluationContext evaluationContext;

    SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
        this.delegateFactory = new ParDoEvaluatorFactory(evaluationContext);
    }

    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> application, DirectRunner.CommittedBundle<?> inputBundle) throws Exception {
        TransformEvaluator<KeyedWorkItem<String, ElementAndRestriction<?, RestrictionT>>> evaluator = this.createEvaluator(application, inputBundle);
        return evaluator;
    }

    @Override
    public void cleanup() throws Exception {
        this.delegateFactory.cleanup();
    }

    private TransformEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> createEvaluator(AppliedPTransform<PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple, SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> application, DirectRunner.CommittedBundle<InputT> inputBundle) throws Exception {
        final SplittableParDo.ProcessElements transform = (SplittableParDo.ProcessElements)application.getTransform();
        SplittableParDo.ProcessFn processFn = transform.newProcessFn(transform.getFn());
        DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
        processFn = (SplittableParDo.ProcessFn)fnManager.get();
        String stepName = this.evaluationContext.getStepName(application);
        final DirectExecutionContext.DirectStepContext stepContext = (DirectExecutionContext.DirectStepContext)this.evaluationContext.getExecutionContext(application, inputBundle.getKey()).getOrCreateStepContext(stepName, stepName);
        ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> parDoEvaluator = this.delegateFactory.createParDoEvaluator((AppliedPTransform<PCollection<KeyedWorkItem<String, ElementAndRestriction<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, RestrictionT>>>, PCollectionTuple, ?>)application, inputBundle.getKey(), transform.getSideInputs(), (TupleTag<OutputT>)transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), stepContext, (DoFn<KeyedWorkItem<String, ElementAndRestriction<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, RestrictionT>>, OutputT>)processFn, fnManager);
        processFn.setStateInternalsFactory((StateInternalsFactory)new StateInternalsFactory<String>(){

            public StateInternals<String> stateInternalsForKey(String key) {
                return stepContext.stateInternals();
            }
        });
        processFn.setTimerInternalsFactory((TimerInternalsFactory)new TimerInternalsFactory<String>(){

            public TimerInternals timerInternalsForKey(String key) {
                return stepContext.timerInternals();
            }
        });
        final ParDoEvaluator.BundleOutputManager outputManager = parDoEvaluator.getOutputManager();
        processFn.setProcessElementInvoker((SplittableProcessElementInvoker)new OutputAndTimeBoundedSplittableProcessElementInvoker(transform.getFn(), (PipelineOptions)this.evaluationContext.getPipelineOptions(), new OutputWindowedValue<OutputT>(){

            public void outputWindowedValue(OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                outputManager.output(transform.getMainOutputTag(), WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)pane));
            }

            public <SideOutputT> void sideOutputWindowedValue(TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                outputManager.output(tag, WindowedValue.of(output, (Instant)timestamp, windows, (PaneInfo)pane));
            }
        }, (SideInputReader)this.evaluationContext.createSideInputReader(transform.getSideInputs()), Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), 10000, Duration.standardSeconds((long)10L)));
        return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager);
    }
}

