Re: Watermarks as "process completion" flags
Posted by Anton Polyakov
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Watermarks-as-process-completion-flags-tp3631p3765.html
Hi Fabian
Defining a special flag for a record seems similar to a checkpoint barrier, and I think I would end up re-implementing checkpointing myself. I found a discussion on flink-dev:
mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/… which seems to solve my task. Essentially, they want a mechanism that marks a record produced by a job as “last” and then waits until it has fully propagated through the DAG, which is similar to what I need. My job that produces trades can likewise be thought of as finished once it has produced all trades; then I just need to wait until the latest trade produced by this job has been processed.
So although windows could probably also be applied, I think propagating a barrier through the DAG and checkpointing at the final job is what I need.
Can I make use of Flink’s internal checkpoint barriers (e.g., by triggering a custom checkpoint or by finishing the streaming job)?
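To make the “last record” idea concrete, here is a minimal Flink-free sketch in Scala. It assumes a hypothetical in-band marker (`EndOfTrades`) that every upstream channel sends after its final trade; it does not use Flink’s internal checkpoint barriers. An operator with several inputs forwards trades immediately but emits a single marker only once every input has delivered its own, so when the marker reaches the final operator, every trade sent before it has already passed through.

```scala
// Hypothetical in-band end-of-input marker; NOT Flink's internal barrier API.
sealed trait Event
final case class Trade(id: Int, amount: Long) extends Event
case object EndOfTrades extends Event

// An operator reading from `numInputs` upstream channels. Trades are forwarded
// immediately; a single EndOfTrades is emitted only after every input channel
// has delivered its own marker (similar in spirit to barrier alignment).
final class MarkerMerger(numInputs: Int) {
  private var markersSeen = 0 // markers received so far, one per finished input

  def onEvent(e: Event): Seq[Event] = e match {
    case t: Trade => Seq(t)
    case EndOfTrades =>
      markersSeen += 1
      if (markersSeen == numInputs) Seq(EndOfTrades) else Seq.empty
  }
}
```

This mirrors only the counting half of checkpoint-barrier alignment. Unlike real alignment it does not hold back channels that have already sent their marker, which is safe here only because each channel is assumed to send nothing after it.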
Hi Anton,
If I got your requirements right, you are looking for a solution that continuously produces updated partial aggregates in a streaming fashion. When a special event (no more trades) is received, you would like to store the last update as a final result. Is that correct?
You can compute continuous updates using a reduce() or fold() function. These will produce a new update for each incoming event.
For example:
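import org.apache.flink.streaming.api.scala._
val s: DataStream[(Int, Long)] = ...
s.keyBy(_._1)
  .reduce( (x, y) => (x._1, x._2 + y._2) ) // running sum: add the new value to the accumulated one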
would continuously compute a sum for every key (_._1) and produce an update for each incoming record.
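The per-record updates can be illustrated without a Flink cluster. The sketch below is a plain-Scala simulation (not Flink code) of what `keyBy(_._1).reduce(...)` emits for a small input: the keyed state is modelled as a mutable `Map`, and every incoming record produces the updated sum for its key.

```scala
import scala.collection.mutable

// Plain-Scala simulation (no Flink) of s.keyBy(_._1).reduce((x, y) => (x._1, x._2 + y._2)):
// each record updates the running sum of its key, and every update is emitted.
object RunningSum {
  def runningSums(records: Seq[(Int, Long)]): Seq[(Int, Long)] = {
    val state = mutable.Map.empty[Int, (Int, Long)] // stands in for Flink's keyed state
    records.map { rec =>
      val updated = state.get(rec._1) match {
        case Some(acc) => (acc._1, acc._2 + rec._2) // same combine as the reduce above
        case None      => rec                       // first record for a key is passed through
      }
      state(rec._1) = updated
      updated
    }
  }
}
```

For the input `(1, 10), (2, 5), (1, 3)` it returns `(1, 10), (2, 5), (1, 13)`: one update per record, with key 1 accumulating across its two records.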
You could add a flag to the record and implement a ReduceFunction that marks a record as final when the no-more-trades event is received.
With a filter and a data sink you could emit such final records to a persistent data store.
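A minimal sketch of the flag idea, under stated assumptions: the record type `Agg`, its `isFinal` field, and the convention that the “no more trades” event arrives as a record with `isFinal = true` and `sum = 0` for its key are all hypothetical. `combine` has the `(T, T) => T` shape a keyed reduce expects, `isFinalResult` is the filter predicate, and `run` is a Flink-free driver that mimics keyBy, reduce and filter so the logic can be checked locally.

```scala
import scala.collection.mutable

// Hypothetical record: a keyed partial sum plus a flag that becomes true once
// the (assumed) "no more trades" marker for this key has been reduced in.
final case class Agg(key: Int, sum: Long, isFinal: Boolean)

object FinalFlag {
  // Reduce function: the marker is assumed to carry sum = 0 and isFinal = true,
  // so it leaves the total unchanged but marks the aggregate as final.
  def combine(acc: Agg, in: Agg): Agg =
    Agg(acc.key, acc.sum + in.sum, acc.isFinal || in.isFinal)

  // Filter predicate: only final aggregates should reach the persistent sink.
  def isFinalResult(a: Agg): Boolean = a.isFinal

  // Flink-free driver mimicking keyBy(_.key).reduce(combine).filter(isFinalResult).
  def run(records: Seq[Agg]): Seq[Agg] = {
    val state = mutable.Map.empty[Int, Agg] // stands in for Flink's keyed state
    records.flatMap { rec =>
      val updated = state.get(rec.key).map(combine(_, rec)).getOrElse(rec)
      state(rec.key) = updated
      Some(updated).filter(isFinalResult) // drop intermediate (non-final) updates
    }
  }
}
```

In a Flink job these two functions would presumably be plugged in as `s.keyBy(_.key).reduce(FinalFlag.combine _).filter(FinalFlag.isFinalResult _)` followed by a sink. Because the flag is sticky, any record arriving for a key after its marker would produce another final result, so the sketch assumes the marker really is the last record per key.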
By the way, you can also define custom trigger policies for windows. A custom trigger is called for each element that is added to a window and when certain timers expire. For example, with a custom trigger you can evaluate a window for every second element that is added. You can also define whether the elements in the window should be retained or removed after the evaluation.
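The “evaluate on every second element” policy can be sketched in plain Scala as well. This is a simulation of the behaviour, not Flink’s `Trigger` API: the window evaluation is assumed to be a sum, and `purge` chooses between retaining the elements and removing them after each evaluation. In Flink itself a trigger expresses this choice by returning `TriggerResult.FIRE` or `TriggerResult.FIRE_AND_PURGE` from `onElement`, and the built-in `CountTrigger.of(2)` (optionally wrapped in `PurgingTrigger.of(...)`) covers this particular policy.

```scala
import scala.collection.mutable

// Flink-free simulation of a count-based trigger that evaluates the window
// on every second element; the "evaluation" here is simply a sum.
object EverySecondElement {
  def evaluations(elements: Seq[Long], purge: Boolean): Seq[Long] = {
    val window  = mutable.ArrayBuffer.empty[Long] // elements currently held in the window
    val results = mutable.ArrayBuffer.empty[Long] // one entry per evaluation
    var count   = 0                               // elements seen since the last firing
    for (e <- elements) {
      window += e
      count += 1
      if (count == 2) {
        results += window.sum     // fire: evaluate the current window contents
        if (purge) window.clear() // fire-and-purge: drop the evaluated elements
        count = 0                 // reset the counter, as CountTrigger does after firing
      }
    }
    results.toSeq
  }
}
```

For the elements 1, 2, 3, 4 it yields 3 and 10 when the elements are retained, and 3 and 7 when they are removed after each evaluation.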
Best, Fabian