package org.apache.beam.runners.spark.translation;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.InMemoryStateInternals;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Function;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Iterators;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Multimap;
import org.apache.beam.runners.spark.translation.SparkProcessContext;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/MultiDoFnFunction.class */
public class MultiDoFnFunction<InputT, OutputT> implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
    private final Accumulator<MetricsContainerStepMap> metricsAccum;
    private final String stepName;
    private final DoFn<InputT, OutputT> doFn;
    private final SerializablePipelineOptions options;
    private final TupleTag<OutputT> mainOutputTag;
    private final List<TupleTag<?>> additionalOutputTags;
    private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
    private final WindowingStrategy<?, ?> windowingStrategy;
    private final boolean stateful;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/MultiDoFnFunction$DoFnOutputManager.class */
    public class DoFnOutputManager implements SparkProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {
        private final Multimap<TupleTag<?>, WindowedValue<?>> outputs;

        private DoFnOutputManager() {
            this.outputs = LinkedListMultimap.create();
        }

        @Override // org.apache.beam.runners.spark.translation.SparkProcessContext.SparkOutputManager
        public void clear() {
            this.outputs.clear();
        }

        @Override // java.lang.Iterable
        public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() {
            return Iterators.transform(this.outputs.entries().iterator(), entryToTupleFn());
        }

        private <K, V> Function<Map.Entry<K, V>, Tuple2<K, V>> entryToTupleFn() {
            return new Function<Map.Entry<K, V>, Tuple2<K, V>>() { // from class: org.apache.beam.runners.spark.translation.MultiDoFnFunction.DoFnOutputManager.1
                @Override // org.apache.beam.runners.spark.repackaged.com.google.common.base.Function
                public Tuple2<K, V> apply(Map.Entry<K, V> entry) {
                    return new Tuple2<>(entry.getKey(), entry.getValue());
                }
            };
        }

        public synchronized <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
            this.outputs.put(tupleTag, windowedValue);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/MultiDoFnFunction$TimerDataIterator.class */
    public static class TimerDataIterator implements Iterator<TimerInternals.TimerData> {
        private InMemoryTimerInternals timerInternals;
        private boolean hasAdvance;
        private TimerInternals.TimerData timerData;

        TimerDataIterator(InMemoryTimerInternals inMemoryTimerInternals) {
            this.timerInternals = inMemoryTimerInternals;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (!this.hasAdvance) {
                try {
                    this.timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
                    this.timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
                    this.timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
                    this.hasAdvance = true;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            TimerInternals.TimerData removeNextEventTimer = this.timerInternals.removeNextEventTimer();
            this.timerData = removeNextEventTimer;
            if (removeNextEventTimer == null) {
                TimerInternals.TimerData removeNextProcessingTimer = this.timerInternals.removeNextProcessingTimer();
                this.timerData = removeNextProcessingTimer;
                if (removeNextProcessingTimer == null) {
                    TimerInternals.TimerData removeNextSynchronizedProcessingTimer = this.timerInternals.removeNextSynchronizedProcessingTimer();
                    this.timerData = removeNextSynchronizedProcessingTimer;
                    if (removeNextSynchronizedProcessingTimer == null) {
                        return false;
                    }
                }
            }
            return true;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public TimerInternals.TimerData next() {
            if (this.timerData == null) {
                throw new NoSuchElementException();
            }
            return this.timerData;
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new RuntimeException("TimerDataIterator not support remove!");
        }
    }

    public MultiDoFnFunction(Accumulator<MetricsContainerStepMap> accumulator, String str, DoFn<InputT, OutputT> doFn, SerializablePipelineOptions serializablePipelineOptions, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> map, WindowingStrategy<?, ?> windowingStrategy, boolean z) {
        this.metricsAccum = accumulator;
        this.stepName = str;
        this.doFn = doFn;
        this.options = serializablePipelineOptions;
        this.mainOutputTag = tupleTag;
        this.additionalOutputTags = list;
        this.sideInputs = map;
        this.windowingStrategy = windowingStrategy;
        this.stateful = z;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v18, types: [org.apache.beam.runners.spark.translation.MultiDoFnFunction$1] */
    public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> it) throws Exception {
        final InMemoryTimerInternals inMemoryTimerInternals;
        SparkProcessContext.NoOpStepContext noOpStepContext;
        DoFnOutputManager doFnOutputManager = new DoFnOutputManager();
        if (this.stateful) {
            Object obj = null;
            if (it.hasNext()) {
                WindowedValue<InputT> next = it.next();
                obj = ((KV) next.getValue()).getKey();
                it = Iterators.concat(Iterators.singletonIterator(next), it);
            }
            final InMemoryStateInternals forKey = InMemoryStateInternals.forKey(obj);
            inMemoryTimerInternals = new InMemoryTimerInternals();
            noOpStepContext = new StepContext() { // from class: org.apache.beam.runners.spark.translation.MultiDoFnFunction.1
                public StateInternals stateInternals() {
                    return forKey;
                }

                public TimerInternals timerInternals() {
                    return inMemoryTimerInternals;
                }
            };
        } else {
            inMemoryTimerInternals = null;
            noOpStepContext = new SparkProcessContext.NoOpStepContext();
        }
        return new SparkProcessContext(this.doFn, new DoFnRunnerWithMetrics(this.stepName, DoFnRunners.simpleRunner(this.options.get(), this.doFn, new SparkSideInputReader(this.sideInputs), doFnOutputManager, this.mainOutputTag, this.additionalOutputTags, noOpStepContext, this.windowingStrategy), this.metricsAccum), doFnOutputManager, this.stateful ? new TimerDataIterator(inMemoryTimerInternals) : Collections.emptyIterator()).processPartition(it);
    }
}
