package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayDeque;
import java.util.Objects;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.class */
public class ContinuousProcessingTimeTriggerTest {
    private static final long NO_TIMESTAMP = Watermark.UNINITIALIZED.getTimestamp();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest$IntegerSumWindowFunction.class */
    private static class IntegerSumWindowFunction implements WindowFunction<Integer, WindowedInteger, Byte, TimeWindow> {
        private IntegerSumWindowFunction() {
        }

        public void apply(Byte b, TimeWindow timeWindow, Iterable<Integer> iterable, Collector<WindowedInteger> collector) throws Exception {
            collector.collect(new WindowedInteger(timeWindow, StreamSupport.stream(iterable.spliterator(), false).mapToInt((v0) -> {
                return v0.intValue();
            }).sum()));
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((Byte) obj, (TimeWindow) window, (Iterable<Integer>) iterable, (Collector<WindowedInteger>) collector);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest$WindowedInteger.class */
    public static class WindowedInteger {
        private final TimeWindow window;
        private final int value;

        public WindowedInteger(TimeWindow timeWindow, int i) {
            this.window = timeWindow;
            this.value = i;
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof WindowedInteger)) {
                return false;
            }
            WindowedInteger windowedInteger = (WindowedInteger) obj;
            return this.value == windowedInteger.value && Objects.equals(this.window, windowedInteger.window);
        }

        public int hashCode() {
            return Objects.hash(this.window, Integer.valueOf(this.value));
        }

        public String toString() {
            return "WindowedInteger{window=" + this.window + ", value=" + this.value + '}';
        }
    }

    @Test
    public void testWindowFiring() throws Exception {
        ContinuousProcessingTimeTrigger of = ContinuousProcessingTimeTrigger.of(Time.milliseconds(5L));
        Assert.assertTrue(of.canMerge());
        WindowOperator windowOperator = new WindowOperator(TumblingProcessingTimeWindows.of(Time.milliseconds(10L)), new TimeWindow.Serializer(), new NullByteKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new IntegerSumWindowFunction()), of, 0L, (OutputTag) null);
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator) windowOperator, windowOperator.getKeySelector(), (TypeInformation) BasicTypeInfo.BYTE_TYPE_INFO);
        ArrayDeque arrayDeque = new ArrayDeque();
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(0L);
        keyedOneInputStreamOperatorTestHarness.processElement(1, NO_TIMESTAMP);
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(2L);
        keyedOneInputStreamOperatorTestHarness.processElement(2, NO_TIMESTAMP);
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(5L);
        arrayDeque.add(new StreamRecord(new WindowedInteger(new TimeWindow(0L, 10L), 3), 9L));
        TestHarnessUtil.assertOutputEquals("Output mismatch", arrayDeque, keyedOneInputStreamOperatorTestHarness.getOutput());
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(7L);
        keyedOneInputStreamOperatorTestHarness.processElement(3, NO_TIMESTAMP);
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(9L);
        arrayDeque.add(new StreamRecord(new WindowedInteger(new TimeWindow(0L, 10L), 6), 9L));
        TestHarnessUtil.assertOutputEquals("Output mismatch", arrayDeque, keyedOneInputStreamOperatorTestHarness.getOutput());
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(10L);
        keyedOneInputStreamOperatorTestHarness.processElement(3, NO_TIMESTAMP);
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(15L);
        arrayDeque.add(new StreamRecord(new WindowedInteger(new TimeWindow(10L, 20L), 3), 19L));
        TestHarnessUtil.assertOutputEquals("Output mismatch", arrayDeque, keyedOneInputStreamOperatorTestHarness.getOutput());
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(18L);
        keyedOneInputStreamOperatorTestHarness.processElement(3, NO_TIMESTAMP);
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(20L);
        arrayDeque.add(new StreamRecord(new WindowedInteger(new TimeWindow(10L, 20L), 6), 19L));
        TestHarnessUtil.assertOutputEquals("Output mismatch", arrayDeque, keyedOneInputStreamOperatorTestHarness.getOutput());
    }

    @Test
    public void testMergingWindows() throws Exception {
        ContinuousProcessingTimeTrigger of = ContinuousProcessingTimeTrigger.of(Time.milliseconds(5L));
        Assert.assertTrue(of.canMerge());
        WindowOperator windowOperator = new WindowOperator(ProcessingTimeSessionWindows.withGap(Time.milliseconds(10L)), new TimeWindow.Serializer(), new NullByteKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO.createSerializer(new ExecutionConfig()), new ListStateDescriptor("window-contents", BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())), new InternalIterableWindowFunction(new IntegerSumWindowFunction()), of, 0L, (OutputTag) null);
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator) windowOperator, windowOperator.getKeySelector(), (TypeInformation) BasicTypeInfo.BYTE_TYPE_INFO);
        ArrayDeque arrayDeque = new ArrayDeque();
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(0L);
        keyedOneInputStreamOperatorTestHarness.processElement(1, NO_TIMESTAMP);
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(2L);
        keyedOneInputStreamOperatorTestHarness.processElement(2, NO_TIMESTAMP);
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(5L);
        arrayDeque.add(new StreamRecord(new WindowedInteger(new TimeWindow(0L, 12L), 3), 11L));
        TestHarnessUtil.assertOutputEquals("Output mismatch", arrayDeque, keyedOneInputStreamOperatorTestHarness.getOutput());
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(9L);
        TestHarnessUtil.assertOutputEquals("Output mismatch", arrayDeque, keyedOneInputStreamOperatorTestHarness.getOutput());
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(10L);
        arrayDeque.add(new StreamRecord(new WindowedInteger(new TimeWindow(0L, 12L), 3), 11L));
        TestHarnessUtil.assertOutputEquals("Output mismatch", arrayDeque, keyedOneInputStreamOperatorTestHarness.getOutput());
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(15L);
        arrayDeque.add(new StreamRecord(new WindowedInteger(new TimeWindow(0L, 12L), 3), 11L));
        TestHarnessUtil.assertOutputEquals("Output mismatch", arrayDeque, keyedOneInputStreamOperatorTestHarness.getOutput());
        keyedOneInputStreamOperatorTestHarness.getProcessingTimeService().setCurrentTime(100L);
        TestHarnessUtil.assertOutputEquals("Output mismatch", arrayDeque, keyedOneInputStreamOperatorTestHarness.getOutput());
    }
}
