I'm trying to run multiple independent CEP patterns. They're basic patterns, just one input followed by another, and my Flink job runs fine when using just one pattern. If I try to scale this up to multiple CEP patterns, 200 for example, I start getting memory errors on my cluster. I can definitely add more memory, but I want to know if there is an accepted way to run multiple patterns.
Currently I am just doing a flatSelect on the output of CEP.pattern, then making a list of all these streams and putting a sink on all of them. This creates a separate stream for each pattern, which makes the Logical Plan in the UI too big to even see. Does anybody know of a better way to do this? |
Hi!
I suppose that by memory errors you mean you run out of memory, right? Are you using Flink 1.2 or the current master (the upcoming Flink 1.3)? I am asking because Flink 1.2 suffered from this issue, which is now fixed in Flink 1.3, and you are more than welcome to try it out and also help us test the new features. If this is not the case, could you share a few more details about your program? Do you call CEP.pattern(input, pattern_x) for each of your patterns (where input is your input stream and x is the index of each pattern)? Thanks, Kostas
|
Sorry for the quick follow-up, but another question, in case the JIRA I sent you is not what affects your job: do your patterns have a timeout (the within() clause)? If not, other parts of the system (e.g. the internal state of your NFA) may also grow indefinitely. Kostas
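The effect of a missing within() clause can be modelled outside Flink. The following plain-Java sketch is not Flink's NFA, and the class FollowedByMatcher and its methods are hypothetical: it roughly models a pattern where one event type is followed by another, keeping one partial match per unmatched first event and, if a window is set, pruning partial matches older than that window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a "first followed by second" pattern.
// NOT Flink's NFA implementation; it only shows how a timeout bounds state.
class FollowedByMatcher {
    private final String first;
    private final String second;
    private final long withinMs; // <= 0 models a pattern without within()
    // Timestamps of 'first' events still waiting for a 'second' event.
    private final Deque<Long> pending = new ArrayDeque<>();

    FollowedByMatcher(String first, String second, long withinMs) {
        this.first = first;
        this.second = second;
        this.withinMs = withinMs;
    }

    // Feeds one event (timestamps assumed non-decreasing) and returns
    // the number of partial matches completed by it.
    int onEvent(String type, long timestampMs) {
        if (withinMs > 0) {
            // Drop partial matches whose window has expired.
            while (!pending.isEmpty() && timestampMs - pending.peekFirst() > withinMs) {
                pending.pollFirst();
            }
        }
        int completed = 0;
        if (type.equals(second)) {
            // Every waiting 'first' event is completed by this 'second' event.
            completed = pending.size();
            pending.clear();
        }
        if (type.equals(first)) {
            pending.addLast(timestampMs);
        }
        return completed;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        FollowedByMatcher unbounded = new FollowedByMatcher("A", "B", 0);
        FollowedByMatcher bounded = new FollowedByMatcher("A", "B", 1_000);
        // 100 'A' events, 100 ms apart, and no 'B' ever arrives.
        for (long t = 0; t < 10_000; t += 100) {
            unbounded.onEvent("A", t);
            bounded.onEvent("A", t);
        }
        System.out.println("without window: " + unbounded.pendingCount()); // 100
        System.out.println("with 1s window: " + bounded.pendingCount());   // 11
    }
}
```

In this model, partial matches that never complete accumulate with every unmatched event when there is no window, while a window caps them at roughly the number of events that can arrive within it.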
|
I do have a within clause on all the patterns, and I am calling CEP.pattern for each one. On the output I am adding a Kafka sink. Since all the patterns go to the same sink, I was wondering if there was a better way to do it rather than having that overhead.
For the memory issues with 1.2: I do not see a 1.3 branch in the source (https://github.com/apache/flink). Is that just the current master branch? |
Yes, this is the master branch.
We have not yet forked the 1.3 branch. And I do not think there is a better way, and I am not sure there can be one. Apart from the memory leak described in the JIRA, the different NFAs cannot share any state, so I think the memory overhead associated with each one is inevitable. We could potentially reduce this overhead further, but we cannot eliminate it. Thanks, Kostas (replying to mclendenin, Apr 28, 2017, 3:47 PM) |
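The point that independent patterns cannot share state can be illustrated with another plain-Java sketch, assuming the simple "one event followed by another within a time window" patterns from the question. Rule and MultiRuleRunner are hypothetical names, not Flink classes, and this is not how Flink's CEP operator is implemented. The input is read once and all matches go to a single shared list standing in for the one Kafka sink, yet every rule still keeps its own pending partial matches.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical rule: an event of type 'first' followed by one of type 'second'
// within 'withinMs' milliseconds. Not a Flink API.
final class Rule {
    final String name, first, second;
    final long withinMs;
    // Per-rule state: timestamps of 'first' events awaiting a 'second' event.
    final Deque<Long> pending = new ArrayDeque<>();

    Rule(String name, String first, String second, long withinMs) {
        this.name = name;
        this.first = first;
        this.second = second;
        this.withinMs = withinMs;
    }
}

// Hypothetical runner: one pass over the input, one shared output list.
class MultiRuleRunner {
    private final List<Rule> rules;
    // Single shared output, standing in for one sink fed by all rules.
    final List<String> sink = new ArrayList<>();

    MultiRuleRunner(List<Rule> rules) {
        this.rules = rules;
    }

    // Feeds one event (timestamps assumed non-decreasing) to every rule.
    void onEvent(String type, long ts) {
        for (Rule r : rules) {
            // Expire partial matches outside this rule's window.
            while (!r.pending.isEmpty() && ts - r.pending.peekFirst() > r.withinMs) {
                r.pending.pollFirst();
            }
            if (type.equals(r.second)) {
                for (long start : r.pending) {
                    sink.add(r.name + ":" + start + "->" + ts);
                }
                r.pending.clear();
            }
            if (type.equals(r.first)) {
                r.pending.addLast(ts);
            }
        }
    }

    // Total partial matches held across all rules; grows with the rule count.
    int totalPending() {
        int total = 0;
        for (Rule r : rules) {
            total += r.pending.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Rule> rules = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            // Every rule starts on "A" but waits for a different second event type.
            rules.add(new Rule("rule" + i, "A", "B" + i, 60_000));
        }
        MultiRuleRunner runner = new MultiRuleRunner(rules);
        runner.onEvent("A", 0);
        runner.onEvent("B7", 10);
        System.out.println(runner.sink);           // [rule7:0->10]
        System.out.println(runner.totalPending()); // 199
    }
}
```

In this model, a single "A" event creates 200 separate partial matches, one per rule. Merging the outputs into one sink removes per-stream plumbing but leaves the per-rule state untouched.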
OK, I will try using Flink 1.3.
|
Perfect! And let us know how it goes!
Kostas (replying to mclendenin, Apr 28, 2017, 5:04 PM) |