package org.apache.flink.streaming.runtime.operators;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.class */
public class StreamTaskTimerTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest$DummyMapFunction.class */
    public static class DummyMapFunction<T> implements MapFunction<T, T> {
        public T map(T t) {
            return t;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest$ValidatingProcessingTimeCallback.class */
    private static class ValidatingProcessingTimeCallback implements ProcessingTimeCallback {
        static int numInSequence;
        private final AtomicReference<Throwable> errorRef;
        private final long expectedTimestamp;
        private final int expectedInSequence;

        private ValidatingProcessingTimeCallback(AtomicReference<Throwable> atomicReference, long j, int i) {
            this.errorRef = atomicReference;
            this.expectedTimestamp = j;
            this.expectedInSequence = i;
        }

        public void onProcessingTime(long j) {
            try {
                Assert.assertEquals(this.expectedTimestamp, j);
                Assert.assertEquals(this.expectedInSequence, numInSequence);
                numInSequence++;
            } catch (Throwable th) {
                this.errorRef.compareAndSet(null, th);
            }
        }
    }

    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        OneInputStreamTask oneInputStreamTask = new OneInputStreamTask();
        OneInputStreamTaskTestHarness oneInputStreamTaskTestHarness = new OneInputStreamTaskTestHarness(oneInputStreamTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        oneInputStreamTaskTestHarness.getStreamConfig().setStreamOperator(new StreamMap(new DummyMapFunction()));
        oneInputStreamTaskTestHarness.invoke();
        oneInputStreamTaskTestHarness.waitForTaskRunning();
        oneInputStreamTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { // from class: org.apache.flink.streaming.runtime.operators.StreamTaskTimerTest.1
            public void onProcessingTime(long j) {
            }
        });
        Assert.assertEquals(1L, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
        oneInputStreamTaskTestHarness.endInput();
        oneInputStreamTaskTestHarness.waitForTaskCompletion();
        long currentTimeMillis = System.currentTimeMillis() + 4000;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < currentTimeMillis) {
            Thread.sleep(10L);
        }
        Assert.assertEquals("Trigger timer thread did not properly shut down", 0L, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
    }

    @Test
    public void checkScheduledTimestampe() {
        try {
            OneInputStreamTask oneInputStreamTask = new OneInputStreamTask();
            OneInputStreamTaskTestHarness oneInputStreamTaskTestHarness = new OneInputStreamTaskTestHarness(oneInputStreamTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            oneInputStreamTaskTestHarness.getStreamConfig().setStreamOperator(new StreamMap(new DummyMapFunction()));
            oneInputStreamTaskTestHarness.invoke();
            oneInputStreamTaskTestHarness.waitForTaskRunning();
            AtomicReference atomicReference = new AtomicReference();
            long currentTimeMillis = System.currentTimeMillis();
            long currentTimeMillis2 = System.currentTimeMillis() - 200;
            long currentTimeMillis3 = System.currentTimeMillis() + 100;
            long currentTimeMillis4 = System.currentTimeMillis() + 200;
            ProcessingTimeService processingTimeService = oneInputStreamTask.getProcessingTimeService();
            processingTimeService.registerTimer(currentTimeMillis, new ValidatingProcessingTimeCallback(atomicReference, currentTimeMillis, 0));
            processingTimeService.registerTimer(currentTimeMillis2, new ValidatingProcessingTimeCallback(atomicReference, currentTimeMillis2, 1));
            processingTimeService.registerTimer(currentTimeMillis3, new ValidatingProcessingTimeCallback(atomicReference, currentTimeMillis3, 2));
            processingTimeService.registerTimer(currentTimeMillis4, new ValidatingProcessingTimeCallback(atomicReference, currentTimeMillis4, 3));
            long currentTimeMillis5 = System.currentTimeMillis() + 20000;
            while (atomicReference.get() == null && ValidatingProcessingTimeCallback.numInSequence < 4 && System.currentTimeMillis() < currentTimeMillis5) {
                Thread.sleep(100L);
            }
            if (atomicReference.get() != null) {
                ((Throwable) atomicReference.get()).printStackTrace();
                Assert.fail(((Throwable) atomicReference.get()).getMessage());
            }
            Assert.assertEquals(4L, ValidatingProcessingTimeCallback.numInSequence);
            oneInputStreamTaskTestHarness.endInput();
            oneInputStreamTaskTestHarness.waitForTaskCompletion();
            long currentTimeMillis6 = System.currentTimeMillis() + 4000;
            while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < currentTimeMillis6) {
                Thread.sleep(10L);
            }
            Assert.assertEquals("Trigger timer thread did not properly shut down", 0L, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
