package org.apache.beam.runners.direct;

import java.util.Collection;
import org.apache.beam.runners.core.ElementAndRestriction;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.ParDoEvaluator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.util.state.TimerInternalsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.class */
public class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT> implements TransformEvaluatorFactory {
    private final ParDoEvaluatorFactory<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> delegateFactory;
    private final EvaluationContext evaluationContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
        this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, DirectRunner.CommittedBundle<?> committedBundle) throws Exception {
        return createEvaluator(appliedPTransform, committedBundle);
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() throws Exception {
        this.delegateFactory.cleanup();
    }

    private TransformEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> createEvaluator(AppliedPTransform<PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple, SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT>> appliedPTransform, DirectRunner.CommittedBundle<InputT> committedBundle) throws Exception {
        final SplittableParDo.ProcessElements transform = appliedPTransform.getTransform();
        DoFnLifecycleManager managerForCloneOf = this.delegateFactory.getManagerForCloneOf(transform.getFn());
        DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> newProcessFn = transform.newProcessFn(managerForCloneOf.get());
        String stepName = this.evaluationContext.getStepName(appliedPTransform);
        final DirectExecutionContext.DirectStepContext orCreateStepContext = this.evaluationContext.getExecutionContext(appliedPTransform, committedBundle.getKey()).getOrCreateStepContext(stepName, stepName);
        ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> createParDoEvaluator = this.delegateFactory.createParDoEvaluator(appliedPTransform, transform.getSideInputs(), transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), orCreateStepContext, newProcessFn, managerForCloneOf);
        newProcessFn.setStateInternalsFactory(new StateInternalsFactory<String>() { // from class: org.apache.beam.runners.direct.SplittableProcessElementsEvaluatorFactory.1
            public StateInternals<String> stateInternalsForKey(String str) {
                return orCreateStepContext.m4stateInternals();
            }
        });
        newProcessFn.setTimerInternalsFactory(new TimerInternalsFactory<String>() { // from class: org.apache.beam.runners.direct.SplittableProcessElementsEvaluatorFactory.2
            public TimerInternals timerInternalsForKey(String str) {
                return orCreateStepContext.m3timerInternals();
            }
        });
        final ParDoEvaluator.BundleOutputManager outputManager = createParDoEvaluator.getOutputManager();
        newProcessFn.setOutputWindowedValue(new OutputWindowedValue<OutputT>() { // from class: org.apache.beam.runners.direct.SplittableProcessElementsEvaluatorFactory.3
            public void outputWindowedValue(OutputT outputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                outputManager.output(transform.getMainOutputTag(), WindowedValue.of(outputt, instant, collection, paneInfo));
            }

            public <SideOutputT> void sideOutputWindowedValue(TupleTag<SideOutputT> tupleTag, SideOutputT sideoutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
                outputManager.output(tupleTag, WindowedValue.of(sideoutputt, instant, collection, paneInfo));
            }
        });
        return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(createParDoEvaluator, managerForCloneOf);
    }
}
