Continuous aggregation of results until an end event is matched: CEP / ProcessWindowFunction?

dim5b

I watched a video, published a while back, of Matt Zimmer's talk at Flink
Forward Berlin (Sep 2017) titled "Custom, Complex Windows at Scale using
Apache Flink".

https://www.youtube.com/watch?v=XUvqnsWm8yo

In this video he analyzes a custom window implementation that he built to
fit a scenario which, to my eyes, seems like a rather general/practical
case.

The scenario is a set of events that has a specific start trigger and end
trigger spanning a NON-SPECIFIC period of time, where one also wants to
"monitor" the events up to the stop event. During the Q&A part, two other
alternatives were offered, which at the time of the implementation were not
"available/looked at":

1) CEP library
2) ProcessFunction / ProcessWindowFunction

I have a similar use case, and I have looked at both the CEP library and
using a GlobalWindow with a ProcessWindowFunction.
Consider this scenario, which is rather similar to the Flink training
material. You have devices emitting ConnectedCarEvent(s) over a period
(minutes/hours...); in any case, the stop event is not triggered after a
fixed period.
You identify a driving sequence (trip) as a list of events from START -->
STOP.

    device_id,trigger_id,event_time,messageId
    1,START,1520433909396,1
    1,TRACKING,1520433914398,2
    1,TRACKING,1520433919398,3
    1,STOP,1520433924398,4
    1,START,1520433929398,5
    1,TRACKING,1520433934399,6
    1,TRACKING,1520433939399,7
    1,TRACKING,1520433944399,8
    1,STOP,1520433949399,9
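To make the sample rows concrete, here is a minimal sketch of an event type and a parser for the CSV layout shown above. The class name `Event` and its field names are assumptions taken from the header row, not the actual ConnectedCarEvent class; a real Flink job would read these records from a source rather than from a string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type mirroring the CSV columns above:
// device_id, trigger_id, event_time (epoch millis), messageId.
final class Event {
    final long deviceId;
    final String triggerId; // START, TRACKING or STOP
    final long eventTime;
    final long messageId;

    Event(long deviceId, String triggerId, long eventTime, long messageId) {
        this.deviceId = deviceId;
        this.triggerId = triggerId;
        this.eventTime = eventTime;
        this.messageId = messageId;
    }

    // Parses one data row such as "1,START,1520433909396,1".
    static Event parse(String line) {
        String[] f = line.trim().split(",");
        if (f.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        return new Event(Long.parseLong(f[0].trim()), f[1].trim(),
                Long.parseLong(f[2].trim()), Long.parseLong(f[3].trim()));
    }

    // Parses a CSV block, skipping the header (first line) and blank lines.
    static List<Event> parseAll(String csv) {
        List<Event> out = new ArrayList<>();
        String[] lines = csv.split("\n");
        for (int i = 1; i < lines.length; i++) {
            if (!lines[i].trim().isEmpty()) {
                out.add(parse(lines[i]));
            }
        }
        return out;
    }
}
```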

I am able to match all events of a trip using the CEP pattern below:

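// START, then one or more TRACKING events, then STOP; after a match,
// skip past its last event so that matches do not overlap.
Pattern<Event, ?> tripPattern = Pattern.<Event>begin("start",
        AfterMatchSkipStrategy.skipPastLastEvent()).where(START_CONDITION)
       .next("middle").oneOrMore().where(TRACKING_CONDITION)
       .next("end").where(STOP_CONDITION);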
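The intended semantics of this pattern can be modelled in plain Java for a single device. This is a simplified sketch, not Flink's NFA: it works on trigger IDs only, the class name `TripMatcher` is hypothetical, and it assumes strict contiguity inside the looping part (by default Flink applies relaxed contiguity within `oneOrMore()`, and `.consecutive()` makes it strict). Like the pattern, it requires at least one TRACKING event, so a bare START, STOP pair is not a trip.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of the matches intended by
// START -> TRACKING (one or more) -> STOP on ONE device's ordered events.
// Assumption: strict contiguity throughout, so any unexpected event drops the
// partial match (in Flink the looping "middle" part would also need
// .consecutive() for this). After a match, scanning resumes after its STOP,
// mirroring skipPastLastEvent().
final class TripMatcher {
    static List<List<String>> match(List<String> triggers) {
        List<List<String>> matches = new ArrayList<>();
        List<String> current = null; // partial match in progress, or null
        for (String t : triggers) {
            if (current != null && t.equals("TRACKING")) {
                current.add(t);                  // extend the "middle" part
            } else if (current != null && t.equals("STOP") && current.size() >= 2) {
                current.add(t);                  // START + >=1 TRACKING + STOP
                matches.add(current);
                current = null;                  // skip past the last event
            } else if (t.equals("START")) {
                current = new ArrayList<>();     // (re)start a partial match
                current.add(t);
            } else {
                current = null;                  // contiguity broken, discard
            }
        }
        return matches;
    }
}
```

Fed the trigger IDs of the nine sample rows, this model yields two matches of four and five events, which is what the pattern is meant to produce.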

I am not sure how exactly you can track ongoing trips. Is there a way to
output the events of unfinished trips to another stream?
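For the "another stream" part, Flink's side outputs (an `OutputTag` passed to `Context.output(...)` inside a ProcessFunction, read back with `getSideOutput(...)`) are one mechanism. The following is a plain-Java model of the per-device logic such a keyed function could run, not Flink code: the map stands in for keyed state, `completed` for the main output and `ongoing` for a side output. The class and field names are hypothetical, and emitting a snapshot on every event is just one possible policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-device logic a keyed ProcessFunction could run.
// openTrips stands in for keyed state; "completed" for the main output and
// "ongoing" for a side output. Names are hypothetical, not a Flink API.
final class TripTracker {
    final Map<Long, List<Long>> openTrips = new HashMap<>(); // deviceId -> messageIds so far
    final List<List<Long>> completed = new ArrayList<>();     // main output: finished trips
    final List<List<Long>> ongoing = new ArrayList<>();       // side output: open-trip snapshots

    void process(long deviceId, String trigger, long messageId) {
        List<Long> trip = openTrips.get(deviceId);
        switch (trigger) {
            case "START":
                // A new START replaces any unfinished trip (one possible policy).
                trip = new ArrayList<>();
                trip.add(messageId);
                openTrips.put(deviceId, trip);
                ongoing.add(new ArrayList<>(trip));
                break;
            case "TRACKING":
                if (trip == null) return;           // ignore events outside a trip
                trip.add(messageId);
                ongoing.add(new ArrayList<>(trip)); // emit the trip "so far"
                break;
            case "STOP":
                if (trip == null) return;
                trip.add(messageId);
                completed.add(trip);                // emit the finished trip
                openTrips.remove(deviceId);         // clear state for this key
                break;
            default:
                throw new IllegalArgumentException("unknown trigger " + trigger);
        }
    }
}
```

With the sample rows, `completed` ends up holding messageIds 1 to 4 and 5 to 9, while `ongoing` receives the partial trip after every START or TRACKING event.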

I have also implemented the same scenario using a ProcessWindowFunction; I
can attach my full code if needed. Basically, I use a custom trigger that
monitors for STOP events in its onElement method, an Evictor to evict
messages before the stop event, and then a ProcessWindowFunction to collect
all events that define a trip.
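The window-based approach can be modelled the same way. In this sketch, which is not Flink code, a per-device list plays the role of the GlobalWindow's contents, `onElement` decides to fire on STOP, and a `Consumer` stands in for the ProcessWindowFunction. The enum `TriggerDecision` is a local stand-in loosely mirroring Flink's `TriggerResult`, and purging on fire (Flink's FIRE_AND_PURGE) replaces the Evictor described above as a simplifying assumption. The model also shows why ongoing trips stay invisible: until STOP arrives, the events only sit in window state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of a keyed GlobalWindow with a custom STOP trigger.
// TriggerDecision is a local enum loosely mirroring Flink's TriggerResult.
final class StopTriggeredWindow {
    enum TriggerDecision { CONTINUE, FIRE_AND_PURGE }

    private final Map<Long, List<String>> windows = new HashMap<>(); // deviceId -> window contents
    private final Consumer<List<String>> windowFunction;              // stands in for ProcessWindowFunction

    StopTriggeredWindow(Consumer<List<String>> windowFunction) {
        this.windowFunction = windowFunction;
    }

    // Like Trigger.onElement: fire only when the STOP event arrives.
    static TriggerDecision onElement(String trigger) {
        return trigger.equals("STOP") ? TriggerDecision.FIRE_AND_PURGE : TriggerDecision.CONTINUE;
    }

    void add(long deviceId, String trigger) {
        List<String> contents = windows.computeIfAbsent(deviceId, k -> new ArrayList<>());
        contents.add(trigger);
        if (onElement(trigger) == TriggerDecision.FIRE_AND_PURGE) {
            windowFunction.accept(new ArrayList<>(contents)); // emit the whole trip
            windows.remove(deviceId);                          // purge: next trip starts empty
        }
        // Otherwise the ongoing trip only sits in window state; nothing is emitted.
    }

    // Current contents for one key; only visible inside this model.
    List<String> pending(long deviceId) {
        return windows.getOrDefault(deviceId, new ArrayList<>());
    }
}
```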

Again, I am not sure how to track ongoing trips. Do I have to declare a
separate stream? Would onProcessingTime and the context help? Must I somehow
query the internal state?
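On onProcessingTime: in Flink, a Trigger can register a processing-time timer through its `TriggerContext` (`registerProcessingTimeTimer`) and return FIRE (rather than FIRE_AND_PURGE) from `onProcessingTime`, which evaluates the window function on the current contents without clearing them; a keyed ProcessFunction can likewise register timers via `ctx.timerService()` and emit from `onTimer`. The sketch below is a plain-Java model of that periodic-report idea for one device, not Flink code: the clock is simulated (event_time is used as "now"), and the class name and the reporting interval are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model for ONE device: a periodic timer emits the ongoing trip.
// The clock is simulated; in Flink the timer would be registered through a
// timer service or TriggerContext and the snapshot emitted from its callback.
final class PeriodicTripReporter {
    private final long intervalMillis;
    private final List<Long> openTrip = new ArrayList<>(); // messageIds of the open trip
    private long nextTimer = -1;                           // -1: no timer registered
    final List<List<Long>> snapshots = new ArrayList<>();  // emitted "trip so far" updates
    final List<List<Long>> completed = new ArrayList<>();  // emitted finished trips

    PeriodicTripReporter(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Fires every timer that is due up to 'now' (the simulated clock advancing).
    void advanceTo(long now) {
        while (nextTimer >= 0 && nextTimer <= now) {
            snapshots.add(new ArrayList<>(openTrip)); // FIRE without purge
            nextTimer += intervalMillis;              // register the next timer
        }
    }

    void onEvent(long now, String trigger, long messageId) {
        advanceTo(now);
        if (trigger.equals("START")) {
            openTrip.clear();
            openTrip.add(messageId);
            nextTimer = now + intervalMillis;         // start reporting this trip
        } else if (!openTrip.isEmpty()) {             // ignore events outside a trip
            openTrip.add(messageId);
            if (trigger.equals("STOP")) {
                completed.add(new ArrayList<>(openTrip));
                openTrip.clear();
                nextTimer = -1;                       // stop reporting
            }
        }
    }
}
```

With a 6000 ms interval and the nine sample rows, this model emits five snapshots ([1, 2], [1, 2, 3], [5, 6], [5, 6, 7], [5, 6, 7, 8]) alongside the two completed trips. Reporting on a timer rather than on every element bounds the update rate for long trips.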

Is there some reference example of such an implementation?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/