package org.apache.beam.runners.spark.translation;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.aggregators.SparkAggregators;
import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
import org.apache.beam.runners.spark.translation.SparkProcessContext;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.FlatMapFunction;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/DoFnFunction.class */
public class DoFnFunction<InputT, OutputT> implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> {
    private final Accumulator<NamedAggregators> aggregatorsAccum;
    private final Accumulator<SparkMetricsContainer> metricsAccum;
    private final String stepName;
    private final DoFn<InputT, OutputT> doFn;
    private final SparkRuntimeContext runtimeContext;
    private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
    private final WindowingStrategy<?, ?> windowingStrategy;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/DoFnFunction$DoFnOutputManager.class */
    public class DoFnOutputManager implements SparkProcessContext.SparkOutputManager<WindowedValue<OutputT>> {
        private final List<WindowedValue<OutputT>> outputs;

        private DoFnOutputManager() {
            this.outputs = new LinkedList();
        }

        @Override // org.apache.beam.runners.spark.translation.SparkProcessContext.SparkOutputManager
        public void clear() {
            this.outputs.clear();
        }

        @Override // java.lang.Iterable
        public Iterator<WindowedValue<OutputT>> iterator() {
            return this.outputs.iterator();
        }

        public synchronized <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            this.outputs.add(windowedValue);
        }
    }

    public DoFnFunction(Accumulator<NamedAggregators> accumulator, Accumulator<SparkMetricsContainer> accumulator2, String str, DoFn<InputT, OutputT> doFn, SparkRuntimeContext sparkRuntimeContext, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> map, WindowingStrategy<?, ?> windowingStrategy) {
        this.aggregatorsAccum = accumulator;
        this.metricsAccum = accumulator2;
        this.stepName = str;
        this.doFn = doFn;
        this.runtimeContext = sparkRuntimeContext;
        this.sideInputs = map;
        this.windowingStrategy = windowingStrategy;
    }

    public Iterable<WindowedValue<OutputT>> call(Iterator<WindowedValue<InputT>> it) throws Exception {
        DoFnOutputManager doFnOutputManager = new DoFnOutputManager();
        return new SparkProcessContext(this.doFn, new DoFnRunnerWithMetrics(this.stepName, DoFnRunners.simpleRunner(this.runtimeContext.getPipelineOptions(), this.doFn, new SparkSideInputReader(this.sideInputs), doFnOutputManager, new TupleTag<OutputT>() { // from class: org.apache.beam.runners.spark.translation.DoFnFunction.1
        }, Collections.emptyList(), new SparkProcessContext.NoOpStepContext(), new SparkAggregators.Factory(this.runtimeContext, this.aggregatorsAccum), this.windowingStrategy), this.metricsAccum), doFnOutputManager).processPartition(it);
    }
}
