package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.direct.CommittedResult;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/ViewEvaluatorFactory.class */
public class ViewEvaluatorFactory implements TransformEvaluatorFactory {
    private final EvaluationContext context;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ViewEvaluatorFactory$DirectCreatePCollectionView.class */
    /**
     * Replacement transform for {@link View.CreatePCollectionView} that forwards to the wrapped
     * original via {@link #delegate()} but supplies its own expansion.
     *
     * <p>The expansion applies, in order, {@link WithKeys} with a {@code null} {@link Void} key,
     * {@link GroupByKey}, {@link Values} and finally a {@link WriteView} built from the wrapped
     * transform.
     *
     * @param <ElemT> element type of the input {@link PCollection}
     * @param <ViewT> type parameter of the produced {@link PCollectionView}
     */
    public static class DirectCreatePCollectionView<ElemT, ViewT> extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
        /** The original transform being replaced; also passed on to {@link WriteView}. */
        private final View.CreatePCollectionView<ElemT, ViewT> og;

        /**
         * @param createPCollectionView the original transform this instance wraps
         */
        private DirectCreatePCollectionView(View.CreatePCollectionView<ElemT, ViewT> createPCollectionView) {
            this.og = createPCollectionView;
        }

        /**
         * Expands to {@code WithKeys -> GroupByKey -> Values -> WriteView}.
         *
         * @param pCollection the input collection; its coder is reused as the value coder of the
         *     intermediate {@link KvCoder}
         * @return the result of applying {@link WriteView} to the grouped values
         */
        @Override // org.apache.beam.runners.direct.ForwardingPTransform
        public PCollectionView<ViewT> expand(PCollection<ElemT> pCollection) {
            // Every step carries explicit type arguments. In particular WriteView must not be raw:
            // a raw argument forces unchecked conversion on apply(), which erases its return type
            // so the chain would no longer yield a PCollectionView<ViewT>.
            return pCollection
                    .apply(WithKeys.<Void, ElemT>of((Void) null))
                    .setCoder(KvCoder.of(VoidCoder.of(), pCollection.getCoder()))
                    .apply(GroupByKey.<Void, ElemT>create())
                    .apply(Values.<Iterable<ElemT>>create())
                    .apply(new WriteView<ElemT, ViewT>(this.og));
        }

