Hello,
I am trying to do something that seems like it should be quite simple, but I haven't found an efficient way to do it with Flink, and I expect I'm missing something obvious. The task is that I would like to process a sequence of events when a certain number of them appear within a keyed event-time window. There will be many keys, but events within each keyed window will normally be quite sparse.

My first guess was to use Flink's sliding windowing functionality. However, my concern is that events are duplicated into every window they overlap. Since I would like to be precise about timing, each event would be copied into hundreds of windows, most of which are then discarded because they contain too few events.

My next guess was to use a process function and maintain a queue of events as keyed state. When an event occurred I would add it to the queue and then remove any events that had fallen off the end of my window. I thought ListState would help here, but it does not appear to allow individual items to be removed. I then thought about using a ValueState holding some queue data structure. However, my understanding is that every change to a ValueState results in the entire object being serialized and copied, which would be quite inefficient and is best avoided.

Finally, I thought about maintaining just a counter and a series of timers: incrementing on each event and decrementing on its expiry. However, I then hit the problem of timer coalescing. If an event occurs at the same timestamp as its predecessor, the second timer does not get set, so the counter gets incremented but never decremented.

What I'm doing seems like it would be a common task, but none of the options look good, so I feel I'm missing something. Could anyone offer some advice on how to handle this case?

Thanks in advance.

Best wishes,
Steven
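[Editor's note: the timer-coalescing pitfall Steven describes can be shown with a small stand-alone simulation. The class and method names below are illustrative, not Flink APIs; a `HashSet` stands in for Flink's per-key timer service, which likewise makes registering a duplicate timestamp a no-op.]

```java
import java.util.HashSet;
import java.util.Set;

// Simulates why an increment-on-event / decrement-on-timer counter drifts:
// timers registered for a timestamp that already has one are coalesced,
// so two events sharing a timestamp produce two increments but one decrement.
public class TimerCoalescingDemo {
    static int counter = 0;
    static final Set<Long> timers = new HashSet<>();

    // Mimics registering an event-time timer: duplicates are silently dropped.
    static void registerTimer(long ts) {
        timers.add(ts); // adding an existing timestamp changes nothing
    }

    static void onEvent(long ts, long windowMs) {
        counter++;                    // one increment per event...
        registerTimer(ts + windowMs); // ...but the expiry timer coalesces
    }

    static void fireTimersUpTo(long watermark) {
        timers.removeIf(ts -> {
            if (ts <= watermark) {
                counter--;            // one decrement per *timer*, not per event
                return true;
            }
            return false;
        });
    }

    public static void main(String[] args) {
        long window = 1000L;
        onEvent(100L, window);
        onEvent(100L, window);        // same timestamp: second timer is coalesced
        fireTimersUpTo(2000L);        // both expiries should have fired by now
        System.out.println(counter);  // prints 1, not 0 -> the counter leaks
    }
}
```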
Steven,

I'm pretty sure this is a scenario that doesn't have an obvious good solution. As you have discovered, the window API isn't much help; using a process function does make sense. The challenge is finding a data structure to use in keyed state that can be efficiently accessed and updated.

One option would be to use MapState, where the keys are timestamps (longs) and the values are lists of the events with the given timestamps (or just a count of those events, if that's sufficient). If you then use the RocksDB state backend, you can leverage an implementation detail of that state backend: you can iterate over the entries in order, sorted by the key (the serialized, binary key), which in the case of keys that are longs will do the right thing. Also, with the RocksDB state backend you only have to do ser/de to access and update individual entries, not the entire map.

It's not exactly pretty to rely on this, and some of us have been giving some thought to adding a temporal state type to Flink that would make these scenarios feasible to implement efficiently on all of the state backends, but for now this may be the best solution.

Regards,
David

On Wed, Sep 23, 2020 at 12:42 PM Steven Murdoch <[hidden email]> wrote:
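[Editor's note: the bookkeeping David describes can be sketched outside Flink. Below, a `TreeMap` stands in for `MapState<Long, Integer>` on the RocksDB backend, which iterates entries sorted by the serialized long key. The class name and the window/threshold parameters are illustrative assumptions, not Flink API.]

```java
import java.util.TreeMap;

// Counts events per timestamp and reports when at least `threshold` events
// fall within the trailing `windowMs` milliseconds. With RocksDB MapState you
// would iterate from the smallest key and remove stale entries as you go;
// TreeMap.headMap gives the same ordered prefix here.
public class TimestampCountWindow {
    private final TreeMap<Long, Integer> countsByTimestamp = new TreeMap<>();
    private final long windowMs;
    private final int threshold;

    TimestampCountWindow(long windowMs, int threshold) {
        this.windowMs = windowMs;
        this.threshold = threshold;
    }

    // Returns true when the window (eventTs - windowMs, eventTs] holds
    // at least `threshold` events.
    boolean onEvent(long eventTs) {
        countsByTimestamp.merge(eventTs, 1, Integer::sum);
        // Evict entries that have fallen out of the window (keys strictly
        // below eventTs - windowMs).
        countsByTimestamp.headMap(eventTs - windowMs, false).clear();
        int total = 0;
        for (int c : countsByTimestamp.values()) {
            total += c;
        }
        return total >= threshold;
    }

    public static void main(String[] args) {
        TimestampCountWindow w = new TimestampCountWindow(1000L, 3);
        System.out.println(w.onEvent(100L));  // false: 1 event in window
        System.out.println(w.onEvent(100L));  // false: 2 events, same timestamp
        System.out.println(w.onEvent(900L));  // true: 3 events within 1000 ms
        System.out.println(w.onEvent(2000L)); // false: older entries evicted
    }
}
```

Keeping a count per timestamp, rather than a timer per event, also sidesteps the timer-coalescing problem: two events at the same timestamp simply bump the same entry.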
Thanks David, this is very helpful. I'm glad that it's not just that I had missed something obvious in the (generally very clear) documentation. I found various features that felt almost right (e.g. the priority queue behind Timers), but nothing that did the job. The temporal state idea does sound like a very handy feature to have.

On Thu, 24 Sep 2020, at 08:50, David Anderson wrote: