/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.translation.SparkProcessContext;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.FlatMapFunction;

public class DoFnFunction<InputT, OutputT>
implements FlatMapFunction<Iterator<WindowedValue<InputT>>, WindowedValue<OutputT>> {
    private final Accumulator<NamedAggregators> accum;
    private final OldDoFn<InputT, OutputT> mFunction;
    private final SparkRuntimeContext mRuntimeContext;
    private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> mSideInputs;
    private final WindowFn<Object, ?> windowFn;

    public DoFnFunction(Accumulator<NamedAggregators> accum, OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtime, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs, WindowFn<Object, ?> windowFn) {
        this.accum = accum;
        this.mFunction = fn;
        this.mRuntimeContext = runtime;
        this.mSideInputs = sideInputs;
        this.windowFn = windowFn;
    }

    public Iterable<WindowedValue<OutputT>> call(Iterator<WindowedValue<InputT>> iter) throws Exception {
        return new ProcCtxt(this.mFunction, this.mRuntimeContext, this.mSideInputs, this.windowFn).callWithCtxt(iter);
    }

    private class ProcCtxt
    extends SparkProcessContext<InputT, OutputT, WindowedValue<OutputT>> {
        private final List<WindowedValue<OutputT>> outputs;

        ProcCtxt(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs, WindowFn<Object, ?> windowFn) {
            super(fn, runtimeContext, sideInputs, windowFn);
            this.outputs = new LinkedList();
        }

        @Override
        protected synchronized void outputWindowedValue(WindowedValue<OutputT> o) {
            this.outputs.add(o);
        }

        @Override
        protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> output) {
            throw new UnsupportedOperationException("sideOutput is an unsupported operation for doFunctions, use a MultiDoFunction instead.");
        }

        @Override
        public Accumulator<NamedAggregators> getAccumulator() {
            return DoFnFunction.this.accum;
        }

        @Override
        protected void clearOutput() {
            this.outputs.clear();
        }

        @Override
        protected Iterator<WindowedValue<OutputT>> getOutputIterator() {
            return this.outputs.iterator();
        }
    }
}

