/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.direct.AggregatorContainer;
import org.apache.beam.runners.direct.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.StructuralKey;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;

/**
 * A {@link TransformEvaluator} that feeds the elements of one input bundle through a
 * {@link PushbackSideInputDoFnRunner} wrapping a user {@link DoFn}.
 *
 * <p>Elements the runner hands back from {@code processElementInReadyWindows} are collected and
 * reported as unprocessed elements in the result of {@link #finishBundle()}.
 *
 * @param <InputT> the element type consumed by the wrapped {@link DoFn}
 * @param <OutputT> the main output element type produced by the wrapped {@link DoFn}
 */
class ParDoEvaluator<InputT, OutputT>
implements TransformEvaluator<InputT> {
    private final EvaluationContext evaluationContext;
    private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner;
    private final AppliedPTransform<?, ?, ?> transform;
    private final AggregatorContainer.Mutator aggregatorChanges;
    private final BundleOutputManager outputManager;
    private final DirectExecutionContext.DirectStepContext stepContext;
    private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements;

    /**
     * Builds an evaluator for {@code fn} and immediately calls {@code startBundle()} on the
     * runner that wraps it.
     *
     * <p>One uncommitted bundle is created per entry of {@code outputs}: a keyed bundle (using
     * {@code key}) when the evaluation context reports the output as keyed, and an unkeyed bundle
     * otherwise.
     *
     * @param evaluationContext source of the aggregator mutator, output bundles, side input
     *     reader and pipeline options
     * @param stepContext step context handed to the runner and later used to commit state and
     *     collect timer updates
     * @param application the applied transform the produced result is attributed to
     * @param windowingStrategy windowing strategy handed to the underlying runner
     * @param fn the user function to execute
     * @param key the key used when creating keyed output bundles
     * @param sideInputs the side input views the function may access
     * @param mainOutputTag tag of the main output
     * @param sideOutputTags tags of the side outputs
     * @param outputs the output collection for each declared tag
     * @return an evaluator whose runner has already started its bundle
     * @throws UserCodeException wrapping any exception raised by {@code startBundle()}
     */
    public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create(EvaluationContext evaluationContext, DirectExecutionContext.DirectStepContext stepContext, AppliedPTransform<?, ?, ?> application, WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy, DoFn<InputT, OutputT> fn, StructuralKey<?> key, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, PCollection<?>> outputs) {
        AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator();

        Map<TupleTag<?>, DirectRunner.UncommittedBundle<?>> outputBundles =
            new HashMap<TupleTag<?>, DirectRunner.UncommittedBundle<?>>();
        for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
            if (evaluationContext.isKeyed(outputEntry.getValue())) {
                outputBundles.put(outputEntry.getKey(), evaluationContext.createKeyedBundle(key, outputEntry.getValue()));
            } else {
                outputBundles.put(outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue()));
            }
        }
        BundleOutputManager outputManager = BundleOutputManager.create(outputBundles);

        ReadyCheckingSideInputReader sideInputReader = evaluationContext.createSideInputReader(sideInputs);
        DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
            evaluationContext.getPipelineOptions(),
            fn,
            sideInputReader,
            outputManager,
            mainOutputTag,
            sideOutputTags,
            stepContext,
            aggregatorChanges,
            windowingStrategy);
        PushbackSideInputDoFnRunner<InputT, OutputT> runner =
            PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);

        try {
            runner.startBundle();
        }
        catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
        return new ParDoEvaluator<InputT, OutputT>(evaluationContext, runner, application, aggregatorChanges, outputManager, stepContext);
    }

    private ParDoEvaluator(EvaluationContext evaluationContext, PushbackSideInputDoFnRunner<InputT, ?> fnRunner, AppliedPTransform<?, ?, ?> transform, AggregatorContainer.Mutator aggregatorChanges, BundleOutputManager outputManager, DirectExecutionContext.DirectStepContext stepContext) {
        this.evaluationContext = evaluationContext;
        this.fnRunner = fnRunner;
        this.transform = transform;
        this.outputManager = outputManager;
        this.stepContext = stepContext;
        this.aggregatorChanges = aggregatorChanges;
        this.unprocessedElements = ImmutableList.builder();
    }

    /** Returns the output manager that receives everything the wrapped function emits. */
    public BundleOutputManager getOutputManager() {
        return this.outputManager;
    }

    /**
     * Passes {@code element} to the runner and remembers whatever the runner returns as
     * unprocessed so it can be reported by {@link #finishBundle()}.
     *
     * @throws UserCodeException wrapping any exception raised while processing
     */
    @Override
    public void processElement(WindowedValue<InputT> element) {
        try {
            Iterable<WindowedValue<InputT>> unprocessed = this.fnRunner.processElementInReadyWindows(element);
            this.unprocessedElements.addAll(unprocessed);
        }
        catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
    }

    /**
     * Delivers {@code timer} to the runner for the given {@code window}, forwarding the timer's
     * id, timestamp and time domain.
     *
     * @throws UserCodeException wrapping any exception raised by the timer callback
     */
    public void onTimer(TimerInternals.TimerData timer, BoundedWindow window) {
        try {
            this.fnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
        }
        catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
    }

    /**
     * Finishes the runner's bundle, commits the step state and assembles the result.
     *
     * <p>When {@code commitState()} returns a state object, the result carries that state and its
     * earliest watermark hold; otherwise the result is built without a hold. In both cases the
     * result includes the declared output bundles, the step's timer update, the aggregator
     * changes and the elements collected as unprocessed.
     *
     * @throws UserCodeException wrapping any exception raised by {@code finishBundle()}
     */
    @Override
    public TransformResult<InputT> finishBundle() {
        try {
            this.fnRunner.finishBundle();
        }
        catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
        CopyOnAccessInMemoryStateInternals<?> state = this.stepContext.commitState();
        StepTransformResult.Builder<InputT> resultBuilder;
        if (state != null) {
            resultBuilder = StepTransformResult.<InputT>withHold(this.transform, state.getEarliestWatermarkHold()).withState(state);
        } else {
            resultBuilder = StepTransformResult.withoutHold(this.transform);
        }
        return resultBuilder
            .addOutput(this.outputManager.bundles.values())
            .withTimerUpdate(this.stepContext.getTimerUpdate())
            .withAggregatorChanges(this.aggregatorChanges)
            .addUnprocessedElements(this.unprocessedElements.build())
            .build();
    }

    /**
     * An output manager that routes each emitted value to the uncommitted bundle registered for
     * its tag. Values emitted to a tag with no registered bundle are accumulated in a separate
     * per-tag list instead.
     */
    static class BundleOutputManager
    implements DoFnRunners.OutputManager {
        private final Map<TupleTag<?>, DirectRunner.UncommittedBundle<?>> bundles;
        private final Map<TupleTag<?>, List<?>> undeclaredOutputs;

        /** Creates a manager writing to {@code outputBundles}; the map is used as-is, not copied. */
        public static BundleOutputManager create(Map<TupleTag<?>, DirectRunner.UncommittedBundle<?>> outputBundles) {
            return new BundleOutputManager(outputBundles);
        }

        private BundleOutputManager(Map<TupleTag<?>, DirectRunner.UncommittedBundle<?>> bundles) {
            this.bundles = bundles;
            this.undeclaredOutputs = new HashMap<TupleTag<?>, List<?>>();
        }

        /**
         * Adds {@code output} to the bundle registered for {@code tag}, or to the list of
         * undeclared outputs for {@code tag} when no bundle is registered.
         */
        @Override
        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
            // The maps are keyed by wildcard tags, so the element type has to be recovered with an
            // unchecked cast; this relies on each entry having been stored under a tag whose
            // type parameter matches its contents.
            @SuppressWarnings("unchecked")
            DirectRunner.UncommittedBundle<T> bundle = (DirectRunner.UncommittedBundle<T>) this.bundles.get(tag);
            if (bundle != null) {
                bundle.add(output);
                return;
            }
            @SuppressWarnings("unchecked")
            List<WindowedValue<T>> undeclaredContents = (List<WindowedValue<T>>) this.undeclaredOutputs.get(tag);
            if (undeclaredContents == null) {
                undeclaredContents = new ArrayList<WindowedValue<T>>();
                this.undeclaredOutputs.put(tag, undeclaredContents);
            }
            undeclaredContents.add(output);
        }
    }
}

