Hello everyone,

I need some help with the correct formulation of a complex event pattern using Flink CEP.

I have a stream of events which, once keyed by id, may look like this:

    a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1

What I want to achieve is to get, from a formulation similar to this:

    [1] b c e

this:

    b1 c1 e1

That is, for each input stream, the output should consist only of the first occurrence of an event of class b, c and e.

I realize that a pattern formulated like [1] would also match b1 c2 e1, b1 c2 e2 and so on, so I would need to refine it.

So I tried using oneOrMore(), consecutive() and AfterMatchSkipStrategy.skipToFirst, like this:

    val b = Pattern
      .begin[Event]("b")
      .where((value, _) => value.state == "b")
      .oneOrMore().consecutive()

    val c = Pattern
      .begin[Event]("c")
      .where((value, _) => value.state == "c")
      .oneOrMore().consecutive()

    val e = Pattern
      .begin[Event]("e", AfterMatchSkipStrategy.skipToFirst("b"))
      .where((value, _) => value.state == "e")
      .oneOrMore().consecutive()

    val pattern: Pattern[Event, _] = b.followedBy(c).followedBy(e)

In the process function I would do something like this:

    override def processMatch(matches: util.Map[String, util.List[Event]],
                              ctx: PatternProcessFunction.Context,
                              out: Collector[OutputEvent]): Unit = {
      val bEvent = matches.get("b").asScala.head
      val cEvent = matches.get("c").asScala.head
      val eEvent = matches.get("e").asScala.head

      out.collect(OutputEvent(bEvent, cEvent, eEvent))
    }

Unfortunately it doesn't work the way I want, which makes me think I'm missing something about how Flink CEP works.

What's the best way to achieve what I want? Is it possible? Should I even use an AfterMatchSkipStrategy?

Thank you,
Federico D'Ambrosio
Have you tried a pattern like:

    Pattern.begin[Event]("b", AfterMatchSkipStrategy.skipPastLastEvent())
      .where(...)
      .followedBy("c").where(...)
      .followedBy("e").where(...)

The method followedBy(Pattern) constructs a Pattern with a sub-group pattern. A skip strategy set there (as on "e" in your code) does not have any effect.

Best,
Dawid
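To check any candidate pattern against the result asked for in the question, the intended selection can be written as a plain-Scala reference that does not depend on Flink. This is only a sketch under stated assumptions: the `Event` case class below is a stand-in with a hypothetical `name` field added for readable output, and `firstInOrder` is a hypothetical helper, not part of Flink CEP.

```scala
// Stand-in for the Event type from the question; the `name` field is an
// assumption added only so the result is easy to read.
final case class Event(name: String, state: String)

object FirstInOrder {
  // Walks the (already keyed) events once and keeps the first event of each
  // wanted state, in order: the first "b", then the first "c" after it, and
  // so on. Returns None if the sequence of states is never completed.
  def firstInOrder(events: Seq[Event], states: List[String]): Option[List[Event]] = {
    val (remaining, picked) = events.foldLeft((states, List.empty[Event])) {
      case ((next :: rest, acc), ev) if ev.state == next => (rest, ev :: acc)
      case (unchanged, _)                                 => unchanged
    }
    if (remaining.isEmpty) Some(picked.reverse) else None
  }

  def main(args: Array[String]): Unit = {
    // The sample stream from the question; the state is the leading letter.
    val stream = "a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1"
      .split(" ").toSeq.map(n => Event(n, n.take(1)))

    // prints: Some(List(b1, c1, e1))
    println(firstInOrder(stream, List("b", "c", "e")).map(_.map(_.name)))
  }
}
```

A helper like this can serve as the expected value in a unit test for the CEP job: feed the same events through the pattern and compare the emitted match with the reference result.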
On 25/07/2019 16:50, Federico D'Ambrosio wrote: