package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link ProcessOperator}: querying of timestamps, watermarks and processing time,
 * firing of event-time and processing-time timers, and snapshot/restore of registered timers.
 */
public class ProcessOperatorTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest$BothTriggeringFlatMapFunction.class */
    private static class BothTriggeringFlatMapFunction implements ProcessFunction<Integer, String> {
        private static final long serialVersionUID = 1;

        private BothTriggeringFlatMapFunction() {
        }

        public void processElement(Integer num, ProcessFunction.Context context, Collector<String> collector) throws Exception {
            context.timerService().registerProcessingTimeTimer(5L);
            context.timerService().registerEventTimeTimer(6L);
        }

        public void onTimer(long j, ProcessFunction.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
            if (TimeDomain.EVENT_TIME.equals(onTimerContext.timeDomain())) {
                collector.collect("EVENT:1777");
            } else {
                collector.collect("PROC:1777");
            }
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest$IdentityKeySelector.class */
    private static class IdentityKeySelector<T> implements KeySelector<T, T> {
        private static final long serialVersionUID = 1;

        private IdentityKeySelector() {
        }

        public T getKey(T t) throws Exception {
            return t;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest$QueryingFlatMapFunction.class */
    private static class QueryingFlatMapFunction implements ProcessFunction<Integer, String> {
        private static final long serialVersionUID = 1;
        private final TimeDomain timeDomain;

        public QueryingFlatMapFunction(TimeDomain timeDomain) {
            this.timeDomain = timeDomain;
        }

        public void processElement(Integer num, ProcessFunction.Context context, Collector<String> collector) throws Exception {
            if (this.timeDomain.equals(TimeDomain.EVENT_TIME)) {
                collector.collect(num + "TIME:" + context.timerService().currentWatermark() + " TS:" + context.timestamp());
            } else {
                collector.collect(num + "TIME:" + context.timerService().currentProcessingTime() + " TS:" + context.timestamp());
            }
        }

        public void onTimer(long j, ProcessFunction.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest$TriggeringFlatMapFunction.class */
    private static class TriggeringFlatMapFunction implements ProcessFunction<Integer, Integer> {
        private static final long serialVersionUID = 1;
        private final TimeDomain timeDomain;

        public TriggeringFlatMapFunction(TimeDomain timeDomain) {
            this.timeDomain = timeDomain;
        }

        public void processElement(Integer num, ProcessFunction.Context context, Collector<Integer> collector) throws Exception {
            collector.collect(num);
            if (this.timeDomain.equals(TimeDomain.EVENT_TIME)) {
                context.timerService().registerEventTimeTimer(context.timerService().currentWatermark() + 5);
            } else {
                context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime() + 5);
            }
        }

        public void onTimer(long j, ProcessFunction.OnTimerContext onTimerContext, Collector<Integer> collector) throws Exception {
            Assert.assertEquals(this.timeDomain, onTimerContext.timeDomain());
            collector.collect(1777);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, context, (Collector<Integer>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/ProcessOperatorTest$TriggeringStatefulFlatMapFunction.class */
    private static class TriggeringStatefulFlatMapFunction extends RichProcessFunction<Integer, String> {
        private static final long serialVersionUID = 1;
        private final ValueStateDescriptor<Integer> state = new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
        private final TimeDomain timeDomain;

        public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
            this.timeDomain = timeDomain;
        }

        public void processElement(Integer num, ProcessFunction.Context context, Collector<String> collector) throws Exception {
            collector.collect("INPUT:" + num);
            getRuntimeContext().getState(this.state).update(num);
            if (this.timeDomain.equals(TimeDomain.EVENT_TIME)) {
                context.timerService().registerEventTimeTimer(context.timerService().currentWatermark() + 5);
            } else {
                context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime() + 5);
            }
        }

        public void onTimer(long j, ProcessFunction.OnTimerContext onTimerContext, Collector<String> collector) throws Exception {
            Assert.assertEquals(this.timeDomain, onTimerContext.timeDomain());
            collector.collect("STATE:" + getRuntimeContext().getState(this.state).value());
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, context, (Collector<String>) collector);
        }
    }

    /**
     * Checks that a function can query the current watermark and the element timestamp: each
     * emitted string is expected to carry the watermark processed just before the element and
     * the element's own timestamp, and watermarks are expected to be forwarded in order.
     */
    @Test
    public void testTimestampAndWatermarkQuerying() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME)),
                        new IdentityKeySelector<Integer>(),
                        BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        testHarness.processWatermark(new Watermark(17L));
        testHarness.processElement(new StreamRecord<>(5, 12L));
        testHarness.processWatermark(new Watermark(42L));
        testHarness.processElement(new StreamRecord<>(6, 13L));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new Watermark(17L));
        expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
        expectedOutput.add(new Watermark(42L));
        expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Checks that a function can query the current processing time. The input records carry no
     * timestamp, so the emitted strings are expected to report {@code TS:null} alongside the
     * processing time set on the harness just before each element.
     */
    @Test
    public void testTimestampAndProcessingTimeQuerying() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME)),
                        new IdentityKeySelector<Integer>(),
                        BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        testHarness.setProcessingTime(17L);
        testHarness.processElement(new StreamRecord<>(5));
        testHarness.setProcessingTime(42L);
        testHarness.processElement(new StreamRecord<>(6));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
        expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Checks that an event-time timer fires when the watermark reaches it. The element is
     * processed at watermark 0, so the function registers a timer for 5; advancing the watermark
     * to 5 is expected to emit {@code 1777} with timestamp 5 before the watermark is forwarded.
     */
    @Test
    public void testEventTimeTimers() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME)),
                        new IdentityKeySelector<Integer>(),
                        BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        testHarness.processWatermark(new Watermark(0L));
        testHarness.processElement(new StreamRecord<>(17, 42L));
        testHarness.processWatermark(new Watermark(5L));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(new StreamRecord<>(17, 42L));
        expectedOutput.add(new StreamRecord<>(1777, 5L));
        expectedOutput.add(new Watermark(5L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Checks that a processing-time timer fires when processing time reaches it: after the
     * element is forwarded, advancing the harness clock to 5 is expected to emit {@code 1777}
     * with timestamp 5.
     */
    @Test
    public void testProcessingTimeTimers() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME)),
                        new IdentityKeySelector<Integer>(),
                        BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        testHarness.processElement(new StreamRecord<>(17));
        testHarness.setProcessingTime(5L);

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>(17));
        expectedOutput.add(new StreamRecord<>(1777, 5L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Checks that state written in {@code processElement} can be read back when an event-time
     * timer fires. Elements 17 and 42 register timers for 6 and 7 respectively; advancing the
     * watermark to 6 and then 7 is expected to emit {@code STATE:17} and {@code STATE:42}, each
     * ahead of the watermark that triggered it.
     */
    @Test
    public void testEventTimeTimerWithState() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME)),
                        new IdentityKeySelector<Integer>(),
                        BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        testHarness.processWatermark(new Watermark(1L));
        testHarness.processElement(new StreamRecord<>(17, 0L)); // registers a timer for 6
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord<>(42, 1L)); // registers a timer for 7
        testHarness.processWatermark(new Watermark(6L));
        testHarness.processWatermark(new Watermark(7L));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new Watermark(1L));
        expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
        expectedOutput.add(new Watermark(2L));
        expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
        expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
        expectedOutput.add(new Watermark(6L));
        expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
        expectedOutput.add(new Watermark(7L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Checks that state written in {@code processElement} can be read back when a
     * processing-time timer fires. Elements 17 and 42 are processed at times 1 and 2 and so
     * register timers for 6 and 7; advancing the harness clock to 6 and then 7 is expected to
     * emit {@code STATE:17} and {@code STATE:42}.
     */
    @Test
    public void testProcessingTimeTimerWithState() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME)),
                        new IdentityKeySelector<Integer>(),
                        BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();

        testHarness.setProcessingTime(1L);
        testHarness.processElement(new StreamRecord<>(17)); // registers a timer for 6
        testHarness.setProcessingTime(2L);
        testHarness.processElement(new StreamRecord<>(42)); // registers a timer for 7
        testHarness.setProcessingTime(6L);
        testHarness.setProcessingTime(7L);

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("INPUT:17"));
        expectedOutput.add(new StreamRecord<>("INPUT:42"));
        expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
        expectedOutput.add(new StreamRecord<>("STATE:42", 7L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Checks that timers registered before a snapshot still fire after the snapshot is restored
     * into a fresh operator. The first harness processes one element (registering a
     * processing-time timer for 5 and an event-time timer for 6), is snapshotted and closed; the
     * second harness is initialized from that snapshot and is expected to fire both timers once
     * processing time reaches 5 and the watermark reaches 6.
     */
    @Test
    public void testSnapshotAndRestore() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> originalHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new ProcessOperator<>(new BothTriggeringFlatMapFunction()),
                        new IdentityKeySelector<Integer>(),
                        BasicTypeInfo.INT_TYPE_INFO);
        originalHarness.setup();
        originalHarness.open();

        originalHarness.processElement(new StreamRecord<>(5, 12L));

        // Capture the operator state, which is expected to include both pending timers.
        OperatorStateHandles snapshot = originalHarness.snapshot(0L, 0L);
        originalHarness.close();

        KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> restoredHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new ProcessOperator<>(new BothTriggeringFlatMapFunction()),
                        new IdentityKeySelector<Integer>(),
                        BasicTypeInfo.INT_TYPE_INFO);
        restoredHarness.setup();
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        restoredHarness.setProcessingTime(5L);
        restoredHarness.processWatermark(new Watermark(6L));

        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
        expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
        expectedOutput.add(new Watermark(6L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, restoredHarness.getOutput());
        restoredHarness.close();
    }
}
