Currently I'm designing a CEP pattern to satisfy our business needs.
Basically, there's two events let's call it a and b. Both a and b can have zero or multiple entries in the log. For input {a,b1,b2}, I want to get the output of {a,b1,b2} For input {b1,b2}, I want to get the output of {b1,b2} after a timeout period(10 seconds) For input {a1,a2}, I want to get the output of {a1,a2} after a timeout period(10 seconds) Currently, my code looks like this. val pattern = Pattern .begin[Event]("start") .where(_._.getName == "a") .oneOrMore.optional .followedBy("end") .where(_._.getName == "b") .oneOrMore.optional .within(Time.seconds(10)) For input {a,b1,b2}, I got the output of {a1} {a1,b1} {a,b1,b2} {b1,b2} {b2} For input {b1,b2}, I got the output of {b1,b2} {b1} {b2} For input {a1,a2}, I got the output of {a1,a2} {a1} {a2} Also tried SKIP_PAST_LAST_EVENT policy, and didn't work. Please advice me how to design this pattern to match my needs. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ |
I think what you ask for is something like timing out greedy[1]
quantifier, which is not supported. As a rather dirty workaround you could try sth like: Pattern .begin[Event]("start") .where(_._.getName == "a") .oneOrMore.optional.greedy .followedBy("end") .where(_._.getName == "b") .oneOrMore.optional.greedy .followedBy("dummy") .where(_ => false) .within(Time.seconds(10)) and work only with the timed out matches. Another option is you can try implementing that logic with ProcessFunction[2] Best, Dawid [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/cep.html#quantifiers [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/process_function.html#process-function-low-level-operations On 20/03/2019 10:35, RayL wrote: > Currently I'm designing a CEP pattern to satisfy our business needs. > Basically, there's two events let's call it a and b. > Both a and b can have zero or multiple entries in the log. > For input {a,b1,b2}, I want to get the output of {a,b1,b2} > For input {b1,b2}, I want to get the output of {b1,b2} after a timeout > period(10 seconds) > For input {a1,a2}, I want to get the output of {a1,a2} after a timeout > period(10 seconds) > > Currently, my code looks like this. > val pattern = Pattern > .begin[Event]("start") > .where(_._.getName == "a") > .oneOrMore.optional > .followedBy("end") > .where(_._.getName == "b") > .oneOrMore.optional > .within(Time.seconds(10)) > For input {a,b1,b2}, I got the output of {a1} {a1,b1} {a,b1,b2} {b1,b2} {b2} > For input {b1,b2}, I got the output of {b1,b2} {b1} {b2} > For input {a1,a2}, I got the output of {a1,a2} {a1} {a2} > > Also tried SKIP_PAST_LAST_EVENT policy, and didn't work. > > Please advice me how to design this pattern to match my needs. > > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ signature.asc (849 bytes) Download Attachment |
Free forum by Nabble | Edit this page |