package org.apache.beam.runners.flink.translation.functions;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.PerKeyCombineFnRunners;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.class */
/**
 * Reduce function that merges overlapping {@link IntervalWindow}s before combining the
 * accumulators that fall into the same (merged) window.
 *
 * <p>All input elements are exploded to one window each, sorted, assigned their merged window,
 * and then folded window by window: accumulators of equal windows are merged through the
 * combine function and one output is emitted per distinct merged window.
 *
 * <p>The complete input of one {@code reduce} call is buffered in memory so it can be sorted.
 */
public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWindow> extends FlinkReduceFunction<K, AccumT, OutputT, W> {

    /**
     * Creates the function; all arguments are handed unchanged to {@link FlinkReduceFunction}.
     */
    public FlinkMergingReduceFunction(CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> perKeyCombineFn, WindowingStrategy<?, W> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions pipelineOptions) {
        super(perKeyCombineFn, windowingStrategy, sideInputs, pipelineOptions);
    }

    /**
     * Merges the windows of all incoming accumulators and emits one combined output per merged
     * window.
     *
     * <p>The key of the first element is used for every output, i.e. all elements of one call are
     * assumed to share the same key. An empty input produces no output.
     *
     * @param iterable windowed accumulators to reduce
     * @param collector receives one windowed {@code KV} per merged window
     * @throws Exception if the combine function fails
     */
    @Override // org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction
    public void reduce(Iterable<WindowedValue<KV<K, AccumT>>> iterable, Collector<WindowedValue<KV<K, OutputT>>> collector) throws Exception {
        FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
                new FlinkSingleOutputProcessContext<>(
                        this.serializedOptions.getPipelineOptions(),
                        getRuntimeContext(),
                        this.doFn,
                        this.windowingStrategy,
                        this.sideInputs,
                        collector);
        PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
                PerKeyCombineFnRunners.create(this.combineFn);

        // Merged windows are produced via IntervalWindow#span and are therefore not of type W,
        // so the time function is viewed as accepting any BoundedWindow.
        @SuppressWarnings("unchecked")
        OutputTimeFn<? super BoundedWindow> outputTimeFn =
                (OutputTimeFn<? super BoundedWindow>) this.windowingStrategy.getOutputTimeFn();

        // Buffer everything, one window per element, so the input can be sorted.
        List<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
        for (WindowedValue<KV<K, AccumT>> element : iterable) {
            for (WindowedValue<KV<K, AccumT>> exploded : element.explodeWindows()) {
                sortedInput.add(exploded);
            }
        }
        if (sortedInput.isEmpty()) {
            return;
        }

        // Sort by window start (ties broken by end). The single-pass merge below only compares
        // each window with the running span, which is correct only for start-ordered input:
        // ordering by window end would e.g. split [1,11), [15,25), [0,30) into two overlapping
        // results instead of the single merged window [0,30).
        Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
            @Override // java.util.Comparator
            public int compare(WindowedValue<KV<K, AccumT>> left, WindowedValue<KV<K, AccumT>> right) {
                IntervalWindow leftWindow = windowOf(left);
                IntervalWindow rightWindow = windowOf(right);
                int byStart = leftWindow.start().compareTo(rightWindow.start());
                return byStart != 0 ? byStart : leftWindow.end().compareTo(rightWindow.end());
            }
        });

        // Windows are merged up front so that every element already carries its final window
        // when it is handed to the combine function.
        mergeWindow(sortedInput);

        Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
        WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
        K key = currentValue.getValue().getKey();
        IntervalWindow currentWindow = windowOf(currentValue);
        AccumT accumulator = currentValue.getValue().getValue();
        // NOTE(review): the process context is never pointed at the first element; if the
        // input has a single element, extractOutput runs on a context without a windowed value
        // set. Behavior kept as before -- confirm this is intended.

        // Timestamps of all elements of the current window, merged by the OutputTimeFn on emit.
        List<Instant> windowTimestamps = new ArrayList<>();
        windowTimestamps.add(currentValue.getTimestamp());

        while (iterator.hasNext()) {
            WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
            IntervalWindow nextWindow = windowOf(nextValue);
            if (nextWindow.equals(currentWindow)) {
                // Same merged window: fold the accumulator into the running one.
                processContext.setWindowedValue(nextValue);
                accumulator = combineFnRunner.mergeAccumulators(
                        key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext);
            } else {
                // Window changed: flush the finished window, then start over with this element.
                emitWindowResult(collector, combineFnRunner, processContext, outputTimeFn,
                        key, accumulator, currentWindow, windowTimestamps);
                windowTimestamps.clear();
                processContext.setWindowedValue(nextValue);
                currentWindow = nextWindow;
                accumulator = nextValue.getValue().getValue();
            }
            windowTimestamps.add(nextValue.getTimestamp());
        }

        // Flush the last window.
        emitWindowResult(collector, combineFnRunner, processContext, outputTimeFn,
                key, accumulator, currentWindow, windowTimestamps);
    }

    /** Extracts the output for one finished window and emits it with the merged timestamp. */
    private void emitWindowResult(
            Collector<WindowedValue<KV<K, OutputT>>> collector,
            PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner,
            FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext,
            OutputTimeFn<? super BoundedWindow> outputTimeFn,
            K key,
            AccumT accumulator,
            IntervalWindow window,
            List<Instant> windowTimestamps) {
        OutputT output = combineFnRunner.extractOutput(key, accumulator, processContext);
        collector.collect(
                WindowedValue.of(
                        KV.of(key, output),
                        outputTimeFn.merge(window, windowTimestamps),
                        window,
                        PaneInfo.NO_FIRING));
    }

    /**
     * Replaces the window of every element by the span of the run of intersecting windows it
     * belongs to. The list is modified in place.
     *
     * <p>The list must be sorted by window start; a run ends at the first window that does not
     * intersect the span accumulated so far.
     */
    private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> list) {
        if (list.isEmpty()) {
            return;
        }
        int runStart = 0;
        IntervalWindow mergedWindow = windowOf(list.get(0));
        for (int index = 1; index < list.size(); index++) {
            IntervalWindow nextWindow = windowOf(list.get(index));
            if (mergedWindow.intersects(nextWindow)) {
                // Still inside the current run: widen the span.
                mergedWindow = mergedWindow.span(nextWindow);
            } else {
                // Run finished: stamp the final span onto all of its elements.
                assignWindow(list, runStart, index, mergedWindow);
                runStart = index;
                mergedWindow = nextWindow;
            }
        }
        // A trailing run of a single element already carries its own window.
        if (runStart < list.size() - 1) {
            assignWindow(list, runStart, list.size(), mergedWindow);
        }
    }

    /** Re-creates the elements in {@code [from, to)} with {@code window} as their window. */
    private void assignWindow(List<WindowedValue<KV<K, AccumT>>> list, int from, int to, IntervalWindow window) {
        for (int index = from; index < to; index++) {
            WindowedValue<KV<K, AccumT>> element = list.get(index);
            list.set(index, WindowedValue.of(element.getValue(), element.getTimestamp(), window, element.getPane()));
        }
    }

    /** Returns the single window of an exploded element as an {@link IntervalWindow}. */
    private static IntervalWindow windowOf(WindowedValue<?> value) {
        return (IntervalWindow) Iterables.getOnlyElement(value.getWindows());
    }
}
