package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.class */
public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
    private final WindowFunction<String, String, String, TimeWindow> mockFunction = (WindowFunction) Mockito.mock(WindowFunction.class);
    private final KeySelector<String, String> mockKeySelector = (KeySelector) Mockito.mock(KeySelector.class);
    private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AccumulatingAlignedProcessingTimeWindowOperatorTest.1
        public Integer getKey(Integer num) {
            return num;
        }
    };
    private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction = new WindowFunction<Integer, Integer, Integer, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AccumulatingAlignedProcessingTimeWindowOperatorTest.2
        public void apply(Integer num, TimeWindow timeWindow, Iterable<Integer> iterable, Collector<Integer> collector) {
            for (Integer num2 : iterable) {
                Assert.assertEquals(num, num2);
                collector.collect(num2);
            }
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((Integer) obj, (TimeWindow) window, (Iterable<Integer>) iterable, (Collector<Integer>) collector);
        }
    };

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest$StatefulFunction.class */
    private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
        static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap();
        private ValueState<Integer> state;

        private StatefulFunction() {
        }

        public void open(Configuration configuration) {
            Assert.assertNotNull(getRuntimeContext());
            this.state = getRuntimeContext().getState(new ValueStateDescriptor("totalCount", Integer.class, 0));
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void apply(Integer num, TimeWindow timeWindow, Iterable<Integer> iterable, Collector<Integer> collector) throws Exception {
            for (Integer num2 : iterable) {
                this.state.update(Integer.valueOf(((Integer) this.state.value()).intValue() + 1));
                globalCounts.put(num, this.state.value());
                collector.collect(num2);
            }
        }

        public /* bridge */ /* synthetic */ void apply(Object obj, Window window, Iterable iterable, Collector collector) throws Exception {
            apply((Integer) obj, (TimeWindow) window, (Iterable<Integer>) iterable, (Collector<Integer>) collector);
        }
    }

    public AccumulatingAlignedProcessingTimeWindowOperatorTest() {
        ClosureCleaner.clean(this.identitySelector, false);
        ClosureCleaner.clean(this.validatingIdentityFunction, false);
    }

    @After
    public void checkNoTriggerThreadsRunning() {
        long currentTimeMillis = System.currentTimeMillis() + 5000;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < currentTimeMillis) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
            }
        }
        Assert.assertTrue("Not all trigger threads where properly shut down", StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
    }

    @Test
    public void testInvalidParameters() {
        try {
            assertInvalidParameter(-1L, -1L);
            assertInvalidParameter(10000L, -1L);
            assertInvalidParameter(-1L, 1000L);
            assertInvalidParameter(1000L, 2000L);
            assertInvalidParameter(1000L, 999L);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testWindowSizeAndSlide() {
        try {
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000L, 1000L);
            Assert.assertEquals(5000L, accumulatingProcessingTimeWindowOperator.getWindowSize());
            Assert.assertEquals(1000L, accumulatingProcessingTimeWindowOperator.getWindowSlide());
            Assert.assertEquals(1000L, accumulatingProcessingTimeWindowOperator.getPaneSize());
            Assert.assertEquals(5L, accumulatingProcessingTimeWindowOperator.getNumPanesPerWindow());
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator2 = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000L, 1000L);
            Assert.assertEquals(1000L, accumulatingProcessingTimeWindowOperator2.getWindowSize());
            Assert.assertEquals(1000L, accumulatingProcessingTimeWindowOperator2.getWindowSlide());
            Assert.assertEquals(1000L, accumulatingProcessingTimeWindowOperator2.getPaneSize());
            Assert.assertEquals(1L, accumulatingProcessingTimeWindowOperator2.getNumPanesPerWindow());
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator3 = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500L, 1000L);
            Assert.assertEquals(1500L, accumulatingProcessingTimeWindowOperator3.getWindowSize());
            Assert.assertEquals(1000L, accumulatingProcessingTimeWindowOperator3.getWindowSlide());
            Assert.assertEquals(500L, accumulatingProcessingTimeWindowOperator3.getPaneSize());
            Assert.assertEquals(3L, accumulatingProcessingTimeWindowOperator3.getNumPanesPerWindow());
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator4 = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200L, 1100L);
            Assert.assertEquals(1200L, accumulatingProcessingTimeWindowOperator4.getWindowSize());
            Assert.assertEquals(1100L, accumulatingProcessingTimeWindowOperator4.getWindowSlide());
            Assert.assertEquals(100L, accumulatingProcessingTimeWindowOperator4.getPaneSize());
            Assert.assertEquals(12L, accumulatingProcessingTimeWindowOperator4.getNumPanesPerWindow());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testWindowTriggerTimeAlignment() throws Exception {
        try {
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000L, 1000L);
            KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(accumulatingProcessingTimeWindowOperator, this.mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness.open();
            Assert.assertTrue(accumulatingProcessingTimeWindowOperator.getNextSlideTime() % 1000 == 0);
            Assert.assertTrue(accumulatingProcessingTimeWindowOperator.getNextEvaluationTime() % 1000 == 0);
            keyedOneInputStreamOperatorTestHarness.close();
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator2 = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000L, 1000L);
            KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness2 = new KeyedOneInputStreamOperatorTestHarness(accumulatingProcessingTimeWindowOperator2, this.mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness2.open();
            Assert.assertTrue(accumulatingProcessingTimeWindowOperator2.getNextSlideTime() % 1000 == 0);
            Assert.assertTrue(accumulatingProcessingTimeWindowOperator2.getNextEvaluationTime() % 1000 == 0);
            keyedOneInputStreamOperatorTestHarness2.close();
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator3 = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500L, 1000L);
            KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness3 = new KeyedOneInputStreamOperatorTestHarness(accumulatingProcessingTimeWindowOperator3, this.mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness3.open();
            Assert.assertTrue(accumulatingProcessingTimeWindowOperator3.getNextSlideTime() % 500 == 0);
            Assert.assertTrue(accumulatingProcessingTimeWindowOperator3.getNextEvaluationTime() % 1000 == 0);
            keyedOneInputStreamOperatorTestHarness3.close();
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator4 = new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200L, 1100L);
            KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness4 = new KeyedOneInputStreamOperatorTestHarness(accumulatingProcessingTimeWindowOperator4, this.mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness4.open();
            Assert.assertEquals(0L, accumulatingProcessingTimeWindowOperator4.getNextSlideTime() % 100);
            Assert.assertEquals(0L, accumulatingProcessingTimeWindowOperator4.getNextEvaluationTime() % 1100);
            keyedOneInputStreamOperatorTestHarness4.close();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTumblingWindow() throws Exception {
        try {
            KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50L, 50L), this.identitySelector, BasicTypeInfo.INT_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness.open();
            long j = 0;
            for (int i = 0; i < 1000; i++) {
                keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(Integer.valueOf(i)));
                j += 10;
                keyedOneInputStreamOperatorTestHarness.setProcessingTime(j);
            }
            List extractFromStreamRecords = extractFromStreamRecords(keyedOneInputStreamOperatorTestHarness.extractOutputStreamRecords());
            Assert.assertEquals(1000L, extractFromStreamRecords.size());
            Collections.sort(extractFromStreamRecords);
            for (int i2 = 0; i2 < 1000; i2++) {
                Assert.assertEquals(i2, ((Integer) extractFromStreamRecords.get(i2)).intValue());
            }
            keyedOneInputStreamOperatorTestHarness.close();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSlidingWindow() throws Exception {
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150L, 50L), this.identitySelector, BasicTypeInfo.INT_TYPE_INFO);
        keyedOneInputStreamOperatorTestHarness.open();
        long j = 0;
        for (int i = 0; i < 1000; i++) {
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(Integer.valueOf(i)));
            j += 10;
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(j);
        }
        List extractFromStreamRecords = extractFromStreamRecords(keyedOneInputStreamOperatorTestHarness.extractOutputStreamRecords());
        if (extractFromStreamRecords.size() < 1000 || extractFromStreamRecords.size() > 3000) {
            Assert.fail("Wrong number of results: " + extractFromStreamRecords.size());
        }
        Collections.sort(extractFromStreamRecords);
        int i2 = -1;
        int i3 = -1;
        Iterator it = extractFromStreamRecords.iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            if (intValue == i2) {
                i3++;
                Assert.assertTrue(i3 <= 3);
            } else {
                i2 = intValue;
                i3 = 1;
            }
        }
        keyedOneInputStreamOperatorTestHarness.close();
    }

    @Test
    public void testTumblingWindowSingleElements() throws Exception {
        try {
            KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50L, 50L), this.identitySelector, BasicTypeInfo.INT_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness.open();
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(0L);
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(1));
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(2));
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(50L);
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(3));
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(4));
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(5));
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(100L);
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(6));
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(200L);
            List extractFromStreamRecords = extractFromStreamRecords(keyedOneInputStreamOperatorTestHarness.extractOutputStreamRecords());
            Assert.assertEquals(6L, extractFromStreamRecords.size());
            Collections.sort(extractFromStreamRecords);
            Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), extractFromStreamRecords);
            keyedOneInputStreamOperatorTestHarness.close();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSlidingWindowSingleElements() throws Exception {
        try {
            KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150L, 50L), this.identitySelector, BasicTypeInfo.INT_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(0L);
            keyedOneInputStreamOperatorTestHarness.open();
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(1));
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(2));
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(50L);
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(100L);
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(150L);
            List extractFromStreamRecords = extractFromStreamRecords(keyedOneInputStreamOperatorTestHarness.extractOutputStreamRecords());
            Assert.assertEquals(6L, extractFromStreamRecords.size());
            Collections.sort(extractFromStreamRecords);
            Assert.assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), extractFromStreamRecords);
            keyedOneInputStreamOperatorTestHarness.close();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void checkpointRestoreWithPendingWindowTumbling() {
        try {
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 200L, 200L);
            OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(accumulatingProcessingTimeWindowOperator);
            oneInputStreamOperatorTestHarness.setup();
            oneInputStreamOperatorTestHarness.open();
            oneInputStreamOperatorTestHarness.setProcessingTime(0L);
            for (int i = 0; i < 700; i++) {
                oneInputStreamOperatorTestHarness.processElement(new StreamRecord(Integer.valueOf(i)));
            }
            int size = oneInputStreamOperatorTestHarness.getOutput().size();
            StreamStateHandle snapshotLegacy = oneInputStreamOperatorTestHarness.snapshotLegacy(1L, System.currentTimeMillis());
            List extractFromStreamRecords = extractFromStreamRecords(oneInputStreamOperatorTestHarness.getOutput());
            int size2 = oneInputStreamOperatorTestHarness.getOutput().size();
            Assert.assertEquals("operator performed computation during snapshot", size, size2);
            Assert.assertTrue(size2 <= 700);
            for (int i2 = 0; i2 < 300; i2++) {
                oneInputStreamOperatorTestHarness.processElement(new StreamRecord(Integer.valueOf(i2 + 700)));
            }
            oneInputStreamOperatorTestHarness.close();
            accumulatingProcessingTimeWindowOperator.dispose();
            OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness2 = new OneInputStreamOperatorTestHarness(new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 200L, 200L));
            oneInputStreamOperatorTestHarness2.setup();
            oneInputStreamOperatorTestHarness2.restore(snapshotLegacy);
            oneInputStreamOperatorTestHarness2.open();
            for (int i3 = 700; i3 < 1000; i3++) {
                oneInputStreamOperatorTestHarness2.processElement(new StreamRecord(Integer.valueOf(i3)));
            }
            oneInputStreamOperatorTestHarness2.setProcessingTime(400L);
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(extractFromStreamRecords);
            arrayList.addAll(extractFromStreamRecords(oneInputStreamOperatorTestHarness2.getOutput()));
            Assert.assertEquals(1000L, arrayList.size());
            Collections.sort(arrayList);
            for (int i4 = 0; i4 < 1000; i4++) {
                Assert.assertEquals(i4, ((Integer) arrayList.get(i4)).intValue());
            }
            oneInputStreamOperatorTestHarness2.close();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void checkpointRestoreWithPendingWindowSliding() {
        try {
            OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 200L, 50L));
            oneInputStreamOperatorTestHarness.setProcessingTime(0L);
            oneInputStreamOperatorTestHarness.setup();
            oneInputStreamOperatorTestHarness.open();
            for (int i = 0; i < 700; i++) {
                oneInputStreamOperatorTestHarness.processElement(new StreamRecord(Integer.valueOf(i)));
            }
            List extractFromStreamRecords = extractFromStreamRecords(oneInputStreamOperatorTestHarness.getOutput());
            int size = oneInputStreamOperatorTestHarness.getOutput().size();
            StreamStateHandle snapshotLegacy = oneInputStreamOperatorTestHarness.snapshotLegacy(1L, System.currentTimeMillis());
            Assert.assertEquals("operator performed computation during snapshot", size, oneInputStreamOperatorTestHarness.getOutput().size());
            Assert.assertTrue(extractFromStreamRecords.size() <= 2800);
            for (int i2 = 700; i2 < 1000; i2++) {
                oneInputStreamOperatorTestHarness.processElement(new StreamRecord(Integer.valueOf(i2)));
            }
            oneInputStreamOperatorTestHarness.close();
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator = new AccumulatingProcessingTimeWindowOperator(this.validatingIdentityFunction, this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 200L, 50L);
            OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness2 = new OneInputStreamOperatorTestHarness(accumulatingProcessingTimeWindowOperator);
            oneInputStreamOperatorTestHarness2.setup();
            oneInputStreamOperatorTestHarness2.restore(snapshotLegacy);
            oneInputStreamOperatorTestHarness2.open();
            for (int i3 = 700; i3 < 1000; i3++) {
                oneInputStreamOperatorTestHarness2.processElement(new StreamRecord(Integer.valueOf(i3)));
            }
            oneInputStreamOperatorTestHarness2.setProcessingTime(50L);
            oneInputStreamOperatorTestHarness2.setProcessingTime(100L);
            oneInputStreamOperatorTestHarness2.setProcessingTime(150L);
            oneInputStreamOperatorTestHarness2.setProcessingTime(200L);
            oneInputStreamOperatorTestHarness2.setProcessingTime(250L);
            oneInputStreamOperatorTestHarness2.setProcessingTime(300L);
            oneInputStreamOperatorTestHarness2.setProcessingTime(350L);
            ArrayList arrayList = new ArrayList(extractFromStreamRecords);
            arrayList.addAll(extractFromStreamRecords(oneInputStreamOperatorTestHarness2.getOutput()));
            Assert.assertEquals(4000L, arrayList.size());
            Collections.sort(arrayList);
            for (int i4 = 0; i4 < 4000; i4++) {
                Assert.assertEquals(i4 / 4, ((Integer) arrayList.get(i4)).intValue());
            }
            oneInputStreamOperatorTestHarness2.close();
            accumulatingProcessingTimeWindowOperator.dispose();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testKeyValueStateInWindowFunction() {
        try {
            StatefulFunction.globalCounts.clear();
            AccumulatingProcessingTimeWindowOperator accumulatingProcessingTimeWindowOperator = new AccumulatingProcessingTimeWindowOperator(new StatefulFunction(), this.identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50L, 50L);
            KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(accumulatingProcessingTimeWindowOperator, this.identitySelector, BasicTypeInfo.INT_TYPE_INFO);
            keyedOneInputStreamOperatorTestHarness.open();
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(0L);
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(1));
            keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(2));
            accumulatingProcessingTimeWindowOperator.processElement(new StreamRecord(1));
            accumulatingProcessingTimeWindowOperator.processElement(new StreamRecord(2));
            accumulatingProcessingTimeWindowOperator.processElement(new StreamRecord(1));
            accumulatingProcessingTimeWindowOperator.processElement(new StreamRecord(1));
            accumulatingProcessingTimeWindowOperator.processElement(new StreamRecord(2));
            accumulatingProcessingTimeWindowOperator.processElement(new StreamRecord(2));
            keyedOneInputStreamOperatorTestHarness.setProcessingTime(1000L);
            List extractFromStreamRecords = extractFromStreamRecords(keyedOneInputStreamOperatorTestHarness.getOutput());
            Assert.assertEquals(8L, extractFromStreamRecords.size());
            Collections.sort(extractFromStreamRecords);
            Assert.assertEquals(Arrays.asList(1, 1, 1, 1, 2, 2, 2, 2), extractFromStreamRecords);
            Assert.assertEquals(4L, StatefulFunction.globalCounts.get(1).intValue());
            Assert.assertEquals(4L, StatefulFunction.globalCounts.get(2).intValue());
            keyedOneInputStreamOperatorTestHarness.close();
            accumulatingProcessingTimeWindowOperator.dispose();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    private void assertInvalidParameter(long j, long j2) {
        try {
            new AccumulatingProcessingTimeWindowOperator(this.mockFunction, this.mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, j, j2);
            Assert.fail("This should fail with an IllegalArgumentException");
        } catch (IllegalArgumentException e) {
        } catch (Exception e2) {
            Assert.fail("Wrong exception. Expected IllegalArgumentException but found " + e2.getClass().getSimpleName());
        }
    }

    private static StreamTask<?, ?> createMockTask() {
        Configuration configuration = new Configuration();
        configuration.setString("state.backend", "jobmanager");
        StreamTask<?, ?> streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        Mockito.when(streamTask.getAccumulatorMap()).thenReturn(new HashMap());
        Mockito.when(streamTask.getName()).thenReturn("Test task name");
        Mockito.when(streamTask.getExecutionConfig()).thenReturn(new ExecutionConfig());
        Mockito.when(((TaskManagerRuntimeInfo) Mockito.mock(TaskManagerRuntimeInfo.class)).getConfiguration()).thenReturn(configuration);
        Environment environment = (Environment) Mockito.mock(Environment.class);
        Mockito.when(environment.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 1, 0, 1, 0));
        Mockito.when(environment.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
        Mockito.when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
        Mockito.when(environment.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo"));
        Mockito.when(streamTask.getEnvironment()).thenReturn(environment);
        return streamTask;
    }

    private static StreamTask<?, ?> createMockTaskWithTimer(ProcessingTimeService processingTimeService) {
        StreamTask<?, ?> createMockTask = createMockTask();
        Mockito.when(createMockTask.getProcessingTimeService()).thenReturn(processingTimeService);
        return createMockTask;
    }

    private <T> List<T> extractFromStreamRecords(Iterable<?> iterable) {
        ArrayList arrayList = new ArrayList();
        for (Object obj : iterable) {
            if (obj instanceof StreamRecord) {
                arrayList.add(((StreamRecord) obj).getValue());
            }
        }
        return arrayList;
    }

    private static void shutdownTimerServiceAndWait(ProcessingTimeService processingTimeService) throws Exception {
        processingTimeService.shutdownService();
        while (!processingTimeService.isTerminated()) {
            Thread.sleep(2L);
        }
    }
}
