/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sorted.state;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class BatchExecutionInternalTimeServiceTest
extends TestLogger {
    public static final IntSerializer KEY_SERIALIZER = new IntSerializer();
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Test
    public void testBatchExecutionManagerCanBeInstantiatedWithBatchStateBackend() throws Exception {
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Batch execution specific time service can work only with BatchExecutionKeyedStateBackend");
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        AbstractKeyedStateBackend stateBackend = new MemoryStateBackend().createKeyedStateBackend((Environment)mockEnvironment, new JobID(), "dummy", (TypeSerializer)KEY_SERIALIZER, 2, new KeyGroupRange(0, 1), mockEnvironment.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
        BatchExecutionInternalTimeServiceManager.create((CheckpointableKeyedStateBackend)stateBackend, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (KeyContext)new DummyKeyContext(), (ProcessingTimeService)new TestProcessingTimeService(), Collections.emptyList());
    }

    @Test
    public void testForEachEventTimeTimerUnsupported() {
        this.expectedException.expect(UnsupportedOperationException.class);
        this.expectedException.expectMessage("The BatchExecutionInternalTimeService should not be used in State Processor API");
        BatchExecutionInternalTimeService timeService = new BatchExecutionInternalTimeService((ProcessingTimeService)new TestProcessingTimeService(), LambdaTrigger.eventTimeTrigger(timer -> {}));
        timeService.forEachEventTimeTimer((o, aLong) -> Assert.fail((String)"The forEachEventTimeTimer() should not be supported"));
    }

    @Test
    public void testForEachProcessingTimeTimerUnsupported() {
        this.expectedException.expect(UnsupportedOperationException.class);
        this.expectedException.expectMessage("The BatchExecutionInternalTimeService should not be used in State Processor API");
        BatchExecutionInternalTimeService timeService = new BatchExecutionInternalTimeService((ProcessingTimeService)new TestProcessingTimeService(), LambdaTrigger.eventTimeTrigger(timer -> {}));
        timeService.forEachEventTimeTimer((o, aLong) -> Assert.fail((String)"The forEachProcessingTimeTimer() should not be supported"));
    }

    @Test
    public void testFiringEventTimeTimers() throws Exception {
        BatchExecutionKeyedStateBackend keyedStatedBackend = new BatchExecutionKeyedStateBackend((TypeSerializer)KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create((CheckpointableKeyedStateBackend)keyedStatedBackend, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (KeyContext)new DummyKeyContext(), (ProcessingTimeService)new TestProcessingTimeService(), Collections.emptyList());
        ArrayList timers = new ArrayList();
        InternalTimerService timerService = timeServiceManager.getInternalTimerService("test", (TypeSerializer)KEY_SERIALIZER, (TypeSerializer)new VoidNamespaceSerializer(), LambdaTrigger.eventTimeTrigger(timer -> timers.add(timer.getTimestamp())));
        keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 123L);
        timeServiceManager.advanceWatermark(new Watermark(1000L));
        timerService.deleteEventTimeTimer((Object)VoidNamespace.INSTANCE, 123L);
        timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 150L);
        keyedStatedBackend.setCurrentKey((Object)2);
        Assert.assertThat(timers, (Matcher)CoreMatchers.equalTo(Collections.singletonList(150L)));
    }

    @Test
    public void testSettingSameKeyDoesNotFireTimers() {
        BatchExecutionKeyedStateBackend keyedStatedBackend = new BatchExecutionKeyedStateBackend((TypeSerializer)KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create((CheckpointableKeyedStateBackend)keyedStatedBackend, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (KeyContext)new DummyKeyContext(), (ProcessingTimeService)new TestProcessingTimeService(), Collections.emptyList());
        ArrayList timers = new ArrayList();
        InternalTimerService timerService = timeServiceManager.getInternalTimerService("test", (TypeSerializer)KEY_SERIALIZER, (TypeSerializer)new VoidNamespaceSerializer(), LambdaTrigger.eventTimeTrigger(timer -> timers.add(timer.getTimestamp())));
        keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 123L);
        keyedStatedBackend.setCurrentKey((Object)1);
        Assert.assertThat(timers, (Matcher)CoreMatchers.equalTo(Collections.emptyList()));
    }

    @Test
    public void testCurrentWatermark() throws Exception {
        BatchExecutionKeyedStateBackend keyedStatedBackend = new BatchExecutionKeyedStateBackend((TypeSerializer)KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create((CheckpointableKeyedStateBackend)keyedStatedBackend, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (KeyContext)new DummyKeyContext(), (ProcessingTimeService)new TestProcessingTimeService(), Collections.emptyList());
        ArrayList timers = new ArrayList();
        TriggerWithTimerServiceAccess eventTimeTrigger = TriggerWithTimerServiceAccess.eventTimeTrigger((timer, timerService) -> {
            Assert.assertThat((Object)timerService.currentWatermark(), (Matcher)CoreMatchers.equalTo((Object)Long.MAX_VALUE));
            timers.add(timer.getTimestamp());
        });
        InternalTimerService timerService2 = timeServiceManager.getInternalTimerService("test", (TypeSerializer)KEY_SERIALIZER, (TypeSerializer)new VoidNamespaceSerializer(), eventTimeTrigger);
        eventTimeTrigger.setTimerService(timerService2);
        Assert.assertThat((Object)timerService2.currentWatermark(), (Matcher)CoreMatchers.equalTo((Object)Long.MIN_VALUE));
        keyedStatedBackend.setCurrentKey((Object)1);
        timerService2.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 123L);
        Assert.assertThat((Object)timerService2.currentWatermark(), (Matcher)CoreMatchers.equalTo((Object)Long.MIN_VALUE));
        timeServiceManager.advanceWatermark(new Watermark(1000L));
        Assert.assertThat((Object)timerService2.currentWatermark(), (Matcher)CoreMatchers.equalTo((Object)Long.MIN_VALUE));
        keyedStatedBackend.setCurrentKey((Object)2);
        Assert.assertThat((Object)timerService2.currentWatermark(), (Matcher)CoreMatchers.equalTo((Object)Long.MIN_VALUE));
        timerService2.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 124L);
        timeServiceManager.advanceWatermark(Watermark.MAX_WATERMARK);
        Assert.assertThat(timers, (Matcher)CoreMatchers.equalTo(Arrays.asList(123L, 124L)));
    }

    @Test
    public void testProcessingTimeTimers() {
        BatchExecutionKeyedStateBackend keyedStatedBackend = new BatchExecutionKeyedStateBackend((TypeSerializer)KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create((CheckpointableKeyedStateBackend)keyedStatedBackend, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (KeyContext)new DummyKeyContext(), (ProcessingTimeService)processingTimeService, Collections.emptyList());
        ArrayList timers = new ArrayList();
        InternalTimerService timerService = timeServiceManager.getInternalTimerService("test", (TypeSerializer)KEY_SERIALIZER, (TypeSerializer)new VoidNamespaceSerializer(), LambdaTrigger.processingTimeTrigger(timer -> timers.add(timer.getTimestamp())));
        keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, 150L);
        Assert.assertThat((Object)processingTimeService.getNumActiveTimers(), (Matcher)CoreMatchers.equalTo((Object)0));
        keyedStatedBackend.setCurrentKey((Object)2);
        Assert.assertThat(timers, (Matcher)CoreMatchers.equalTo(Collections.singletonList(150L)));
    }

    @Test
    public void testIgnoringEventTimeTimersFromWithinCallback() {
        BatchExecutionKeyedStateBackend keyedStatedBackend = new BatchExecutionKeyedStateBackend((TypeSerializer)KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create((CheckpointableKeyedStateBackend)keyedStatedBackend, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (KeyContext)new DummyKeyContext(), (ProcessingTimeService)processingTimeService, Collections.emptyList());
        ArrayList timers = new ArrayList();
        TriggerWithTimerServiceAccess trigger = TriggerWithTimerServiceAccess.eventTimeTrigger((timer, ts) -> {
            timers.add(timer.getTimestamp());
            ts.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, timer.getTimestamp() + 20L);
        });
        InternalTimerService timerService = timeServiceManager.getInternalTimerService("test", (TypeSerializer)KEY_SERIALIZER, (TypeSerializer)new VoidNamespaceSerializer(), trigger);
        trigger.setTimerService(timerService);
        keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 150L);
        Assert.assertThat((Object)processingTimeService.getNumActiveTimers(), (Matcher)CoreMatchers.equalTo((Object)0));
        keyedStatedBackend.setCurrentKey((Object)2);
        Assert.assertThat(timers, (Matcher)CoreMatchers.equalTo(Collections.singletonList(150L)));
    }

    @Test
    public void testIgnoringProcessingTimeTimersFromWithinCallback() {
        BatchExecutionKeyedStateBackend keyedStatedBackend = new BatchExecutionKeyedStateBackend((TypeSerializer)KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        InternalTimeServiceManager timeServiceManager = BatchExecutionInternalTimeServiceManager.create((CheckpointableKeyedStateBackend)keyedStatedBackend, (ClassLoader)((Object)((Object)this)).getClass().getClassLoader(), (KeyContext)new DummyKeyContext(), (ProcessingTimeService)processingTimeService, Collections.emptyList());
        ArrayList timers = new ArrayList();
        TriggerWithTimerServiceAccess trigger = TriggerWithTimerServiceAccess.processingTimeTrigger((timer, ts) -> {
            timers.add(timer.getTimestamp());
            ts.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, timer.getTimestamp() + 20L);
        });
        InternalTimerService timerService = timeServiceManager.getInternalTimerService("test", (TypeSerializer)KEY_SERIALIZER, (TypeSerializer)new VoidNamespaceSerializer(), trigger);
        trigger.setTimerService(timerService);
        keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, 150L);
        Assert.assertThat((Object)processingTimeService.getNumActiveTimers(), (Matcher)CoreMatchers.equalTo((Object)0));
        keyedStatedBackend.setCurrentKey((Object)2);
        Assert.assertThat(timers, (Matcher)CoreMatchers.equalTo(Collections.singletonList(150L)));
    }

    private static class DummyKeyContext
    implements KeyContext {
        private DummyKeyContext() {
        }

        public void setCurrentKey(Object key) {
        }

        public Object getCurrentKey() {
            return null;
        }
    }

    private static class LambdaTrigger<K, N>
    implements Triggerable<K, N> {
        private final Consumer<InternalTimer<K, N>> eventTimeHandler;
        private final Consumer<InternalTimer<K, N>> processingTimeHandler;

        public static <K, N> LambdaTrigger<K, N> eventTimeTrigger(Consumer<InternalTimer<K, N>> eventTimeHandler) {
            return new LambdaTrigger<K, N>(eventTimeHandler, timer -> Assert.fail((String)"We did not expect processing timer to be triggered."));
        }

        public static <K, N> LambdaTrigger<K, N> processingTimeTrigger(Consumer<InternalTimer<K, N>> processingTimeHandler) {
            return new LambdaTrigger<K, N>(timer -> Assert.fail((String)"We did not expect event timer to be triggered."), processingTimeHandler);
        }

        private LambdaTrigger(Consumer<InternalTimer<K, N>> eventTimeHandler, Consumer<InternalTimer<K, N>> processingTimeHandler) {
            this.eventTimeHandler = eventTimeHandler;
            this.processingTimeHandler = processingTimeHandler;
        }

        public void onEventTime(InternalTimer<K, N> timer) throws Exception {
            this.eventTimeHandler.accept(timer);
        }

        public void onProcessingTime(InternalTimer<K, N> timer) throws Exception {
            this.processingTimeHandler.accept(timer);
        }
    }

    private static class TriggerWithTimerServiceAccess<K, N>
    implements Triggerable<K, N> {
        private InternalTimerService<N> timerService;
        private final BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> eventTimeHandler;
        private final BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> processingTimeHandler;

        private TriggerWithTimerServiceAccess(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> eventTimeHandler, BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> processingTimeHandler) {
            this.eventTimeHandler = eventTimeHandler;
            this.processingTimeHandler = processingTimeHandler;
        }

        public static <K, N> TriggerWithTimerServiceAccess<K, N> eventTimeTrigger(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> eventTimeHandler) {
            return new TriggerWithTimerServiceAccess<K, N>(eventTimeHandler, (timer, timeService) -> Assert.fail((String)"We did not expect processing timer to be triggered."));
        }

        public static <K, N> TriggerWithTimerServiceAccess<K, N> processingTimeTrigger(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> processingTimeHandler) {
            return new TriggerWithTimerServiceAccess<K, N>((timer, timeService) -> Assert.fail((String)"We did not expect event timer to be triggered."), processingTimeHandler);
        }

        public void setTimerService(InternalTimerService<N> timerService) {
            this.timerService = timerService;
        }

        public void onEventTime(InternalTimer<K, N> timer) throws Exception {
            this.eventTimeHandler.accept(timer, this.timerService);
        }

        public void onProcessingTime(InternalTimer<K, N> timer) throws Exception {
            this.processingTimeHandler.accept(timer, this.timerService);
        }
    }
}