        /**
         * @return the wrapped original {@link View.CreatePCollectionView}
         */
        @Override // org.apache.beam.runners.direct.ForwardingPTransform
        protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() {
            return this.og;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/ViewEvaluatorFactory$ViewOverrideFactory.class */
    /**
     * {@link PTransformOverrideFactory} that wraps a {@link View.CreatePCollectionView} in a
     * {@link DirectCreatePCollectionView}.
     *
     * @param <ElemT> element type of the input {@link PCollection}
     * @param <ViewT> type parameter of the produced {@link PCollectionView}
     */
    public static class ViewOverrideFactory<ElemT, ViewT> implements PTransformOverrideFactory<PCollection<ElemT>, PCollectionView<ViewT>, View.CreatePCollectionView<ElemT, ViewT>> {
        /**
         * @param createPCollectionView the transform to be replaced
         * @return a new {@link DirectCreatePCollectionView} wrapping {@code createPCollectionView}
         */
        @Override // org.apache.beam.runners.direct.PTransformOverrideFactory
        public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> override(View.CreatePCollectionView<ElemT, ViewT> createPCollectionView) {
            // Parameterized construction instead of a raw type, so no unchecked conversion is needed.
            return new DirectCreatePCollectionView<>(createPCollectionView);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/ViewEvaluatorFactory$WriteView.class */
    /**
     * Transform over {@code PCollection<Iterable<ElemT>>} whose expansion simply returns the view
     * held by the wrapped {@link View.CreatePCollectionView}.
     *
     * @param <ElemT> element type contained in the input iterables
     * @param <ViewT> type parameter of the produced {@link PCollectionView}
     */
    public static final class WriteView<ElemT, ViewT> extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
        /** The original transform from which the view is obtained. */
        private final View.CreatePCollectionView<ElemT, ViewT> og;

        /**
         * @param original the transform whose view {@link #expand} returns
         */
        WriteView(View.CreatePCollectionView<ElemT, ViewT> original) {
            og = original;
        }

        /**
         * @param input the grouped input collection; not read by this method
         * @return {@code getView()} of the wrapped transform
         */
        public PCollectionView<ViewT> expand(PCollection<Iterable<ElemT>> input) {
            return og.getView();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory bound to the given evaluation context.
     *
     * @param context the context later used to create view writers
     */
    public ViewEvaluatorFactory(EvaluationContext context) {
        this.context = context;
    }

    /**
     * Returns an evaluator for the given transform application by delegating to
     * {@code createEvaluator}.
     *
     * @param appliedPTransform the application to evaluate; passed to {@code createEvaluator}
     *     through an unchecked cast, so it is presumably expected to be a {@link WriteView}
     *     application (the type is not verified here)
     * @param committedBundle the input bundle; not used by this factory
     * @param <T> element type the caller expects the evaluator to consume
     * @return the evaluator produced by {@code createEvaluator}
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, DirectRunner.CommittedBundle<?> committedBundle) {
        // The wildcard-typed application cannot be passed to the strictly typed createEvaluator
        // without a cast; go through the raw type and keep the suppression on this one statement.
        @SuppressWarnings({"cast", "unchecked", "rawtypes"})
        TransformEvaluator<T> evaluator = createEvaluator((AppliedPTransform) appliedPTransform);
        return evaluator;
    }

    /**
     * No-op: the body is empty, so this factory performs no work on cleanup.
     *
     * @throws Exception declared by the signature; this implementation contains no statement that
     *     throws
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() throws Exception {
    }

    /**
     * Builds the evaluator for a {@link WriteView} application.
     *
     * <p>The returned evaluator buffers every element of each incoming iterable, re-wrapped with
     * the incoming {@link WindowedValue} via {@code withValue}. On {@code finishBundle} it passes
     * the buffered list to the view writer obtained from the evaluation context and builds a
     * {@link StepTransformResult} without a hold, adding the
     * {@code PCOLLECTION_VIEW} output type only when at least one element was buffered.
     *
     * @param appliedPTransform the {@link WriteView} application; its input and output are used
     *     to create the view writer
     * @param <InT> element type contained in the input iterables
     * @param <OuT> type parameter of the output {@link PCollectionView}
     * @return a new evaluator holding its own element buffer
     */
    private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>> appliedPTransform) {
        final DirectRunner.PCollectionViewWriter<InT, OuT> writer = this.context.createPCollectionViewWriter(appliedPTransform.getInput(), appliedPTransform.getOutput());
        return new TransformEvaluator<Iterable<InT>>() { // from class: org.apache.beam.runners.direct.ViewEvaluatorFactory.1
            /** Elements seen so far, each carrying the metadata of the value it arrived in. */
            private final List<WindowedValue<InT>> elements = new ArrayList<>();

            @Override // org.apache.beam.runners.direct.TransformEvaluator
            public void processElement(WindowedValue<Iterable<InT>> windowedValue) {
                // Typed iteration keeps withValue(...) returning WindowedValue<InT>; a raw
                // Iterator would produce Object and not fit the element list.
                for (InT element : windowedValue.getValue()) {
                    this.elements.add(windowedValue.withValue(element));
                }
            }

            @Override // org.apache.beam.runners.direct.TransformEvaluator
            public TransformResult<Iterable<InT>> finishBundle() {
                // The writer is always called, even with an empty list.
                writer.add(this.elements);
                // NOTE(review): Builder is kept raw as in the original because its generic
                // signature is not visible here; parameterize it if Builder is generic.
                StepTransformResult.Builder resultBuilder = StepTransformResult.withoutHold(appliedPTransform);
                if (!this.elements.isEmpty()) {
                    resultBuilder = resultBuilder.withAdditionalOutput(CommittedResult.OutputType.PCOLLECTION_VIEW);
                }
                return resultBuilder.build();
            }
        };
    }
}
