Hi,
I'm trying out the new CEP library but have a problem with event
detection.
In my case Flink detects the event pattern: A followed by B within 10
seconds.
But shortly after the detection, once the event pattern no longer matches,
the program crashes with the following error message:
04/06/2016 11:04:47 Job execution switched to status FAILING.
java.lang.NullPointerException
    at org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
    at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
    at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
    at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
    at org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
After that, the job is restarted and runs fine until the
AbstractCEPPatternOperator fails again.
This is my code:
Pattern<Tuple5<String, String, Double, Double, Double>, ?> FlowFirstPattern =
    Pattern.<Tuple5<String, String, Double, Double, Double>>begin("start")
        .followedBy("FlowOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() {
            // some filter
        })
        .followedBy("PressureOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() {
            // some filter
        })
        .within(Time.seconds(10));

PatternStream<Tuple5<String, String, Double, Double, Double>> FlowFirstPatternStream =
    CEP.pattern(windowedData, FlowFirstPattern);

DataStream<String> warning = FlowFirstPatternStream.select(new FlowPatternWarning());
warning.print();

private static class FlowPatternWarning implements
        PatternSelectFunction<Tuple5<String, String, Double, Double, Double>, String> {

    @Override
    public String select(Map<String, Tuple5<String, String, Double, Double, Double>> pat)
            throws Exception {
        Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10");
        Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10");
        return " ####### Warning! FlowPattern ####### "
                + pressure.toString() + " - " + flow.toString();
    }
}
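
To pin down what "A followed by B within 10 seconds" is expected to mean here, the following is a minimal plain-Java sketch that does not use Flink at all. The Event class, the type tags "A" and "B", and the FollowedByWithin helper are hypothetical names made up for this illustration. It assumes events arrive sorted by timestamp, treats the 10-second bound as inclusive, and lets one A pair with every later B inside the window. It is not how Flink's NFA or SharedBuffer work internally, and it does not reproduce the exception above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical event type for illustration: a type tag plus a timestamp in milliseconds.
final class Event {
    final String type;
    final long timestampMillis;

    Event(String type, long timestampMillis) {
        this.type = type;
        this.timestampMillis = timestampMillis;
    }
}

// Hypothetical matcher for "A followed by B within a time window".
final class FollowedByWithin {

    // Returns one "A@t1 -> B@t2" entry for every A that is followed by a later B
    // with t2 - t1 <= windowMillis. Other events in between are ignored.
    // Assumption: the input list is sorted by timestamp.
    static List<String> match(List<Event> events, long windowMillis) {
        List<Event> openAs = new ArrayList<>();   // A events still waiting for a B
        List<String> matches = new ArrayList<>();
        for (Event e : events) {
            // Drop partial matches whose time window has expired.
            openAs.removeIf(a -> e.timestampMillis - a.timestampMillis > windowMillis);
            if ("A".equals(e.type)) {
                openAs.add(e);
            } else if ("B".equals(e.type)) {
                for (Event a : openAs) {
                    matches.add("A@" + a.timestampMillis + " -> B@" + e.timestampMillis);
                }
            }
        }
        return matches;
    }

    // Sample stream: only the B at 5 s falls inside the 10 s window of the A at 0 s.
    static List<String> example() {
        List<Event> events = Arrays.asList(
                new Event("A", 0L),
                new Event("C", 1_000L),
                new Event("B", 5_000L),
                new Event("B", 20_000L));
        return match(events, 10_000L);
    }
}
```

Calling FollowedByWithin.example() yields a single match, "A@0 -> B@5000". The second B arrives after the A has been pruned from the list of open partial matches, which is the point in the stream ("pattern isn't matched anymore") where the job described above fails.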
How can I solve this?
I hope somebody can help me.
greetz Norman
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.