/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.spark.Accumulator;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SparkProcessContext<InputT, OutputT, ValueT>
extends OldDoFn.ProcessContext {
    private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
    private final OldDoFn<InputT, OutputT> fn;
    private final SparkRuntimeContext mRuntimeContext;
    private final SideInputReader sideInputReader;
    private final WindowFn<Object, ?> windowFn;
    WindowedValue<InputT> windowedValue;

    /**
     * Creates a process context bound to the given function.
     *
     * @param fn the function whose lifecycle methods this context drives; it also serves as the
     *     enclosing instance of the inherited {@code OldDoFn.ProcessContext}
     * @param runtime runtime context consulted for pipeline options and aggregator creation
     * @param sideInputs side inputs keyed by tag, handed to a {@link SparkSideInputReader}
     * @param windowFn window function used to assign windows to values that are output while no
     *     input element is current
     */
    SparkProcessContext(OldDoFn<InputT, OutputT> fn, SparkRuntimeContext runtime, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs, WindowFn<Object, ?> windowFn) {
        // OldDoFn.ProcessContext is an inner class of OldDoFn, so its enclosing instance has to be
        // supplied through a qualified superclass constructor invocation. The decompiler rendered
        // this as `super(fn)`, which is not valid Java source for an inner superclass.
        fn.super();
        this.fn = fn;
        this.mRuntimeContext = runtime;
        this.sideInputReader = new SparkSideInputReader(sideInputs);
        this.windowFn = windowFn;
    }

    /**
     * Prepares this context for use by invoking the inherited
     * {@code setupDelegateAggregators()}. {@link #callWithCtxt} calls this first.
     */
    void setup() {
        setupDelegateAggregators();
    }

    /**
     * Starts a bundle of the wrapped function over the given input and returns its output.
     *
     * <p>When the input is empty, an empty list is returned and none of the function's lifecycle
     * methods ({@code setup}, {@code startBundle}, {@code teardown}) are invoked.
     *
     * @param iter the windowed input elements to process
     * @return the iterable produced by {@link #getOutputIterable}, or an empty list for empty input
     * @throws Exception if {@code setup}, {@code startBundle} or building the output iterable fails;
     *     the function is torn down first and any teardown failure is attached as suppressed
     */
    Iterable<ValueT> callWithCtxt(Iterator<WindowedValue<InputT>> iter) throws Exception {
        setup();
        if (iter.hasNext()) {
            try {
                fn.setup();
                fn.startBundle(this);
                return getOutputIterable(iter, fn);
            }
            catch (Exception failure) {
                // Give the function a chance to release resources before propagating the failure.
                try {
                    fn.teardown();
                }
                catch (Exception teardownFailure) {
                    LOG.error("Suppressing exception while tearing down Function {}", fn, teardownFailure);
                    failure.addSuppressed(teardownFailure);
                }
                throw wrapUserCodeException(failure);
            }
        }
        // Nothing to process: skip the function lifecycle entirely.
        return Lists.newArrayList();
    }

    /**
     * Returns the pipeline options exposed by the runtime context this instance was built with.
     *
     * @return the options reported by the {@link SparkRuntimeContext}
     */
    public PipelineOptions getPipelineOptions() {
        return mRuntimeContext.getPipelineOptions();
    }

    /**
     * Reads the side input for the given view in the window of the current element.
     *
     * @param view the side input view to read
     * @return the value the side input reader holds for {@code view} in the element's window
     * @throws IllegalStateException if the current element is not in exactly one window
     */
    public <T> T sideInput(PCollectionView<T> view) {
        Collection<? extends BoundedWindow> elementWindows = windowedValue.getWindows();
        boolean inSingleWindow = elementWindows.size() == 1;
        Preconditions.checkState(inSingleWindow, "sideInput can only be called when the main input element is in exactly one window");
        BoundedWindow window = elementWindows.iterator().next();
        return sideInputReader.get(view, window);
    }

    /**
     * Creates an aggregator by delegating to the runtime context, backed by the accumulator
     * returned from {@link #getAccumulator()}.
     *
     * @param named the aggregator name
     * @param combineFn the combine function applied to aggregated values
     * @return the aggregator created by the runtime context
     */
    public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT> createAggregatorInternal(String named, Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) {
        Accumulator<NamedAggregators> accumulator = getAccumulator();
        return mRuntimeContext.createAggregator(accumulator, named, combineFn);
    }

    /**
     * Supplies the Spark accumulator that {@code createAggregatorInternal} passes to the runtime
     * context whenever an aggregator is created.
     *
     * @return the accumulator of named aggregators
     */
    public abstract Accumulator<NamedAggregators> getAccumulator();

    /**
     * Returns the value of the input element currently being processed.
     *
     * <p>Dereferences the current windowed value without a null check, so it fails with a
     * {@link NullPointerException} when no element is current.
     *
     * @return the current element's value
     */
    public InputT element() {
        return windowedValue.getValue();
    }

    /**
     * Emits a value to the main output using the timestamp of the current element, or a
     * {@code null} timestamp when no element is current.
     *
     * @param output the value to emit
     */
    public void output(OutputT output) {
        Instant inheritedTimestamp = windowedValue == null ? null : windowedValue.getTimestamp();
        outputWithTimestamp(output, inheritedTimestamp);
    }

    /**
     * Emits a value to the main output with an explicit timestamp.
     *
     * <p>With a current element, the value inherits that element's windows and pane. Without
     * one, windows are assigned through {@link #noElementWindowedValue}.
     *
     * @param output the value to emit
     * @param timestamp the timestamp to attach to the value
     */
    public void outputWithTimestamp(OutputT output, Instant timestamp) {
        final WindowedValue<OutputT> emitted;
        if (windowedValue != null) {
            emitted = WindowedValue.of(output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
        } else {
            emitted = noElementWindowedValue(output, timestamp, windowFn);
        }
        outputWindowedValue(emitted);
    }

    /**
     * Emits a value to the side output identified by {@code tag}, using the timestamp of the
     * current element, or a {@code null} timestamp when no element is current.
     *
     * @param tag the side output to emit to
     * @param output the value to emit
     */
    public <T> void sideOutput(TupleTag<T> tag, T output) {
        Instant inheritedTimestamp = windowedValue == null ? null : windowedValue.getTimestamp();
        sideOutputWithTimestamp(tag, output, inheritedTimestamp);
    }

    /**
     * Emits a value to the side output identified by {@code tag} with an explicit timestamp.
     *
     * <p>With a current element, the value inherits that element's windows and pane. Without
     * one, windows are assigned through {@link #noElementWindowedValue}.
     *
     * @param tag the side output to emit to
     * @param output the value to emit
     * @param timestamp the timestamp to attach to the value
     */
    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
        final WindowedValue<T> emitted;
        if (windowedValue != null) {
            emitted = WindowedValue.of(output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
        } else {
            emitted = noElementWindowedValue(output, timestamp, windowFn);
        }
        sideOutputWindowedValue(tag, emitted);
    }

    /**
     * Receives every windowed value emitted to the main output. It is called from
     * {@code outputWithTimestamp} and from the {@code WindowingInternals} returned by
     * {@code windowingInternals()}.
     *
     * @param var1 the windowed value to emit
     */
    protected abstract void outputWindowedValue(WindowedValue<OutputT> var1);

    /**
     * Receives every windowed value emitted to a side output. It is called from
     * {@code sideOutputWithTimestamp} and from the {@code WindowingInternals} returned by
     * {@code windowingInternals()}.
     *
     * @param var1 the tag of the side output being written
     * @param var2 the windowed value to emit
     */
    protected abstract <T> void sideOutputWindowedValue(TupleTag<T> var1, WindowedValue<T> var2);

    /**
     * Builds the windowed value for output that is emitted while no input element is current
     * (the error messages refer to this as start/finishBundle output).
     *
     * <p>The windows are obtained by asking {@code windowFn} to assign windows to the output
     * value. The resulting value carries {@code timestamp}, or
     * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} when {@code timestamp} is {@code null}, together
     * with the {@link PaneInfo#NO_FIRING} pane.
     *
     * @param output the value being emitted
     * @param timestamp the timestamp requested by the caller; may be {@code null}
     * @param windowFn the window function used to assign windows; must not be {@code null}
     * @return the output wrapped with its assigned windows, timestamp and pane
     * @throws RuntimeException if window assignment fails; the failure is kept as the cause
     */
    static <T, W extends BoundedWindow> WindowedValue<T> noElementWindowedValue(final T output, final Instant timestamp, WindowFn<Object, W> windowFn) {
        // AssignContext is an inner class of WindowFn, so it has to be created through its
        // enclosing instance. The decompiled form (a bare getClass() null check plus the outer
        // instance passed as a constructor argument) is not valid Java source. As before, a null
        // windowFn fails here with a NullPointerException, outside the try block below.
        WindowFn<Object, W>.AssignContext assignContext = windowFn.new AssignContext() {

            @Override
            public Object element() {
                return output;
            }

            @Override
            public Instant timestamp() {
                if (timestamp != null) {
                    return timestamp;
                }
                throw new UnsupportedOperationException("outputWithTimestamp was called with null timestamp.");
            }

            @Override
            public BoundedWindow window() {
                // There is no current element, hence no window to report.
                throw new UnsupportedOperationException("Window not available for start/finishBundle output.");
            }
        };
        try {
            Collection<? extends BoundedWindow> windows = windowFn.assignWindows(assignContext);
            Instant outputTimestamp = timestamp != null ? timestamp : BoundedWindow.TIMESTAMP_MIN_VALUE;
            return WindowedValue.of(output, outputTimestamp, windows, PaneInfo.NO_FIRING);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to assign windows at start/finishBundle.", e);
        }
    }

    /**
     * Returns the timestamp of the input element currently being processed.
     *
     * <p>Dereferences the current windowed value without a null check, so it fails with a
     * {@link NullPointerException} when no element is current.
     *
     * @return the current element's timestamp
     */
    public Instant timestamp() {
        return windowedValue.getTimestamp();
    }

    /**
     * Returns the single window of the current element.
     *
     * @return the only window the current element belongs to
     * @throws UnsupportedOperationException if the wrapped function does not implement
     *     {@code OldDoFn.RequiresWindowAccess}
     */
    public BoundedWindow window() {
        if (fn instanceof OldDoFn.RequiresWindowAccess) {
            // getOnlyElement rejects elements that are in zero or in several windows.
            return Iterables.getOnlyElement(windowedValue.getWindows());
        }
        throw new UnsupportedOperationException("window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
    }

    /**
     * Returns the pane of the input element currently being processed.
     *
     * <p>Dereferences the current windowed value without a null check, so it fails with a
     * {@link NullPointerException} when no element is current.
     *
     * @return the current element's pane info
     */
    public PaneInfo pane() {
        return windowedValue.getPane();
    }

    /**
     * Returns a fresh {@link WindowingInternals} view over this context.
     *
     * <p>Windows and pane are read from the current element, and outputs are forwarded to this
     * context's {@code outputWindowedValue} / {@code sideOutputWindowedValue}. State is backed by
     * in-memory state internals created for the fixed key {@code "DUMMY"} on every call. Timers
     * and window-specific side inputs are not supported and throw
     * {@link UnsupportedOperationException}.
     *
     * @return the windowing internals bound to this context
     */
    public WindowingInternals<InputT, OutputT> windowingInternals() {
        return new WindowingInternals<InputT, OutputT>(){

            public Collection<? extends BoundedWindow> windows() {
                return windowedValue.getWindows();
            }

            public void outputWindowedValue(OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) {
                WindowedValue<OutputT> emitted = WindowedValue.of(output, timestamp, windows, paneInfo);
                // Qualified call: the single-argument method lives on the enclosing context.
                SparkProcessContext.this.outputWindowedValue(emitted);
            }

            public <SideOutputT> void sideOutputWindowedValue(TupleTag<SideOutputT> tag, SideOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo paneInfo) {
                WindowedValue<SideOutputT> emitted = WindowedValue.of(output, timestamp, windows, paneInfo);
                SparkProcessContext.this.sideOutputWindowedValue(tag, emitted);
            }

            public StateInternals stateInternals() {
                return InMemoryStateInternals.forKey((Object)"DUMMY");
            }

            public TimerInternals timerInternals() {
                throw new UnsupportedOperationException("WindowingInternals#timerInternals() is not yet supported.");
            }

            public PaneInfo pane() {
                return windowedValue.getPane();
            }

            public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
                throw new UnsupportedOperationException("WindowingInternals#sideInput() is not yet supported.");
            }
        };
    }

    /**
     * Hook invoked by the output iterator immediately before each input element is processed and
     * once more before {@code finishBundle} runs.
     * NOTE(review): presumably discards output buffered by earlier calls — confirm against the
     * subclasses.
     */
    protected abstract void clearOutput();

    /**
     * Supplies an iterator over the output values currently available. The output iterator calls
     * this when it is created, after each input element has been processed, and after
     * {@code finishBundle}.
     *
     * @return an iterator over the pending output values
     */
    protected abstract Iterator<ValueT> getOutputIterator();

    /**
     * Wraps the input in an iterable whose iterators feed elements through {@code doFn} as they
     * are consumed.
     *
     * <p>Every call to {@code iterator()} creates a new {@code ProcCtxtIterator} over the same
     * underlying input iterator.
     *
     * @param iter the windowed input elements
     * @param doFn the function applied to each element
     * @return an iterable over the function's output
     */
    protected Iterable<ValueT> getOutputIterable(final Iterator<WindowedValue<InputT>> iter, final OldDoFn<InputT, OutputT> doFn) {
        Iterable<ValueT> outputs = new Iterable<ValueT>(){

            @Override
            public Iterator<ValueT> iterator() {
                return new ProcCtxtIterator(iter, doFn);
            }
        };
        return outputs;
    }

    /**
     * Throws the exception produced by {@code UserCodeException.wrapIf} for {@code t}, requesting
     * wrapping only when the wrapped function is not a system function.
     *
     * <p>This method always throws. The declared return type only lets callers write
     * {@code throw wrapUserCodeException(e)} so the compiler sees the control flow end.
     *
     * @param t the failure raised while running the function
     * @return never returns normally
     */
    private RuntimeException wrapUserCodeException(Throwable t) {
        boolean isUserCode = !isSystemDoFn();
        throw UserCodeException.wrapIf(isUserCode, t);
    }

    /**
     * Tells whether the wrapped function's class carries the {@link SystemDoFnInternal}
     * annotation.
     *
     * @return {@code true} if the annotation is present on the function's runtime class
     */
    private boolean isSystemDoFn() {
        Class<?> fnClass = fn.getClass();
        return fnClass.isAnnotationPresent(SystemDoFnInternal.class);
    }

    private class ProcCtxtIterator
    extends AbstractIterator<ValueT> {
        private final Iterator<WindowedValue<InputT>> inputIterator;
        private final OldDoFn<InputT, OutputT> doFn;
        private Iterator<ValueT> outputIterator;
        private boolean calledFinish;

        ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, OldDoFn<InputT, OutputT> doFn) {
            this.inputIterator = iterator;
            this.doFn = doFn;
            this.outputIterator = SparkProcessContext.this.getOutputIterator();
        }

        protected ValueT computeNext() {
            while (true) {
                if (this.outputIterator.hasNext()) {
                    return this.outputIterator.next();
                }
                if (this.inputIterator.hasNext()) {
                    SparkProcessContext.this.clearOutput();
                    SparkProcessContext.this.windowedValue = this.inputIterator.next();
                    if (SparkProcessContext.this.windowedValue.getWindows().size() <= 1 || !OldDoFn.RequiresWindowAccess.class.isAssignableFrom(this.doFn.getClass()) && SparkProcessContext.this.sideInputReader.isEmpty()) {
                        this.invokeProcessElement();
                    } else {
                        Iterator iterator = SparkProcessContext.this.windowedValue.explodeWindows().iterator();
                        while (iterator.hasNext()) {
                            WindowedValue wv;
                            SparkProcessContext.this.windowedValue = wv = (WindowedValue)iterator.next();
                            this.invokeProcessElement();
                        }
                    }
                    this.outputIterator = SparkProcessContext.this.getOutputIterator();
                    continue;
                }
                if (this.calledFinish) break;
                SparkProcessContext.this.windowedValue = null;
                SparkProcessContext.this.clearOutput();
                try {
                    this.calledFinish = true;
                    this.doFn.finishBundle((OldDoFn.Context)SparkProcessContext.this);
                }
                catch (Exception e) {
                    this.handleProcessingException(e);
                    throw SparkProcessContext.this.wrapUserCodeException(e);
                }
                this.outputIterator = SparkProcessContext.this.getOutputIterator();
            }
            try {
                this.doFn.teardown();
            }
            catch (Exception e) {
                LOG.error("Suppressing teardown exception that occurred after processing entire input", (Throwable)e);
            }
            return this.endOfData();
        }

        private void invokeProcessElement() {
            try {
                this.doFn.processElement((OldDoFn.ProcessContext)SparkProcessContext.this);
            }
            catch (Exception e) {
                this.handleProcessingException(e);
                throw SparkProcessContext.this.wrapUserCodeException(e);
            }
        }

        private void handleProcessingException(Exception e) {
            try {
                this.doFn.teardown();
            }
            catch (Exception e1) {
                LOG.error("Exception while cleaning up DoFn", (Throwable)e1);
                e.addSuppressed(e1);
            }
        }
    }
}

