/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep.operator;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.CepOperatorTestUtilities;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.test.util.MigrationTest;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class CEPMigrationTest
implements MigrationTest {
    private final FlinkVersion migrateVersion;

    /**
     * Supplies the Flink versions this parameterized test is run with.
     *
     * @return the result of {@code FlinkVersion.rangeOf} called with {@code FlinkVersion.v1_8} and
     *     the value of {@code MigrationTest.getMostRecentlyPublishedVersion()}
     */
    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
    public static Collection<FlinkVersion> parameters() {
        final FlinkVersion firstVersion = FlinkVersion.v1_8;
        final FlinkVersion lastVersion = MigrationTest.getMostRecentlyPublishedVersion();
        return FlinkVersion.rangeOf(firstVersion, lastVersion);
    }

    /**
     * Creates one parameterized instance of the test.
     *
     * @param migrateVersion the Flink version whose snapshot resource files the restore tests load
     */
    public CEPMigrationTest(final FlinkVersion migrateVersion) {
        this.migrateVersion = migrateVersion;
    }

    /**
     * Writes the snapshot file that {@code testRestoreAfterBranchingPattern} restores from.
     *
     * <p>Runs a keyed CEP operator built from {@link NFAFactory}, processes five events and a
     * watermark at timestamp 5, then stores the operator snapshot under
     * {@code src/test/resources/cep-migration-after-branching-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form becomes part of the file name
     * @throws Exception if the harness or the snapshot writer fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeAfterBranchingPatternSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        final KeySelector<Event, Integer> byEventId = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        };

        final Event start = new Event(42, "start", 1.0);
        final SubEvent firstMiddle = new SubEvent(42, "foo1", 1.0, 10.0);
        final SubEvent secondMiddle = new SubEvent(42, "foo2", 2.0, 10.0);

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new NFAFactory()),
                        byEventId,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            harness.setup();
            harness.open();

            harness.processElement(new StreamRecord<>(start, 1L));
            harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
            harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
            // The two middle events are fed with timestamps 2 and 3, i.e. not in arrival order.
            harness.processElement(new StreamRecord<Event>(firstMiddle, 2L));
            harness.processElement(new StreamRecord<Event>(secondMiddle, 3L));

            harness.processWatermark(new Watermark(5L));

            // Take the snapshot and persist it for the restore test.
            final OperatorSubtaskState state = harness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(
                    state,
                    "src/test/resources/cep-migration-after-branching-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the snapshot written by {@code writeAfterBranchingPatternSnapshot} for
     * {@code migrateVersion} and checks that the pattern still completes.
     *
     * <p>Phase one: after restoring, a further start event and an end event are processed and the
     * watermark is advanced to 20. Two matches are expected, one per middle event. Neither middle
     * event is processed in this method, so both can only come from the restored state.
     *
     * <p>Phase two: a new start and middle event are processed, a fresh snapshot is taken, and a
     * second harness is initialized from it. After an end event and a watermark at 50, one match
     * built from the events processed before the second snapshot is expected.
     *
     * <p>Each harness lives in its own try-with-resources scope. A resource variable is implicitly
     * final, so the second harness cannot be assigned to the first one's variable.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @Test
    public void testRestoreAfterBranchingPattern() throws Exception {
        final KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        };

        final Event startEvent = new Event(42, "start", 1.0);
        final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
        final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
        final Event endEvent = new Event(42, "end", 1.0);

        final Event startEvent1 = new Event(42, "start", 2.0);
        final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
        final Event endEvent1 = new Event(42, "end", 2.0);

        // Snapshot taken at the end of phase one and restored in phase two.
        final OperatorSubtaskState snapshot;

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new NFAFactory()),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            harness.setup();
            harness.initializeState(
                    OperatorSnapshotUtil.getResourceFilename(
                            "cep-migration-after-branching-flink" + this.migrateVersion + "-snapshot"));
            harness.open();

            harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
            harness.processElement(new StreamRecord<>(endEvent, 5L));
            harness.processWatermark(new Watermark(20L));

            final ConcurrentLinkedQueue<Object> result = harness.getOutput();

            // Two matches plus one more entry (presumably the forwarded watermark).
            Assert.assertEquals(3, result.size());

            final Object resultObject1 = result.poll();
            Assert.assertTrue(resultObject1 instanceof StreamRecord);
            final StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
            Assert.assertTrue(resultRecord1.getValue() instanceof Map);

            final Object resultObject2 = result.poll();
            Assert.assertTrue(resultObject2 instanceof StreamRecord);
            final StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
            Assert.assertTrue(resultRecord2.getValue() instanceof Map);

            @SuppressWarnings("unchecked")
            final Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
            Assert.assertEquals(startEvent, patternMap1.get("start").get(0));
            Assert.assertEquals(middleEvent1, patternMap1.get("middle").get(0));
            Assert.assertEquals(endEvent, patternMap1.get("end").get(0));

            @SuppressWarnings("unchecked")
            final Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue();
            Assert.assertEquals(startEvent, patternMap2.get("start").get(0));
            Assert.assertEquals(middleEvent2, patternMap2.get("middle").get(0));
            Assert.assertEquals(endEvent, patternMap2.get("end").get(0));

            // These elements carry timestamps beyond the last watermark (20) when the snapshot is taken.
            harness.processElement(new StreamRecord<>(startEvent1, 21L));
            harness.processElement(new StreamRecord<Event>(middleEvent3, 23L));

            snapshot = harness.snapshot(1L, 1L);
        }

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> restoredHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new NFAFactory()),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            restoredHarness.setup();
            restoredHarness.initializeState(snapshot);
            restoredHarness.open();

            restoredHarness.processElement(new StreamRecord<>(endEvent1, 25L));
            restoredHarness.processWatermark(new Watermark(50L));

            final ConcurrentLinkedQueue<Object> result = restoredHarness.getOutput();

            // One match plus one more entry (presumably the forwarded watermark).
            Assert.assertEquals(2, result.size());

            final Object resultObject3 = result.poll();
            Assert.assertTrue(resultObject3 instanceof StreamRecord);
            final StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
            Assert.assertTrue(resultRecord3.getValue() instanceof Map);

            @SuppressWarnings("unchecked")
            final Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue();
            Assert.assertEquals(startEvent1, patternMap3.get("start").get(0));
            Assert.assertEquals(middleEvent3, patternMap3.get("middle").get(0));
            Assert.assertEquals(endEvent1, patternMap3.get("end").get(0));
        }
    }

    /**
     * Writes the snapshot file that {@code testRestoreStartingNewPatternAfterMigration} restores
     * from.
     *
     * <p>Runs a keyed CEP operator built from {@link NFAFactory}, processes four events and a
     * watermark at timestamp 5, then stores the operator snapshot under
     * {@code src/test/resources/cep-migration-starting-new-pattern-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form becomes part of the file name
     * @throws Exception if the harness or the snapshot writer fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeStartingNewPatternAfterMigrationSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        final KeySelector<Event, Integer> byEventId = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        };

        final Event start = new Event(42, "start", 1.0);
        final SubEvent middle = new SubEvent(42, "foo1", 1.0, 10.0);

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new NFAFactory()),
                        byEventId,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            harness.setup();
            harness.open();

            harness.processElement(new StreamRecord<>(start, 1L));
            harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
            harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3L));
            harness.processElement(new StreamRecord<Event>(middle, 2L));

            harness.processWatermark(new Watermark(5L));

            // Take the snapshot and persist it for the restore test.
            final OperatorSubtaskState state = harness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(
                    state,
                    "src/test/resources/cep-migration-starting-new-pattern-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the snapshot written by {@code writeStartingNewPatternAfterMigrationSnapshot} for
     * {@code migrateVersion} and checks that old partial matches and a newly started pattern both
     * complete.
     *
     * <p>Phase one: after restoring, a second start event, a second middle event and an end event
     * are processed and the watermark is advanced to 20. Three matches are expected. Two of them
     * begin with {@code startEvent1}, which is not processed in this method and so can only come
     * from the restored state. The third begins with the newly processed {@code startEvent2}.
     *
     * <p>Phase two: a new start and middle event are processed, a fresh snapshot is taken, and a
     * second harness is initialized from it. After an end event and a watermark at 50, one match
     * built from the events processed before the second snapshot is expected.
     *
     * <p>Each harness lives in its own try-with-resources scope. A resource variable is implicitly
     * final, so the second harness cannot be assigned to the first one's variable.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @Test
    public void testRestoreStartingNewPatternAfterMigration() throws Exception {
        final KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        };

        final Event startEvent1 = new Event(42, "start", 1.0);
        final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
        final Event startEvent2 = new Event(42, "start", 5.0);
        final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
        final Event endEvent = new Event(42, "end", 1.0);

        final Event startEvent3 = new Event(42, "start", 2.0);
        final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0);
        final Event endEvent1 = new Event(42, "end", 2.0);

        // Snapshot taken at the end of phase one and restored in phase two.
        final OperatorSubtaskState snapshot;

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new NFAFactory()),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            harness.setup();
            harness.initializeState(
                    OperatorSnapshotUtil.getResourceFilename(
                            "cep-migration-starting-new-pattern-flink" + this.migrateVersion + "-snapshot"));
            harness.open();

            harness.processElement(new StreamRecord<>(startEvent2, 5L));
            harness.processElement(new StreamRecord<Event>(middleEvent2, 6L));
            harness.processElement(new StreamRecord<>(endEvent, 7L));
            harness.processWatermark(new Watermark(20L));

            final ConcurrentLinkedQueue<Object> result = harness.getOutput();

            // Three matches plus one more entry (presumably the forwarded watermark).
            Assert.assertEquals(4, result.size());

            final Object resultObject1 = result.poll();
            Assert.assertTrue(resultObject1 instanceof StreamRecord);
            final StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
            Assert.assertTrue(resultRecord1.getValue() instanceof Map);

            final Object resultObject2 = result.poll();
            Assert.assertTrue(resultObject2 instanceof StreamRecord);
            final StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
            Assert.assertTrue(resultRecord2.getValue() instanceof Map);

            final Object resultObject3 = result.poll();
            Assert.assertTrue(resultObject3 instanceof StreamRecord);
            final StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
            Assert.assertTrue(resultRecord3.getValue() instanceof Map);

            @SuppressWarnings("unchecked")
            final Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
            Assert.assertEquals(startEvent1, patternMap1.get("start").get(0));
            Assert.assertEquals(middleEvent1, patternMap1.get("middle").get(0));
            Assert.assertEquals(endEvent, patternMap1.get("end").get(0));

            @SuppressWarnings("unchecked")
            final Map<String, List<Event>> patternMap2 = (Map<String, List<Event>>) resultRecord2.getValue();
            Assert.assertEquals(startEvent1, patternMap2.get("start").get(0));
            Assert.assertEquals(middleEvent2, patternMap2.get("middle").get(0));
            Assert.assertEquals(endEvent, patternMap2.get("end").get(0));

            @SuppressWarnings("unchecked")
            final Map<String, List<Event>> patternMap3 = (Map<String, List<Event>>) resultRecord3.getValue();
            Assert.assertEquals(startEvent2, patternMap3.get("start").get(0));
            Assert.assertEquals(middleEvent2, patternMap3.get("middle").get(0));
            Assert.assertEquals(endEvent, patternMap3.get("end").get(0));

            // These elements carry timestamps beyond the last watermark (20) when the snapshot is taken.
            harness.processElement(new StreamRecord<>(startEvent3, 21L));
            harness.processElement(new StreamRecord<Event>(middleEvent3, 23L));

            snapshot = harness.snapshot(1L, 1L);
        }

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> restoredHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new NFAFactory()),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            restoredHarness.setup();
            restoredHarness.initializeState(snapshot);
            restoredHarness.open();

            restoredHarness.processElement(new StreamRecord<>(endEvent1, 25L));
            restoredHarness.processWatermark(new Watermark(50L));

            final ConcurrentLinkedQueue<Object> result = restoredHarness.getOutput();

            // One match plus one more entry (presumably the forwarded watermark).
            Assert.assertEquals(2, result.size());

            final Object resultObject4 = result.poll();
            Assert.assertTrue(resultObject4 instanceof StreamRecord);
            final StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4;
            Assert.assertTrue(resultRecord4.getValue() instanceof Map);

            @SuppressWarnings("unchecked")
            final Map<String, List<Event>> patternMap4 = (Map<String, List<Event>>) resultRecord4.getValue();
            Assert.assertEquals(startEvent3, patternMap4.get("start").get(0));
            Assert.assertEquals(middleEvent3, patternMap4.get("middle").get(0));
            Assert.assertEquals(endEvent1, patternMap4.get("end").get(0));
        }
    }

    /**
     * Writes the snapshot file that {@code testSinglePatternAfterMigration} restores from.
     *
     * <p>Runs a keyed CEP operator built from {@link SinglePatternNFAFactory}. No element is
     * processed; only a watermark at timestamp 5 is sent before the snapshot is stored under
     * {@code src/test/resources/cep-migration-single-pattern-afterwards-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form becomes part of the file name
     * @throws Exception if the harness or the snapshot writer fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeSinglePatternAfterMigrationSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        final KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        };

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new SinglePatternNFAFactory()),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            harness.setup();
            harness.open();

            harness.processWatermark(new Watermark(5L));

            // Take the snapshot and persist it for the restore test.
            final OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(
                    snapshot,
                    "src/test/resources/cep-migration-single-pattern-afterwards-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the snapshot written by {@code writeSinglePatternAfterMigrationSnapshot} for
     * {@code migrateVersion}, processes one start event and a watermark at 20, and expects the
     * first output entry to be a match whose {@code "start"} list begins with that event.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @Test
    public void testSinglePatternAfterMigration() throws Exception {
        final KeySelector<Event, Integer> byEventId = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        };

        final Event start = new Event(42, "start", 1.0);

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new SinglePatternNFAFactory()),
                        byEventId,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            harness.setup();
            harness.initializeState(
                    OperatorSnapshotUtil.getResourceFilename(
                            "cep-migration-single-pattern-afterwards-flink" + this.migrateVersion + "-snapshot"));
            harness.open();

            harness.processElement(new StreamRecord<>(start, 5L));
            harness.processWatermark(new Watermark(20L));

            final ConcurrentLinkedQueue<Object> output = harness.getOutput();

            // One match plus one more entry (presumably the forwarded watermark).
            Assert.assertEquals(2, output.size());

            final Object first = output.poll();
            Assert.assertTrue(first instanceof StreamRecord);
            final StreamRecord<?> firstRecord = (StreamRecord<?>) first;
            Assert.assertTrue(firstRecord.getValue() instanceof Map);

            @SuppressWarnings("unchecked")
            final Map<String, List<Event>> match = (Map<String, List<Event>>) firstRecord.getValue();
            Assert.assertEquals(start, match.get("start").get(0));
        }
    }

    /**
     * Writes the snapshot file that {@code testAndOrSubtypeConditionsAfterMigration} restores
     * from.
     *
     * <p>Runs a keyed CEP operator built from {@link NFAComplexConditionsFactory}, processes one
     * {@link SubEvent} at timestamp 5 and a watermark at 6, then stores the operator snapshot under
     * {@code src/test/resources/cep-migration-conditions-flink<version>-snapshot}.
     *
     * @param flinkGenerateSavepointVersion version whose string form becomes part of the file name
     * @throws Exception if the harness or the snapshot writer fails
     */
    @MigrationTest.SnapshotsGenerator
    public void writeAndOrSubtypConditionsPatternAfterMigrationSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception {
        final KeySelector<Event, Integer> byEventId = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        };

        final SubEvent firstEvent = new SubEvent(42, "start", 1.0, 6.0);

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new NFAComplexConditionsFactory()),
                        byEventId,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            harness.setup();
            harness.open();

            harness.processElement(new StreamRecord<Event>(firstEvent, 5L));
            harness.processWatermark(new Watermark(6L));

            // Take the snapshot and persist it for the restore test.
            final OperatorSubtaskState state = harness.snapshot(0L, 0L);
            OperatorSnapshotUtil.writeStateHandle(
                    state,
                    "src/test/resources/cep-migration-conditions-flink" + flinkGenerateSavepointVersion + "-snapshot");
        }
    }

    /**
     * Restores the snapshot written by
     * {@code writeAndOrSubtypConditionsPatternAfterMigrationSnapshot} for {@code migrateVersion},
     * processes one more {@link SubEvent} named {@code "end"} and a watermark at 20, and expects
     * the first output entry to be a match whose {@code "start"} list holds the event from the
     * snapshot followed by the newly processed one.
     *
     * @throws Exception if the harness fails or the snapshot resource cannot be loaded
     */
    @Test
    public void testAndOrSubtypeConditionsAfterMigration() throws Exception {
        final KeySelector<Event, Integer> byEventId = new KeySelector<Event, Integer>() {
            private static final long serialVersionUID = -4873366487571254798L;

            @Override
            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        };

        // Equal to the event processed by the snapshot generator; never processed here.
        final SubEvent fromSnapshot = new SubEvent(42, "start", 1.0, 6.0);

        try (KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new NFAComplexConditionsFactory()),
                        byEventId,
                        BasicTypeInfo.INT_TYPE_INFO)) {
            harness.setup();
            harness.initializeState(
                    OperatorSnapshotUtil.getResourceFilename(
                            "cep-migration-conditions-flink" + this.migrateVersion + "-snapshot"));
            harness.open();

            final SubEvent closing = new SubEvent(42, "end", 1.0, 2.0);
            harness.processElement(new StreamRecord<Event>(closing, 9L));
            harness.processWatermark(new Watermark(20L));

            final ConcurrentLinkedQueue<Object> output = harness.getOutput();

            // One match plus one more entry (presumably the forwarded watermark).
            Assert.assertEquals(2, output.size());

            final Object first = output.poll();
            Assert.assertTrue(first instanceof StreamRecord);
            final StreamRecord<?> firstRecord = (StreamRecord<?>) first;
            Assert.assertTrue(firstRecord.getValue() instanceof Map);

            @SuppressWarnings("unchecked")
            final Map<String, List<Event>> match = (Map<String, List<Event>>) firstRecord.getValue();
            final List<Event> matched = match.get("start");
            Assert.assertEquals(fromSnapshot, matched.get(0));
            Assert.assertEquals(closing, matched.get(1));
        }
    }

    private static class SubEventEndFilter
    extends SimpleCondition<SubEvent> {
        private static final long serialVersionUID = 7056763917392056548L;

        private SubEventEndFilter() {
        }

        public boolean filter(SubEvent value) throws Exception {
            return value.getName().equals("end");
        }
    }

    private static class EndFilter
    extends SimpleCondition<Event> {
        private static final long serialVersionUID = 7056763917392056548L;

        private EndFilter() {
        }

        public boolean filter(Event value) throws Exception {
            return value.getName().equals("end");
        }
    }

    private static class MiddleFilter
    extends SimpleCondition<SubEvent> {
        private static final long serialVersionUID = 6215754202506583964L;

        private MiddleFilter() {
        }

        public boolean filter(SubEvent value) throws Exception {
            return value.getVolume() > 5.0;
        }
    }

    private static class StartFilter
    extends SimpleCondition<Event> {
        private static final long serialVersionUID = 5726188262756267490L;

        private StartFilter() {
        }

        public boolean filter(Event value) throws Exception {
            return value.getName().equals("start");
        }
    }

    private static class NFAFactory
    implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private NFAFactory() {
            this(false);
        }

        private NFAFactory(boolean handleTimeout) {
            this.handleTimeout = handleTimeout;
        }

        public NFA<Event> createNFA() {
            Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)new StartFilter()).followedByAny("middle").subtype(SubEvent.class).where((IterativeCondition)new MiddleFilter()).followedByAny("end").where((IterativeCondition)new EndFilter()).within(Time.milliseconds((long)10L));
            return NFACompiler.compileFactory((Pattern)pattern, (boolean)this.handleTimeout).createNFA();
        }
    }

    private static class NFAComplexConditionsFactory
    implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private NFAComplexConditionsFactory() {
            this(false);
        }

        private NFAComplexConditionsFactory(boolean handleTimeout) {
            this.handleTimeout = handleTimeout;
        }

        public NFA<Event> createNFA() {
            Pattern pattern = Pattern.begin((String)"start").subtype(SubEvent.class).where((IterativeCondition)new MiddleFilter()).or((IterativeCondition)new SubEventEndFilter()).times(2).within(Time.milliseconds((long)10L));
            return NFACompiler.compileFactory((Pattern)pattern, (boolean)this.handleTimeout).createNFA();
        }
    }

    private static class SinglePatternNFAFactory
    implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private SinglePatternNFAFactory() {
            this(false);
        }

        private SinglePatternNFAFactory(boolean handleTimeout) {
            this.handleTimeout = handleTimeout;
        }

        public NFA<Event> createNFA() {
            Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)new StartFilter()).within(Time.milliseconds((long)10L));
            return NFACompiler.compileFactory((Pattern)pattern, (boolean)this.handleTimeout).createNFA();
        }
    }
}

