Flink CEP AbstractCEPPatternOperator fails after event detection

Posted by norman sp on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948.html

Hi,
I'm trying out the new CEP library but am running into a problem with event detection.
In my case Flink detects the event pattern: A followed by B within 10 seconds.
But shortly after the detection, once the event pattern no longer matches, the program crashes with the following error message:


04/06/2016 11:04:47 Job execution switched to status FAILING.
java.lang.NullPointerException
        at org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
        at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
        at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
        at org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)


After that, the job is restarted and runs fine until the next AbstractCEPPatternOperator failure.

That's my code:

Pattern<Tuple5<String, String, Double, Double, Double>, ?> FlowPattern = Pattern.<Tuple5<String, String, Double, Double, Double>>begin("start")
        .followedBy("FlowOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
        .followedBy("PressureOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
        .within(Time.seconds(10));

PatternStream<Tuple5<String, String, Double, Double, Double>> FlowFirstPatternStream = CEP.pattern(windowedData, FlowPattern);
DataStream<String> warning = FlowFirstPatternStream.select(new FlowPatternWarning());
warning.print();

private static class FlowPatternWarning implements PatternSelectFunction<Tuple5<String, String, Double, Double, Double>, String> {
        @Override
        public String select(Map<String, Tuple5<String, String, Double, Double, Double>> pat) throws Exception {
                Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10");
                Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10");

                return "  #######   Warning! FlowPattern   ####### " + pressure.toString() + " - " + flow.toString();
        }
}
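
To make clear what I mean by "A followed by B within 10 seconds", here is a minimal plain-Java sketch of the semantics I expect. It does not use Flink at all. The Event class, the match method and the class name FollowedByWithin are hypothetical names made up for this illustration, not CEP API. It assumes events arrive in timestamp order and treats the 10-second bound as inclusive, which is my assumption and may differ from what the library does.

```java
import java.util.ArrayList;
import java.util.List;

public class FollowedByWithin {

    /** Hypothetical event: a type tag ("A", "B", or anything else) and a timestamp in milliseconds. */
    static final class Event {
        final String type;
        final long timestampMs;

        Event(String type, long timestampMs) {
            this.type = type;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Returns one "A@t1 -> B@t2" entry for every B that follows an A
     * with t2 - t1 <= windowMs. Other events may occur in between.
     * Assumes the input list is sorted by timestamp.
     */
    static List<String> match(List<Event> events, long windowMs) {
        List<String> matches = new ArrayList<>();
        List<Event> openAs = new ArrayList<>(); // A events still inside their window

        for (Event e : events) {
            // Discard A events whose window has expired before looking at e.
            openAs.removeIf(a -> e.timestampMs - a.timestampMs > windowMs);

            if ("A".equals(e.type)) {
                openAs.add(e);
            } else if ("B".equals(e.type)) {
                for (Event a : openAs) {
                    matches.add("A@" + a.timestampMs + " -> B@" + e.timestampMs);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("A", 0L));
        events.add(new Event("B", 5000L));   // inside the window of A@0
        events.add(new Event("B", 12000L));  // A@0 has expired by now
        for (String m : match(events, 10000L)) {
            System.out.println(m);
        }
    }
}
```

In this sketch, once the window of an A has passed it is simply dropped and nothing else happens, which is the behaviour I would expect from the job instead of the exception.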


How can I solve this?
I hope somebody can help me.

greetz Norman