Hello,
I am new to Apache Flink and am trying to build a CEP application using Flink's API. One of the requirements is the ability to add or change patterns at runtime for anomaly detection, while maintaining the system's availability. Any ideas on how I could do that?

For instance, if I have a stream of security events (accesses, authentications, etc.) and a pattern for detecting anomalies, I would like to be able to change that pattern's parameters. For example, instead of detecting the occurrence of events A->B->C, I would like to change the condition on B to B’ in order to have a new rule. Moreover, I would like to be able to create new patterns dynamically as new use cases arise.
Best Regards,
Pedro Chaves |
I have used a JavaScript engine in my CEP to evaluate my patterns. Each event is a list of named attributes (HashMap-like), and each event is attached to a list of rules expressed as JavaScript code (see the example below with one rule, but I can match as many rules as I like). The rules are distributed over a connected stream, which allows them to change over time. This is how I keep my rules dynamic. If someone has a better way, I would love to hear it as well.
    import javax.script.ScriptEngine;
    import javax.script.ScriptEngineManager;

    private transient ScriptEngineManager factory = new ScriptEngineManager();
    private transient ScriptEngine engine = factory.getEngineByName("JavaScript");

    /* Inside open() */
    factory = new ScriptEngineManager();
    /* End of open() */

    /* Inside my operator */
    engine = factory.getEngineByName("JavaScript");
    engine.put("evt", value.f1);  // value.f1 contains a JSON version of my HashMap of attributes
    engine.eval(value.f2.rule);   // value.f2.rule contains the rule, which is evaluated by the JavaScript engine

    /* Sample JavaScript passed to engine.eval(value.f2.rule) is shown below
       (note the "evt" variable in the JavaScript and the preceding line,
       engine.put("evt", value.f1)):

       var evt = JSON.parse(evt); var result = evt.temperature > 50 && evt.pressure < 900
    */

    boolean ret = (boolean) engine.get("result");
    if (ret) { /* Rule is matched */ }
|
In reply to this post by PedroMrChaves
We also have the same requirement: we want to allow the user to change the matching patterns and have them take effect immediately. I'm wondering whether the proposed trigger DSL takes us one step closer (I don't think it solves the problem), or whether we have to dynamically generate Flink job JAR files when the matching rules/patterns are changed and submit them to Flink.
- LF |
In reply to this post by Sameer Wadkar
Hi Sameer,

I just replied to the earlier post, but I will copy it here:

We also have the same requirement: we want to allow the user to change the matching patterns and have them take effect immediately. I'm wondering whether the proposed trigger DSL takes us one step closer (I don't think it solves the problem), or whether we have to dynamically generate Flink job JAR files when the matching rules/patterns are changed and submit them to Flink.

I had thought about using a similar approach, but it is quite restrictive because you cannot use the power of the Flink CEP engine with it. For example, I want to be able to use the followedBy, next, and (in the future) notFollowedBy operators to detect the patterns, and these matching patterns need to be user-configurable, dynamic, and hot-deployable. The simple attribute-based rules/patterns that you specified can be made dynamic as you mentioned, but rules/patterns that use not just the current event's attributes but also past events (e.g. followedBy) are much harder to make dynamic without some help from Flink, which implements the CEP operators.

- LF
|
This is one of my challenges too.

1. The JavaScript rules are only applicable inside one operator (next, followedBy, notFollowedBy), and they can apply only to the event in that operator. I make it a little more dynamic by creating a rules HashMap and adding rules under the names "first", "next", "followedBy1" (the same names I use for the pattern operators). This way the rules attached to a particular operator can easily be changed via a connected stream. I think the feature where other events in the pattern are accessible is being added. Currently you can only look inside one event in the "where" clause attached to a CEP pattern operator. For example, if I want to check two consecutive credit card events for a user by calculating the straight-line distance between them and dividing it by the difference in time, I cannot do that unless I fire the pattern for every pair and check this condition in the PatternStream's select operator, where all the events are accessible.

2. The second problem I have is that I cannot change the rules applied to the pattern stream. For example, if I have next.followedBy and I want to add another followedBy, it is a compile-time change. The JavaScript engine helps me with the first issue, but this one needs a recompile, unless you have another Flink pipeline deployed which can check against that pattern as well. My guess at this point is that you will need to take a savepoint, stop your pipeline, redeploy the new pipeline (with a new pattern configuration), and start again. I would like to know if there is a cleaner solution, but the above is my fallback.

Sameer
|
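To make the named-rules idea from the post above more concrete, here is a minimal sketch in plain Java. It is not Flink CEP and does not use any Flink API: the class `StagedMatcher` and its methods are hypothetical, events are plain strings, and non-matching events are simply skipped (roughly in the spirit of followedBy). It only illustrates how conditions stored under stage names such as "first", "next", and "followedBy1" can be swapped while the sequence of stages stays fixed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Toy sequential matcher (an illustration only, not Flink CEP). */
public class StagedMatcher {

    // Insertion order defines the stage order, e.g. first -> next -> followedBy1.
    private final Map<String, Predicate<String>> stages = new LinkedHashMap<>();

    // Index of the stage the matcher is currently waiting for.
    private int position = 0;

    public StagedMatcher addStage(String name, Predicate<String> condition) {
        stages.put(name, condition);
        return this;
    }

    /** Swaps the condition of an existing stage, e.g. when a rule update arrives. */
    public void replaceCondition(String name, Predicate<String> condition) {
        if (!stages.containsKey(name)) {
            throw new IllegalArgumentException("Unknown stage: " + name);
        }
        stages.put(name, condition); // re-inserting an existing key keeps its position
        position = 0;                // drop any partial match built with the old condition
    }

    /** Feeds one event; returns true when the last stage has just matched. */
    public boolean onEvent(String event) {
        int index = 0;
        for (Predicate<String> condition : stages.values()) {
            if (index++ == position) {
                if (condition.test(event)) {
                    position++;
                }
                break;
            }
        }
        if (!stages.isEmpty() && position == stages.size()) {
            position = 0; // start looking for the next full match
            return true;
        }
        return false;
    }
}
```

Note what the sketch cannot do: adding or removing a stage is still a structural change, which is the second problem described in the post.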
In reply to this post by Sameer Wadkar
I've been thinking about several options to solve this problem:

1. I can use Flink savepoints to save the application state, change the jar file, and submit a new job (the new jar file with the patterns added/changed). The problems in this case are handling the savepoints correctly and, because I must stop and start the job, delayed events.
2. I can compile Java code at runtime using the Java compiler library. I don't know if this would be a viable solution.
3. I can use a scripting language like you did, but I would lose the ability to use the native Flink library, which is available in Scala and Java.
Best Regards,
Pedro Chaves |
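For option 2 in the list above, a minimal sketch of compiling a rule at runtime with the JDK's own `javax.tools` API might look like the following. The class name `RuntimeRuleCompiler`, the `compile` method, and the idea of a rule being a boolean Java expression over an `Integer x` are all assumptions made for illustration. It needs a full JDK at runtime (on a plain JRE `ToolProvider.getSystemJavaCompiler()` returns null), and it says nothing about how the generated classes would reach the workers of a running Flink job, which would still have to be solved.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Predicate;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

/** Compiles a boolean expression over "x" into a Predicate at runtime (illustration only). */
public class RuntimeRuleCompiler {

    @SuppressWarnings("unchecked")
    public static Predicate<Integer> compile(String className, String expression) {
        // Generate the source of a small class that wraps the expression.
        String source =
            "public class " + className + " implements java.util.function.Predicate<Integer> {\n"
          + "    public boolean test(Integer x) { return " + expression + "; }\n"
          + "}\n";
        try {
            // Write the source into a fresh temporary directory.
            Path dir = Files.createTempDirectory("rules");
            Path file = dir.resolve(className + ".java");
            Files.write(file, source.getBytes(StandardCharsets.UTF_8));

            JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
            if (compiler == null) {
                throw new IllegalStateException("No system Java compiler available (JDK required)");
            }
            // Compile into the same directory; a non-zero status means compilation failed.
            int status = compiler.run(null, null, null, "-d", dir.toString(), file.toString());
            if (status != 0) {
                throw new IllegalArgumentException("Could not compile rule: " + expression);
            }

            // Load the freshly compiled class and instantiate it.
            URL[] classPath = {dir.toUri().toURL()};
            try (URLClassLoader loader =
                     new URLClassLoader(classPath, RuntimeRuleCompiler.class.getClassLoader())) {
                Class<?> ruleClass = Class.forName(className, true, loader);
                return (Predicate<Integer>) ruleClass.getDeclaredConstructor().newInstance();
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException("Could not load rule " + className, e);
        }
    }
}
```

Because the expression is pasted into generated source, this should only ever be fed rules from a trusted source.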
Hey,
I face the same problem and decided to go with your third solution. I use Groovy as the scripting language, which has access to Java classes and therefore also to Flink constructs like Time.seconds(10). See below for an example of a pattern definition with Groovy:

    import groovy.lang.Binding;
    import groovy.lang.GroovyShell;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.streaming.api.windowing.time.Time;

    private static Binding bind = new Binding();
    private static GroovyShell gs = new GroovyShell(bind);

    // Evaluate the time-window expression once and reuse the result.
    Object within = gs.evaluate(two.getWithin());

    // The original snippet mixed BDS and BoxDatensatz; BDS is used throughout here.
    Pattern<BDS, ?> dynPattern = Pattern
        .<BDS>begin(two.getState())
        .where((FilterFunction<BDS>) bds -> {
            bind.setVariable("bds", bds);                 // expose the event to the script
            Object ergPat = gs.evaluate(two.getWhere());  // run the Groovy condition
            return (ergPat instanceof Boolean) ? (Boolean) ergPat : false;
        })
        .within((within instanceof Time) ? (Time) within : null);

Sorry, but I don't know how to format this nicely. I don't know if it's the best way, but it works :)

Best,
Claudia |
In reply to this post by Sameer Wadkar
Hello,
Your tip was very helpful and I took a similar approach. I have something like this:

    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    class Processor extends RichCoFlatMapFunction<Event, Rule, String> {

        @Override
        public void flatMap1(Event event, Collector<String> out) {
            process(event, out); // run the JavaScript rules against the incoming event
        }

        @Override
        public void flatMap2(Rule rule, Collector<String> out) {
            addNewRule(rule); // add the rule to the list of existing rules
        }
    }

But now I face a new challenge: I don't have access to Flink's windowed constructs, and I can't dynamically create new window aggregations inside the flatMap, at least not that I know of. Did you face a similar problem? Any ideas?

Thank you and regards,
Pedro Chaves
Best Regards,
Pedro Chaves |
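The two-input idea in the post above (one input carries events, the other carries rule updates) can be sketched without Flink as follows. Everything here is an assumption for illustration: the class `RuleProcessor`, the rule shape (attribute, operator, threshold), and events modelled as `Map<String, Double>`. `onEvent` plays the role of `flatMap1` and `onRule` the role of `flatMap2`; state handling and fault tolerance are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for a two-input operator: events on one side, rules on the other. */
public class RuleProcessor {

    /** Hypothetical rule shape: attribute, operator (">" or "<"), threshold. */
    public static final class Rule {
        final String id;
        final String attribute;
        final String op;
        final double threshold;

        public Rule(String id, String attribute, String op, double threshold) {
            if (!op.equals(">") && !op.equals("<")) {
                throw new IllegalArgumentException("Unsupported operator: " + op);
            }
            this.id = id;
            this.attribute = attribute;
            this.op = op;
            this.threshold = threshold;
        }

        boolean matches(Map<String, Double> event) {
            Double value = event.get(attribute);
            if (value == null) {
                return false; // the event does not carry this attribute
            }
            return op.equals(">") ? value > threshold : value < threshold;
        }
    }

    // Current rule set, keyed by rule id so that an update replaces the old version.
    private final Map<String, Rule> rules = new LinkedHashMap<>();

    /** Rule side (flatMap2 in the post): add a new rule or replace an existing one. */
    public void onRule(Rule rule) {
        rules.put(rule.id, rule);
    }

    /** Event side (flatMap1 in the post): return the ids of all rules the event matches. */
    public List<String> onEvent(Map<String, Double> event) {
        List<String> matched = new ArrayList<>();
        for (Rule rule : rules.values()) {
            if (rule.matches(event)) {
                matched.add(rule.id);
            }
        }
        return matched;
    }
}
```

Sending a rule with an id that already exists replaces the old version, so from the next event onward only the new threshold applies.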
Hi Pedro, you can have dynamic windows by assigning the windows to elements in your Processor (so you would need to extend that type to have a field for the window). Then you can write a custom WindowAssigner that simply gets the window from an event and assigns that as the internal window. Please let me know if you need more details. Cheers, Aljoscha |
Hi, thank you for the response. Can you give me an example? I also read this article: https://techblog.king.com/rbea-scalable-real-time-analytics-king/. They use a similar approach, but I still don't understand how to assign windows. Regards, Pedro Chaves
Best Regards,
Pedro Chaves |
Hi Pedro, yes, I was more or less suggesting a similar approach to the one taken by King. In code, it would look somewhat like this:

    import java.util.Collection;
    import java.util.Collections;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    DataStream<T> input = ...;
    DataStream<Tuple2<T, DynamicWindow>> withMyWindows = input.map(new AssignMyWindows());

    withMyWindows
        .keyBy(...)
        .window(new DynamicWindowAssigner())
        ...

where this is DynamicWindowAssigner:

    public class DynamicWindowAssigner<T> extends WindowAssigner<Tuple2<T, DynamicWindow>, TimeWindow> {

        @Override
        public Collection<TimeWindow> assignWindows(
                Tuple2<T, DynamicWindow> element, long timestamp, WindowAssignerContext context) {
            // extract some info from the dynamic window
            return Collections.singletonList(new TimeWindow(..., ...)); // <- this can also be a custom window
        }

        ...
    }

Cheers,
Aljoscha
|
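As a hedged illustration of the "extract some info from the dynamic window" step above, the following plain-Java sketch shows one possible way to derive window bounds when each element carries its own window size. The class `DynamicWindows`, its nested `Window` type, and the per-element `sizeMillis` parameter are assumptions, not part of the reply or of Flink. In a real assigner, the two computed bounds would be the kind of values handed to a `TimeWindow`.

```java
/** Computes tumbling-window bounds for a per-element window size (illustration only). */
public class DynamicWindows {

    /** Half-open interval [start, end) in milliseconds. */
    public static final class Window {
        public final long start;
        public final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Returns the tumbling window of length sizeMillis that contains timestampMillis.
     * Windows are aligned to multiples of sizeMillis, so elements with the same size
     * and nearby timestamps fall into the same window.
     */
    public static Window assign(long timestampMillis, long sizeMillis) {
        if (sizeMillis <= 0) {
            throw new IllegalArgumentException("Window size must be positive");
        }
        // floorMod keeps the alignment correct for negative timestamps as well.
        long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
        return new Window(start, start + sizeMillis);
    }
}
```

For example, a timestamp of 12500 ms with a 5000 ms window size falls into the window [10000, 15000), while the same timestamp with a 2000 ms size falls into [12000, 14000).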
Hey,
The JavaScript solution seems very limited. Is there a solution that compiles new patterns to native Flink CEP patterns and adds them to a stream dynamically? Best, Stephan |
I also found this really interesting post
|
The best answer I can give you is the one given in the post. Currently, there is no way of dynamically changing the patterns. The only way would be to dive into Flink's core code and change the way operators are shipped to the cluster.
Best Regards,
Pedro Chaves |