Hi guys,
We tried building a simple pattern with the CEP library that matches two events using two filter conditions (where), but we get a strange error that comes from the stream operator:

    Pattern<Either<View, Click>, ?> viewAndClick = Pattern
        .<Either<View, Click>>begin("view")
        .where(Either::isLeft)
        .followedBy("click")
        .where(Either::isRight)
        .within(Time.hours(8));

    CEP.pattern(stream, pattern).select(...);

We get the following exception when running this:

    java.lang.RuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:318)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:162)
        at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:48)
        at org.apache.flink.cep.operator.AbstractCEPBasePatternOperator.processElement(AbstractCEPBasePatternOperator.java:72)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:161)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalStateException: Could not find previous shared buffer entry with key: State(view, Normal, [ StateTransition(TAKE, click, with filter), StateTransition(IGNORE, view), ]), value: Left(View[...]) and timestamp: 1473258371116. This can indicate that the element belonging to the previous relation has been already pruned, even though you expect it to be still there.
        at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:104)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:295)
        ... 9 more

Any ideas on what's going on here?

Thanks,
Gyula
|
Hi Gyula,

could you send us an example input that reproduces the problem? The underlying problem is that the system expects a state to still be stored in the `SharedBuffer`, but it has already been removed. This should not happen, and it clearly indicates a bug.

Cheers,
Till

On Wed, Sep 7, 2016 at 4:43 PM, Gyula Fóra <[hidden email]> wrote:
|
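For readers following along, here is a toy sketch of the failure mode Till describes: an event tries to link to a predecessor that has already been pruned. It is an illustration under loud assumptions, not Flink's actual `SharedBuffer`; the class `ToySharedBuffer`, its methods, and the timestamp-cutoff pruning rule are all made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: this is NOT Flink's SharedBuffer, just an illustration
// of "the previous entry was pruned before its successor arrived".
final class ToySharedBuffer {

    // One buffered event of a partial match (hypothetical structure).
    private static final class Entry {
        final String event;
        final long timestamp;

        Entry(String event, long timestamp) {
            this.event = event;
            this.timestamp = timestamp;
        }
    }

    // Entries keyed by "<state>@<timestamp>".
    private final Map<String, Entry> entries = new HashMap<>();

    private static String key(String state, long timestamp) {
        return state + "@" + timestamp;
    }

    // Stores the first event of a partial match; it has no predecessor.
    void putStart(String state, String event, long timestamp) {
        entries.put(key(state, timestamp), new Entry(event, timestamp));
    }

    // Stores an event that extends a partial match. The predecessor
    // must still be buffered, otherwise the link cannot be created.
    void put(String state, String event, long timestamp,
             String prevState, long prevTimestamp) {
        if (!entries.containsKey(key(prevState, prevTimestamp))) {
            throw new IllegalStateException(
                "Could not find previous entry with key: " + prevState
                    + " and timestamp: " + prevTimestamp);
        }
        entries.put(key(state, timestamp), new Entry(event, timestamp));
    }

    // Drops every entry with a timestamp strictly below the cutoff.
    void prune(long cutoff) {
        entries.values().removeIf(entry -> entry.timestamp < cutoff);
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        ToySharedBuffer buffer = new ToySharedBuffer();
        buffer.putStart("view", "View#1", 100L);
        buffer.put("click", "Click#1", 200L, "view", 100L); // predecessor present

        buffer.prune(150L); // removes the "view" entry at timestamp 100

        try {
            // The predecessor is gone, so this link cannot be created.
            buffer.put("click", "Click#2", 300L, "view", 100L);
        } catch (IllegalStateException e) {
            System.out.println("Failed as expected: " + e.getMessage());
        }
    }
}
```

In this toy version the exception is a direct consequence of pruning too early. Whether the real error in this thread has the same cause is not established here; the sketch only shows the shape of the invariant that the exception message refers to.
|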
Hi,

I will try to get some minimal input to reproduce this. We were reading events from Kafka, so I might need some time.

Thanks, Till, for looking into this.

Gyula

Till Rohrmann <[hidden email]> wrote (on Wed, Sep 7, 2016, 17:34):
|
Interestingly, I could not reproduce the problem on my local machine; maybe it was some build issue on the other machine. I will have to investigate tomorrow :)

Gyula

Gyula Fóra <[hidden email]> wrote (on Wed, Sep 7, 2016, 17:37):
|
Hi,

So we can't seem to reproduce the error after clearing the local Maven cache. It works now :)

Gyula

Gyula Fóra <[hidden email]> wrote (on Wed, Sep 7, 2016, 22:05):
|
Great to hear :-) I was already afraid that I had overlooked another window boundary condition that I got wrong. If you encounter other problems, let me know.

Cheers,
Till

On Fri, Sep 9, 2016 at 9:59 AM, Gyula Fóra <[hidden email]> wrote:
|
Hello guys,
I've just encountered the same problem, with this error:

    java.lang.RuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:318)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:162)
        at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:48)
        at org.apache.flink.cep.operator.AbstractCEPBasePatternOperator.processElement(AbstractCEPBasePatternOperator.java:72)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:161)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalStateException: Could not find previous shared buffer entry with key: State(LR Display, Normal, [ StateTransition(TAKE, Add To Basket, with filter), StateTransition(IGNORE, LR Display), ]), value:
    {"type":"SEARCH","vId":"30958404624","sId":"309584046244","dateTime":1470700808000,"url":"http://www.website.com/search/10/query.html?atcplmoc","ref":"http://www.website.com/search/10/query.html","guidA":0,"rguidA":0,"query":"query","rCount":138,"pn":1,"fCat":"","facets":"[]","dProducts":[{"sku":"mp01159947","title":null,"offerId":null,"price":null,"priceS":null,"sType":null,"sName":null,"store":null,"fF":null},{"sku":"mp01298296","title":null,"offerId":null,"price":null,"priceS":null,"sType":null,"sName":null,"store":null,"fF":null},{"sku":"mp01159949","title":null,"offerId":null,"price":null,"priceS":null,"sType":null,"sName":null,"store":null,"fF":null},{"sku":"mp01298273","title":null,"offerId":null,"price":null,"priceS":null,"sType":null,"sName":null,"store":null,"fF":null},{"sku":"mp03046513","title":null,"offerId":null,"price":null,"priceS":null,"sType":null,"sName":null,"store":null,"fF":null},{"sku":"mp01298249","title":null,"offerId":null,"price":null,"priceS":null,"sType":null,"sName":null,"store":null,"fF":null}],"searchId":"query","cleanUrl":"http://www.website.com/search/10/query.html","cleanRef":"http://www.website.com/search/10/query.html"}
    and timestamp: 1474381762694. This can indicate that the element belonging to the previous relation has been already pruned, even though you expect it to be still there.
        at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:104)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:295)
        ... 9 more

The pattern I use is the following:

    Pattern<TrackingEvent, ?> pattern2 = Pattern.<TrackingEvent>begin("LR Display")
        .subtype(LRDisplayEvent.class)
        .followedBy("Add To Basket")
        .subtype(AddToBasketEvent.class)
        .within(Time.days(15));

LRDisplayEvent and AddToBasketEvent are two POJOs whose toString() method produces the kind of JSON that appears in the error. I set the environment's TimeCharacteristic to IngestionTime and have not tried changing this setting so far.

The error occurs after I intentionally shut down the cluster and restart it. One of my running jobs restarts correctly, whereas the one using the CEP library hits this error. I'm using Flink 1.1.1.

Do you have any idea how to solve this issue?

Regards,
Florent
|