package org.apache.beam.runners.spark.translation;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/MultiDoFnFunction.class */
/**
 * Spark {@link PairFlatMapFunction} that feeds an iterator of input elements through an
 * {@link OldDoFn} and emits every produced value paired with the {@link TupleTag} it was
 * written to.
 *
 * <p>Values sent to the main output are keyed by the main output tag supplied at construction
 * time; values sent to a side output are keyed by the tag passed with them.
 *
 * @param <InputT> element type consumed by the wrapped function
 * @param <OutputT> element type the wrapped function writes to its main output
 */
public class MultiDoFnFunction<InputT, OutputT> implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
    private final Accumulator<NamedAggregators> accum;
    private final OldDoFn<InputT, OutputT> mFunction;
    private final SparkRuntimeContext mRuntimeContext;
    private final TupleTag<OutputT> mMainOutputTag;
    private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> mSideInputs;
    private final WindowFn<Object, ?> windowFn;

    /**
     * Process context that buffers everything the function emits in a tag-keyed multimap and
     * exposes the buffer as an iterator of {@code (tag, value)} tuples.
     *
     * <p>Deliberately a non-static inner class: it reads the enclosing instance's main output
     * tag and accumulator.
     */
    public class ProcCtxt extends SparkProcessContext<InputT, OutputT, Tuple2<TupleTag<?>, WindowedValue<?>>> {
        /** Buffered outputs, grouped by the tag they were emitted to. */
        private final Multimap<TupleTag<?>, WindowedValue<?>> outputs;

        /**
         * Creates a context with an empty output buffer.
         *
         * @param oldDoFn function forwarded to the parent context
         * @param sparkRuntimeContext runtime context forwarded to the parent context
         * @param map side inputs, keyed by tag, forwarded to the parent context
         * @param windowFn window function forwarded to the parent context
         */
        ProcCtxt(OldDoFn<InputT, OutputT> oldDoFn, SparkRuntimeContext sparkRuntimeContext, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> map, WindowFn<Object, ?> windowFn) {
            super(oldDoFn, sparkRuntimeContext, map, windowFn);
            this.outputs = LinkedListMultimap.create();
        }

        /**
         * Buffers a main-output value under the enclosing function's main output tag.
         *
         * @param windowedValue value to buffer
         */
        @Override // org.apache.beam.runners.spark.translation.SparkProcessContext
        protected synchronized void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
            this.outputs.put(MultiDoFnFunction.this.mMainOutputTag, windowedValue);
        }

        /**
         * Buffers a side-output value under the given tag.
         *
         * <p>Synchronized so that both methods that add to {@code outputs} take the same
         * monitor: the main-output writer was already synchronized, and the backing
         * {@code LinkedListMultimap} is not safe for concurrent updates.
         * NOTE(review): whether these callbacks are actually invoked from multiple threads is
         * not visible in this file.
         *
         * @param tupleTag tag of the side output the value belongs to
         * @param windowedValue value to buffer
         * @param <T> element type of the side output
         */
        @Override // org.apache.beam.runners.spark.translation.SparkProcessContext
        protected synchronized <T> void sideOutputWindowedValue(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            this.outputs.put(tupleTag, windowedValue);
        }

        /**
         * Returns the accumulator held by the enclosing function.
         *
         * @return the enclosing function's aggregator accumulator
         */
        @Override // org.apache.beam.runners.spark.translation.SparkProcessContext
        public Accumulator<NamedAggregators> getAccumulator() {
            return MultiDoFnFunction.this.accum;
        }

        /** Discards every buffered output. */
        @Override // org.apache.beam.runners.spark.translation.SparkProcessContext
        protected void clearOutput() {
            this.outputs.clear();
        }

        /**
         * Exposes the buffered outputs as {@code (tag, value)} tuples.
         *
         * <p>The returned iterator is a lazy view over the buffer's entries; each tuple is
         * created when the corresponding entry is visited.
         *
         * @return iterator over the currently buffered outputs
         */
        @Override // org.apache.beam.runners.spark.translation.SparkProcessContext
        protected Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> getOutputIterator() {
            return Iterators.transform(this.outputs.entries().iterator(), new Function<Map.Entry<TupleTag<?>, WindowedValue<?>>, Tuple2<TupleTag<?>, WindowedValue<?>>>() {
                @Override
                public Tuple2<TupleTag<?>, WindowedValue<?>> apply(Map.Entry<TupleTag<?>, WindowedValue<?>> entry) {
                    return new Tuple2<>(entry.getKey(), entry.getValue());
                }
            });
        }
    }

    /**
     * Creates the function; every argument is stored as-is without copying or validation.
     *
     * @param accumulator accumulator handed out by each process context
     * @param oldDoFn function applied to the input elements
     * @param sparkRuntimeContext runtime context passed to each process context
     * @param tupleTag tag under which main-output values are emitted
     * @param map side inputs, keyed by tag, passed to each process context
     * @param windowFn window function passed to each process context
     */
    public MultiDoFnFunction(Accumulator<NamedAggregators> accumulator, OldDoFn<InputT, OutputT> oldDoFn, SparkRuntimeContext sparkRuntimeContext, TupleTag<OutputT> tupleTag, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> map, WindowFn<Object, ?> windowFn) {
        this.accum = accumulator;
        this.mFunction = oldDoFn;
        this.mRuntimeContext = sparkRuntimeContext;
        this.mMainOutputTag = tupleTag;
        this.mSideInputs = map;
        this.windowFn = windowFn;
    }

    /**
     * Runs the given input elements through a freshly created process context.
     *
     * <p>A new {@link ProcCtxt} (and therefore a new, empty output buffer) is built on every
     * invocation; the actual iteration is delegated to {@code callWithCtxt}.
     *
     * @param it input elements to process
     * @return the {@code (tag, value)} tuples produced by {@code callWithCtxt}
     * @throws Exception declared by the implemented Spark function interface
     */
    @Override
    public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> it) throws Exception {
        ProcCtxt context = new ProcCtxt(this.mFunction, this.mRuntimeContext, this.mSideInputs, this.windowFn);
        return context.callWithCtxt(it);
    }
}
