package org.apache.flink.table.runtime.operators.aggregate.window;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.StateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners;
import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.class */
public class SlicingWindowAggOperatorTest {
    private final ZoneId shiftTimeZone;
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
    private static final RowType INPUT_ROW_TYPE = new RowType(Arrays.asList(new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)), new RowType.RowField("f1", new IntType()), new RowType.RowField("f2", new TimestampType())));
    private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
    private static final RowDataSerializer ACC_SER = new RowDataSerializer(new LogicalType[]{new BigIntType(), new BigIntType()});
    private static final LogicalType[] OUTPUT_TYPES = {new VarCharType(Integer.MAX_VALUE), new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()};
    private static final RowDataKeySelector KEY_SELECTOR = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, (LogicalType[]) INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));
    private static final PagedTypeSerializer<RowData> KEY_SER = KEY_SELECTOR.getProducedType().toSerializer();
    private static final TypeSerializer<RowData> OUT_SERIALIZER = new RowDataSerializer(OUTPUT_TYPES);
    private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor(OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE));

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest$SumAndCountAggsFunction.class */
    private static class SumAndCountAggsFunction implements NamespaceAggsHandleFunction<Long> {
        private static final long serialVersionUID = 1;
        private final SliceAssigner assigner;
        boolean openCalled;
        final AtomicInteger closeCalled;
        long sum;
        boolean sumIsNull;
        long count;
        boolean countIsNull;
        protected transient JoinedRowData result;

        private SumAndCountAggsFunction(SliceAssigner sliceAssigner) {
            this.closeCalled = new AtomicInteger(0);
            this.assigner = sliceAssigner;
        }

        public void open(StateDataViewStore stateDataViewStore) throws Exception {
            this.openCalled = true;
            this.result = new JoinedRowData();
        }

        public void setAccumulators(Long l, RowData rowData) throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            this.sumIsNull = rowData.isNullAt(0);
            if (this.sumIsNull) {
                this.sum = 0L;
            } else {
                this.sum = rowData.getLong(0);
            }
            this.countIsNull = rowData.isNullAt(1);
            if (this.countIsNull) {
                this.count = 0L;
            } else {
                this.count = rowData.getLong(1);
            }
        }

        public void accumulate(RowData rowData) throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            if (rowData.isNullAt(1)) {
                return;
            }
            this.sum += rowData.getInt(1);
            this.count += serialVersionUID;
            this.sumIsNull = false;
            this.countIsNull = false;
        }

        public void retract(RowData rowData) throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            if (rowData.isNullAt(1)) {
                return;
            }
            this.sum -= rowData.getInt(1);
            this.count -= serialVersionUID;
        }

        public void merge(Long l, RowData rowData) throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            if (!rowData.isNullAt(0)) {
                this.sum += rowData.getLong(0);
                this.sumIsNull = false;
            }
            if (rowData.isNullAt(1)) {
                return;
            }
            this.count += rowData.getLong(1);
            this.countIsNull = false;
        }

        public RowData createAccumulators() {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            GenericRowData genericRowData = new GenericRowData(2);
            genericRowData.setField(1, 0L);
            return genericRowData;
        }

        public RowData getAccumulators() throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            GenericRowData genericRowData = new GenericRowData(2);
            if (!this.sumIsNull) {
                genericRowData.setField(0, Long.valueOf(this.sum));
            }
            if (!this.countIsNull) {
                genericRowData.setField(1, Long.valueOf(this.count));
            }
            return genericRowData;
        }

        public void cleanup(Long l) {
        }

        public void close() {
            this.closeCalled.incrementAndGet();
        }

        public RowData getValue(Long l) throws Exception {
            if (!this.openCalled) {
                Assertions.fail("Open was not called");
            }
            GenericRowData genericRowData = new GenericRowData(4);
            if (!this.sumIsNull) {
                genericRowData.setField(0, Long.valueOf(this.sum));
            }
            if (!this.countIsNull) {
                genericRowData.setField(1, Long.valueOf(this.count));
            }
            genericRowData.setField(1, Long.valueOf(this.count));
            genericRowData.setField(2, Long.valueOf(this.assigner.getWindowStart(l.longValue())));
            genericRowData.setField(3, l);
            return genericRowData;
        }
    }

    public SlicingWindowAggOperatorTest(ZoneId zoneId) {
        this.shiftTimeZone = zoneId;
    }

    @Test
    public void testEventTimeHoppingWindows() throws Exception {
        SliceAssigners.HoppingSliceAssigner hopping = SliceAssigners.hopping(2, this.shiftTimeZone, Duration.ofSeconds(3L), Duration.ofSeconds(1L));
        SumAndCountAggsFunction sumAndCountAggsFunction = new SumAndCountAggsFunction(hopping);
        SlicingWindowOperator build = SlicingWindowAggOperatorBuilder.builder().inputSerializer(INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner(hopping).aggregate(wrapGenerated(sumAndCountAggsFunction), ACC_SER).countStarIndex(1).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(build);
        createTestHarness.setup(OUT_SERIALIZER);
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3999L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3000L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(20L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(0L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(999L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1998L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1999L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1000L)));
        createTestHarness.processWatermark(new Watermark(999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, Long.valueOf(localMills(-2000L)), Long.valueOf(localMills(1000L))));
        concurrentLinkedQueue.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, Long.valueOf(localMills(-1000L)), Long.valueOf(localMills(2000L))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(localMills(-1000L)), Long.valueOf(localMills(2000L))));
        concurrentLinkedQueue.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, Long.valueOf(localMills(0L)), Long.valueOf(localMills(3000L))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(localMills(0L)), Long.valueOf(localMills(3000L))));
        concurrentLinkedQueue.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        Assertions.assertThat(sumAndCountAggsFunction.closeCalled.get()).as("Close was not called.", new Object[0]).isGreaterThan(0);
        concurrentLinkedQueue.clear();
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness2 = createTestHarness(build);
        createTestHarness2.setup(OUT_SERIALIZER);
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 5L, 5L, Long.valueOf(localMills(1000L)), Long.valueOf(localMills(4000L))));
        concurrentLinkedQueue.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3500L)));
        createTestHarness2.processWatermark(new Watermark(4999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(localMills(2000L)), Long.valueOf(localMills(5000L))));
        concurrentLinkedQueue.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(2999L)));
        createTestHarness2.processWatermark(new Watermark(5999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(localMills(3000L)), Long.valueOf(localMills(6000L))));
        concurrentLinkedQueue.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(6999L));
        createTestHarness2.processWatermark(new Watermark(7999L));
        concurrentLinkedQueue.add(new Watermark(6999L));
        concurrentLinkedQueue.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        Assertions.assertThat(build.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
        createTestHarness2.close();
    }

    @Test
    public void testProcessingTimeHoppingWindows() throws Exception {
        SliceAssigners.HoppingSliceAssigner hopping = SliceAssigners.hopping(-1, this.shiftTimeZone, Duration.ofHours(3L), Duration.ofHours(1L));
        SumAndCountAggsFunction sumAndCountAggsFunction = new SumAndCountAggsFunction(hopping);
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(SlicingWindowAggOperatorBuilder.builder().inputSerializer(INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner(hopping).aggregate(wrapGenerated(sumAndCountAggsFunction), ACC_SER).countStarIndex(1).build());
        createTestHarness.setup(OUT_SERIALIZER);
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T00:00:00.003"));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T01:00:00"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, Long.valueOf(epochMills(UTC_ZONE_ID, "1969-12-31T22:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T01:00:00"))));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T02:00:00"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(epochMills(UTC_ZONE_ID, "1969-12-31T23:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T02:00:00"))));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T03:00:00"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T03:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T03:00:00"))));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T07:00:00"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T01:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T04:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 5L, 5L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T01:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T04:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 5L, 5L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T02:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T03:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T06:00:00"))));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.close();
        Assertions.assertThat(sumAndCountAggsFunction.closeCalled.get()).as("Close was not called.", new Object[0]).isGreaterThan(0);
    }

    @Test
    public void testEventTimeCumulativeWindows() throws Exception {
        SliceAssigners.CumulativeSliceAssigner cumulative = SliceAssigners.cumulative(2, this.shiftTimeZone, Duration.ofSeconds(3L), Duration.ofSeconds(1L));
        SumAndCountAggsFunction sumAndCountAggsFunction = new SumAndCountAggsFunction(cumulative);
        SlicingWindowOperator build = SlicingWindowAggOperatorBuilder.builder().inputSerializer(INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner(cumulative).aggregate(wrapGenerated(sumAndCountAggsFunction), ACC_SER).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(build);
        createTestHarness.setup(OUT_SERIALIZER);
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(2999L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3000L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(20L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(0L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(999L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1998L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1999L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1000L)));
        createTestHarness.processWatermark(new Watermark(999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, Long.valueOf(localMills(0L)), Long.valueOf(localMills(1000L))));
        concurrentLinkedQueue.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, Long.valueOf(localMills(0L)), Long.valueOf(localMills(2000L))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(localMills(0L)), Long.valueOf(localMills(2000L))));
        concurrentLinkedQueue.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        Assertions.assertThat(sumAndCountAggsFunction.closeCalled.get()).as("Close was not called.", new Object[0]).isGreaterThan(0);
        concurrentLinkedQueue.clear();
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness2 = createTestHarness(build);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1000L)));
        createTestHarness2.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, Long.valueOf(localMills(0L)), Long.valueOf(localMills(3000L))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 5L, 5L, Long.valueOf(localMills(0L)), Long.valueOf(localMills(3000L))));
        concurrentLinkedQueue.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, Long.valueOf(localMills(3000L)), Long.valueOf(localMills(4000L))));
        concurrentLinkedQueue.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processElement(StreamRecordUtils.insertRecord("key1", 2, TimestampData.fromEpochMillis(3500L)));
        createTestHarness2.processWatermark(new Watermark(4999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, Long.valueOf(localMills(3000L)), Long.valueOf(localMills(5000L))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 2L, 1L, Long.valueOf(localMills(3000L)), Long.valueOf(localMills(5000L))));
        concurrentLinkedQueue.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(2999L)));
        createTestHarness2.processWatermark(new Watermark(5999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, Long.valueOf(localMills(3000L)), Long.valueOf(localMills(6000L))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 2L, 1L, Long.valueOf(localMills(3000L)), Long.valueOf(localMills(6000L))));
        concurrentLinkedQueue.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(6999L));
        createTestHarness2.processWatermark(new Watermark(7999L));
        concurrentLinkedQueue.add(new Watermark(6999L));
        concurrentLinkedQueue.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        Assertions.assertThat(build.getNumLateRecordsDropped().getCount()).isEqualTo(1L);
        createTestHarness2.close();
    }

    @Test
    public void testProcessingTimeCumulativeWindows() throws Exception {
        SliceAssigners.CumulativeSliceAssigner cumulative = SliceAssigners.cumulative(-1, this.shiftTimeZone, Duration.ofDays(1L), Duration.ofHours(8L));
        SumAndCountAggsFunction sumAndCountAggsFunction = new SumAndCountAggsFunction(cumulative);
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(SlicingWindowAggOperatorBuilder.builder().inputSerializer(INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner(cumulative).aggregate(wrapGenerated(sumAndCountAggsFunction), ACC_SER).build());
        createTestHarness.setup(OUT_SERIALIZER);
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T00:00:00.003"));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T08:00:00"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T08:00:00"))));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T16:00:00"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T16:00:00"))));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-02T00:00:00"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"))));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-03T08:00:00"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T08:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T08:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T16:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T16:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-03T00:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 1L, 1L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-03T00:00:00"))));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.close();
        Assertions.assertThat(sumAndCountAggsFunction.closeCalled.get()).as("Close was not called.", new Object[0]).isGreaterThan(0);
    }

    @Test
    public void testEventTimeTumblingWindows() throws Exception {
        SliceAssigners.TumblingSliceAssigner tumbling = SliceAssigners.tumbling(2, this.shiftTimeZone, Duration.ofSeconds(3L));
        SumAndCountAggsFunction sumAndCountAggsFunction = new SumAndCountAggsFunction(tumbling);
        SlicingWindowOperator build = SlicingWindowAggOperatorBuilder.builder().inputSerializer(INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner(tumbling).aggregate(wrapGenerated(sumAndCountAggsFunction), ACC_SER).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(build);
        createTestHarness.setup(OUT_SERIALIZER);
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3999L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3000L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(20L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(0L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(999L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1998L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1999L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1000L)));
        createTestHarness.processWatermark(new Watermark(999L));
        concurrentLinkedQueue.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processWatermark(new Watermark(1999L));
        concurrentLinkedQueue.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.prepareSnapshotPreBarrier(0L);
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 0L);
        createTestHarness.close();
        Assertions.assertThat(sumAndCountAggsFunction.closeCalled.get()).as("Close was not called.", new Object[0]).isGreaterThan(0);
        concurrentLinkedQueue.clear();
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness2 = createTestHarness(build);
        createTestHarness2.setup();
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        createTestHarness2.processWatermark(new Watermark(2999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, Long.valueOf(localMills(0L)), Long.valueOf(localMills(3000L))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(localMills(0L)), Long.valueOf(localMills(3000L))));
        concurrentLinkedQueue.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(3999L));
        concurrentLinkedQueue.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(2500L)));
        createTestHarness2.processWatermark(new Watermark(4999L));
        concurrentLinkedQueue.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(2999L)));
        createTestHarness2.processWatermark(new Watermark(5999L));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 2L, 2L, Long.valueOf(localMills(3000L)), Long.valueOf(localMills(6000L))));
        concurrentLinkedQueue.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        createTestHarness2.processWatermark(new Watermark(6999L));
        createTestHarness2.processWatermark(new Watermark(7999L));
        concurrentLinkedQueue.add(new Watermark(6999L));
        concurrentLinkedQueue.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness2.getOutput());
        Assertions.assertThat(build.getNumLateRecordsDropped().getCount()).isEqualTo(2L);
        createTestHarness2.close();
    }

    @Test
    public void testProcessingTimeTumblingWindows() throws Exception {
        SliceAssigners.TumblingSliceAssigner tumbling = SliceAssigners.tumbling(-1, this.shiftTimeZone, Duration.ofHours(5L));
        SlicingWindowOperator build = SlicingWindowAggOperatorBuilder.builder().inputSerializer(INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner(tumbling).aggregate(wrapGenerated(new SumAndCountAggsFunction(tumbling)), ACC_SER).build();
        OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness = createTestHarness(build);
        createTestHarness.setup(OUT_SERIALIZER);
        createTestHarness.open();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T00:00:00.003"));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(7000L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(7000L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T05:00:00"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00"))));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00"))));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));
        createTestHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));
        createTestHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T10:00:01"));
        concurrentLinkedQueue.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00")), Long.valueOf(epochMills(UTC_ZONE_ID, "1970-01-01T10:00:00"))));
        Assertions.assertThat((Long) build.getWatermarkLatency().getValue()).isEqualTo(0L);
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", concurrentLinkedQueue, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testInvalidWindows() {
        SliceAssigners.HoppingSliceAssigner hopping = SliceAssigners.hopping(2, this.shiftTimeZone, Duration.ofSeconds(3L), Duration.ofSeconds(1L));
        SumAndCountAggsFunction sumAndCountAggsFunction = new SumAndCountAggsFunction(hopping);
        Assertions.assertThatThrownBy(() -> {
            SlicingWindowAggOperatorBuilder.builder().inputSerializer(INPUT_ROW_SER).shiftTimeZone(this.shiftTimeZone).keySerializer(KEY_SER).assigner(hopping).aggregate(wrapGenerated(sumAndCountAggsFunction), ACC_SER).build();
        }).hasMessageContaining("Hopping window requires a COUNT(*) in the aggregate functions.");
    }

    private long localMills(long j) {
        return TimeWindowUtil.toUtcTimestampMills(j, this.shiftTimeZone);
    }

    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(SlicingWindowOperator<RowData, ?> slicingWindowOperator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness(slicingWindowOperator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
    }

    private static GeneratedNamespaceAggsHandleFunction<Long> wrapGenerated(final NamespaceAggsHandleFunction<Long> namespaceAggsHandleFunction) {
        return new GeneratedNamespaceAggsHandleFunction<Long>("N/A", "", new Object[0]) { // from class: org.apache.flink.table.runtime.operators.aggregate.window.SlicingWindowAggOperatorTest.1
            private static final long serialVersionUID = 1;

            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public NamespaceAggsHandleFunction<Long> m10newInstance(ClassLoader classLoader) {
                return namespaceAggsHandleFunction;
            }
        };
    }

    private static long epochMills(ZoneId zoneId, String str) {
        LocalDateTime parse = LocalDateTime.parse(str);
        return parse.toInstant(zoneId.getRules().getOffset(parse)).toEpochMilli();
    }

    @Parameterized.Parameters(name = "TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList(new Object[]{UTC_ZONE_ID}, new Object[]{SHANGHAI_ZONE_ID});
    }
}
