Re: Flink CEP AbstractCEPPatternOperator fail after event detection

Posted by Till Rohrmann on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948p5951.html

Hi Norman,

which version of Flink are you using? We recently fixed some issues with the CEP library which looked similar to your error message. The problem occurred when using the CEP library with processing time. Switching to event or ingestion time, solve the problem.

The fixes to make it also work with processing time are included in the latest snapshot version 1.1-SNAPSHOT and will be part of the upcoming 1.0.1 bugfix release. The bugfix release will actually be released today.

If the problem should still remain with the latest version, it would be good to see your complete Flink program.

Cheers,
Till

On Wed, Apr 6, 2016 at 11:04 AM, norman sp <[hidden email]> wrote:
Hi,
I'm trying out the new CEP library but have some problems with event
detection.
In my case Flink detects the event pattern: A followed by B within 10
seconds.
But short time after event detection when the event pattern isn't matched
anymore, the program crashes with the error message:

04/06/2016 11:04:47     Job execution switched to status FAILING.
java.lang.NullPointerException
        at
org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
        at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
        at
org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
        at
org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
        at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
        at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)


After that, the job execution is restarted and proceeds well until the next
AbstractCEPPatternOperator failes.

That's my code:
Pattern<Tuple5&lt;String, String, Double, Double, Double>, ?> FlowPattern =
Pattern.<Tuple5&lt;String, String, Double, Double, Double>>begin("start")
.followedBy("FlowOver10")
.where(new FilterFunction<Tuple5&lt;String,String,Double, Double, Double>>()
{//some Filter}})
.followedBy("PressureOver10")
.where(new FilterFunction<Tuple5&lt;String,String,Double, Double, Double>>()
{//some Filter}})
.within(Time.seconds(10));

PatternStream<Tuple5&lt;String, String, Double, Double, Double>>
FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);
DataStream<String> warning = FlowFirstPatternStream.select(new
FlowPatternWarning());
warning.print();

private static class FlowPatternWarning implements
PatternSelectFunction<Tuple5&lt;String, String, Double, Double, Double>,
String> {
                @Override
                public String select(Map<String, Tuple5&lt;String, String, Double, Double,
Double>> pat) throws Exception {
                      Tuple5<String, String, Double, Double, Double> pressure =
pat.get("PressureOver10");
                      Tuple5<String, String, Double, Double, Double> flow =
pat.get("FlowOver10");

                        return "  #######   Warning! FlowPattern   ####### " +
pressure.toString() + " - " + flow.toString();
                }
        }


How can I solve that?
Hope somebody could help me.

greetz Norman




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.