Help with the correct Event Pattern

Federico D'Ambrosio-2
Hello everyone,

I need some help formulating a complex event pattern with Flink CEP.

I have a stream of events which, once keyed by id, may look like this:

a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1

What I want is to get, from a pattern formulated like this:

[1] b c e

this:

b1 c1 e1

That is, for each input stream, the output should contain only the first occurrence of an event of each of the classes b, c and e.
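To pin down the intended semantics independently of Flink, here is a minimal plain-Scala sketch. It is only a reference model of "first b, then the first c after it, then the first e after that", not a CEP implementation. The `Event` case class and its `name` field are assumptions made for illustration; only `state` appears in the code further down.

```scala
// Hypothetical event type: `state` is taken from the post, `name` is added for readability.
final case class Event(name: String, state: String)

object FirstOccurrence {
  /** Returns the first b, the first c after that b, and the first e after that c, if all exist. */
  def firstBce(events: Seq[Event]): Option[(Event, Event, Event)] = {
    val fromB = events.dropWhile(_.state != "b")         // starts at the first b
    val fromC = fromB.drop(1).dropWhile(_.state != "c")  // starts at the first c after it
    val fromE = fromC.drop(1).dropWhile(_.state != "e")  // starts at the first e after that
    for {
      b <- fromB.headOption
      c <- fromC.headOption
      e <- fromE.headOption
    } yield (b, c, e)
  }

  // The sample sequence from this post; the class is the first letter of each label.
  val sample: Seq[Event] =
    "a b1 b2 b3 b4 b5 c1 c2 d1 d2 c3 c4 e1 e2 f1"
      .split(" ").toSeq
      .map(label => Event(label, label.take(1)))
}
```

Calling `FirstOccurrence.firstBce(FirstOccurrence.sample)` yields the events labelled b1, c1 and e1, which is the output the pattern should produce.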

I realize that a pattern formulated like [1] would also match:

b1 c2 e1, b1 c2 e2 and so on, so I would need to refine it.

So I tried using oneOrMore(), consecutive() and AfterMatchSkipStrategy.skipToFirst, like this:

val b = Pattern
  .begin[Event]("b")
  .where((value, _) => value.state == "b")
  .oneOrMore().consecutive()

val c = Pattern
  .begin[Event]("c")
  .where((value, _) => value.state == "c")
  .oneOrMore().consecutive()

val e = Pattern
  .begin[Event]("e", AfterMatchSkipStrategy.skipToFirst("b"))
  .where((value, _) => value.state == "e")
  .oneOrMore().consecutive()

val pattern: Pattern[Event, _] =
  b.followedBy(c).followedBy(e)

In the process function I would do something like this:

override def processMatch(matches: util.Map[String, util.List[Event]],
                          ctx: PatternProcessFunction.Context,
                          out: Collector[OutputEvent]): Unit = {

    // take the first event collected under each pattern name
    // (asScala needs scala.collection.JavaConverters._ in scope)
    val bEvent = matches.get("b").asScala.head
    val cEvent = matches.get("c").asScala.head
    val eEvent = matches.get("e").asScala.head

    out.collect(OutputEvent(bEvent, cEvent, eEvent))
}


Unfortunately it doesn't work the way I want, which makes me think I'm missing something about how Flink CEP works.

What's the best way to achieve what I want? Is it possible?
Should I even use any AfterMatchSkipStrategy?

Thank you,
Federico D'Ambrosio

Re: Help with the correct Event Pattern

Dawid Wysakowicz-2

Have you tried a pattern like:

Pattern.begin[Event]("b", AfterMatchSkipStrategy.skipPastLastEvent()).where(...).followedBy("c").where(...).followedBy("e").where(...)

The method followedBy(Pattern) wraps the pattern you pass in as a sub-group of the resulting pattern, so a skip strategy set on that inner pattern does not have any effect.
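Spelled out with the conditions from the question, the suggestion might look like the sketch below. It assumes the Flink CEP Scala API is on the classpath; the `Event` case class is hypothetical (only its `state` field comes from the original code), and the claim that this yields exactly b1 c1 e1 on the sample stream has not been verified here.

```scala
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern

// Hypothetical event type: `state` is taken from the question, `name` is illustrative.
final case class Event(name: String, state: String)

object FirstBcePattern {
  // One flat chain of singleton patterns, with the skip strategy set on the
  // first pattern. skipPastLastEvent() discards every partial match that
  // started after the emitted match started but before it ended.
  val pattern: Pattern[Event, Event] =
    Pattern
      .begin[Event]("b", AfterMatchSkipStrategy.skipPastLastEvent())
      .where((value, _) => value.state == "b")
      .followedBy("c") // relaxed contiguity: non-matching events in between are ignored
      .where((value, _) => value.state == "c")
      .followedBy("e")
      .where((value, _) => value.state == "e")
}
```

Because each step matches a single event, `matches.get("b")`, `matches.get("c")` and `matches.get("e")` in processMatch should each hold a one-element list, so taking `.head` as in the original process function still applies.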

Best,

Dawid


On 25/07/2019 16:50, Federico D'Ambrosio wrote:
> [...]
