Flink CEP AbstractCEPPatternOperator fail after event detection
Posted by norman sp on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948.html
Hi,
I'm trying out the new CEP library but have run into a problem with event detection.
In my case Flink detects the event pattern: A followed by B within 10 seconds.
But shortly after the detection, once the event pattern no longer matches, the program crashes with this error message:
04/06/2016 11:04:47 Job execution switched to status FAILING.
java.lang.NullPointerException
    at org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
    at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
    at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
    at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
    at org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
After that, the job is restarted and runs fine until the AbstractCEPPatternOperator fails again.
Here's my code:
// Pattern: "start", followed by "FlowOver10", followed by "PressureOver10", all within 10 seconds
Pattern<Tuple5<String, String, Double, Double, Double>, ?> flowPattern =
    Pattern.<Tuple5<String, String, Double, Double, Double>>begin("start")
        .followedBy("FlowOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
        .followedBy("PressureOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
        .within(Time.seconds(10));
// windowedData is the input DataStream (its definition is not shown here)
PatternStream<Tuple5<String, String, Double, Double, Double>> flowPatternStream = CEP.pattern(windowedData, flowPattern);
DataStream<String> warning = flowPatternStream.select(new FlowPatternWarning());
warning.print();
private static class FlowPatternWarning implements PatternSelectFunction<Tuple5<String, String, Double, Double, Double>, String> {
    @Override
    public String select(Map<String, Tuple5<String, String, Double, Double, Double>> pat) throws Exception {
        Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10");
        Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10");
        return " ####### Warning! FlowPattern ####### " + pressure.toString() + " - " + flow.toString();
    }
}
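For clarity, the behaviour expected from "A followed by B within 10 seconds" can be sketched in plain Java without any Flink classes. This is only an illustration of the intended semantics, not how Flink's NFA works internally. The class FollowedByWithin, the Event type and the match method are hypothetical names. The sketch assumes events arrive ordered by timestamp, treats the window bound as inclusive, and pairs every pending A with each later B.

```java
import java.util.ArrayList;
import java.util.List;

public class FollowedByWithin {

    /** Hypothetical event: a type label ("A" or "B") and a timestamp in milliseconds. */
    public static final class Event {
        final String type;
        final long timestampMs;

        public Event(String type, long timestampMs) {
            this.type = type;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Returns all pairs {A, B} where B occurs after A and at most windowMs later.
     * Assumes the input list is sorted by timestamp.
     */
    public static List<Event[]> match(List<Event> events, long windowMs) {
        List<Event[]> matches = new ArrayList<>();
        List<Event> pendingA = new ArrayList<>();
        for (Event e : events) {
            // Discard partial matches whose time window has expired.
            pendingA.removeIf(a -> e.timestampMs - a.timestampMs > windowMs);
            if ("A".equals(e.type)) {
                pendingA.add(e);
            } else if ("B".equals(e.type)) {
                // Every A that is still pending forms a match with this B.
                for (Event a : pendingA) {
                    matches.add(new Event[] {a, e});
                }
            }
        }
        return matches;
    }
}
```

With an A at 0 ms, a B at 5000 ms and another B at 20000 ms, a 10000 ms window yields a single match (the first B); the second B arrives after the partial match has expired, which is the point where I would expect nothing to happen rather than a crash.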
How can I solve this?
I hope somebody can help me.
greetz Norman