package org.apache.beam.runners.flink.translation.functions;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.PerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.class */
/**
 * Flink group-reduce function that applies a Beam per-key combine function to all elements of
 * one group, merging intersecting {@link IntervalWindow}s before combining.
 *
 * <p>The whole group is copied into an in-memory list, exploded to one window per element,
 * sorted by window end, window-merged, and then folded into one accumulator per merged window.
 * One output element is emitted per merged window, with pane {@link PaneInfo#NO_FIRING} and the
 * timestamp produced by the windowing strategy's {@link OutputTimeFn}.
 *
 * @param <K> key type
 * @param <InputT> type of the values being combined
 * @param <AccumT> accumulator type of the combine function
 * @param <OutputT> type of the combined result
 * @param <W> window type; every element window is cast to {@link IntervalWindow}
 */
public class FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, W extends IntervalWindow> extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {
    private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;

    /** Placeholder function handed to the process context; it never processes anything. */
    private final OldDoFn<KV<K, InputT>, KV<K, OutputT>> doFn = new OldDoFn<KV<K, InputT>, KV<K, OutputT>>() {
        public void processElement(OldDoFn<KV<K, InputT>, KV<K, OutputT>>.ProcessContext processContext) throws Exception {
            // Intentionally empty.
        }
    };
    private final WindowingStrategy<?, W> windowingStrategy;
    private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
    private final SerializedPipelineOptions serializedOptions;

    /**
     * Creates the reduce function.
     *
     * @param combineFn per-key combine function to apply
     * @param windowingStrategy windowing strategy that supplies the {@link OutputTimeFn}
     * @param sideInputs side-input views and their windowing strategies, passed to the process
     *     context
     * @param pipelineOptions pipeline options, kept in serialized form
     */
    public FlinkMergingNonShuffleReduceFunction(CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn, WindowingStrategy<?, W> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions pipelineOptions) {
        this.combineFn = combineFn;
        this.windowingStrategy = windowingStrategy;
        this.sideInputs = sideInputs;
        this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
    }

    /**
     * Combines all values of one group and emits one result per merged window.
     *
     * <p>The key of the first element (after sorting) is used for every emitted result. An empty
     * group produces no output.
     *
     * @param elements the elements of the group
     * @param out collector receiving one combined value per merged window
     * @throws Exception if the combine function or the process context fails
     */
    public void reduce(Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
        FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext =
                new FlinkSingleOutputProcessContext<>(
                        this.serializedOptions.getPipelineOptions(),
                        getRuntimeContext(),
                        this.doFn,
                        this.windowingStrategy,
                        this.sideInputs,
                        out);
        PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
                PerKeyCombineFnRunners.create(this.combineFn);

        // Every window handled here is an IntervalWindow (see windowOf), so the function is only
        // ever invoked with windows it accepts; the cast merely widens the wildcard.
        @SuppressWarnings("unchecked")
        OutputTimeFn<? super BoundedWindow> outputTimeFn =
                (OutputTimeFn<? super BoundedWindow>) this.windowingStrategy.getOutputTimeFn();

        // Buffer the whole group with exactly one window per element so it can be sorted.
        List<WindowedValue<KV<K, InputT>>> sortedInput = new ArrayList<>();
        for (WindowedValue<KV<K, InputT>> element : elements) {
            for (WindowedValue<KV<K, InputT>> exploded : element.explodeWindows()) {
                sortedInput.add(exploded);
            }
        }
        if (sortedInput.isEmpty()) {
            // Nothing to combine; previously this failed with an IndexOutOfBoundsException.
            return;
        }

        // NOTE(review): ordering by window end only guarantees that all mutually intersecting
        // windows end up adjacent when windows have a uniform length - confirm that no merging
        // WindowFn with mixed-length windows reaches this path.
        Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
            @Override
            public int compare(WindowedValue<KV<K, InputT>> left, WindowedValue<KV<K, InputT>> right) {
                Instant leftEnd = Iterables.getOnlyElement(left.getWindows()).maxTimestamp();
                Instant rightEnd = Iterables.getOnlyElement(right.getWindows()).maxTimestamp();
                return leftEnd.compareTo(rightEnd);
            }
        });

        // Windows are merged up front so that each element already carries its final window when
        // it is exposed to the combine function through the process context.
        mergeWindow(sortedInput);

        Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
        WindowedValue<KV<K, InputT>> currentValue = iterator.next();
        K key = currentValue.getValue().getKey();
        IntervalWindow currentWindow = windowOf(currentValue);

        processContext.setWindowedValue(currentValue);
        AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
        accumulator = combineFnRunner.addInput(key, accumulator, currentValue.getValue().getValue(), processContext);
        Instant windowTimestamp = outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);

        while (iterator.hasNext()) {
            WindowedValue<KV<K, InputT>> nextValue = iterator.next();
            IntervalWindow nextWindow = windowOf(nextValue);

            if (currentWindow.equals(nextWindow)) {
                // Same merged window: keep folding into the running accumulator.
                processContext.setWindowedValue(nextValue);
                accumulator = combineFnRunner.addInput(key, accumulator, nextValue.getValue().getValue(), processContext);
                windowTimestamp = outputTimeFn.combine(
                        windowTimestamp,
                        outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
            } else {
                // Window boundary: emit the finished window while the context still refers to
                // its last element, then start a fresh accumulator for the next window.
                out.collect(WindowedValue.of(
                        KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
                        windowTimestamp,
                        currentWindow,
                        PaneInfo.NO_FIRING));

                currentWindow = nextWindow;
                processContext.setWindowedValue(nextValue);
                accumulator = combineFnRunner.createAccumulator(key, processContext);
                accumulator = combineFnRunner.addInput(key, accumulator, nextValue.getValue().getValue(), processContext);
                windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
            }
        }

        // Emit the last (or only) window.
        out.collect(WindowedValue.of(
                KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
                windowTimestamp,
                currentWindow,
                PaneInfo.NO_FIRING));
    }

    /**
     * Replaces, in place, the window of every element by the span of the run of consecutive
     * intersecting windows it belongs to.
     *
     * @param elements elements with exactly one window each, ordered so that windows to be
     *     merged are adjacent; an empty list is left untouched
     */
    private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
        if (elements.isEmpty()) {
            return;
        }
        int runStart = 0;
        IntervalWindow mergedWindow = windowOf(elements.get(0));
        for (int index = 1; index < elements.size(); index++) {
            IntervalWindow candidate = windowOf(elements.get(index));
            if (mergedWindow.intersects(candidate)) {
                mergedWindow = mergedWindow.span(candidate);
            } else {
                // The run [runStart, index) is complete: stamp the merged window on all of it.
                assignWindow(elements, runStart, index, mergedWindow);
                runStart = index;
                mergedWindow = candidate;
            }
        }
        // A trailing run of a single element already carries its own window.
        if (runStart < elements.size() - 1) {
            assignWindow(elements, runStart, elements.size(), mergedWindow);
        }
    }

    /** Rewrites the elements in {@code [from, to)} so that each one is in {@code window}. */
    private void assignWindow(List<WindowedValue<KV<K, InputT>>> elements, int from, int to, IntervalWindow window) {
        for (int index = from; index < to; index++) {
            WindowedValue<KV<K, InputT>> value = elements.get(index);
            elements.set(index, WindowedValue.of(value.getValue(), value.getTimestamp(), window, value.getPane()));
        }
    }

    /** Returns the single window of {@code value}, which must be an {@link IntervalWindow}. */
    private static IntervalWindow windowOf(WindowedValue<?> value) {
        return (IntervalWindow) Iterables.getOnlyElement(value.getWindows());
    }
}
