Hi,
I am joining two streams with a session window and want to emit a joined (early) result for every element arriving on one of the streams. Currently the code looks like this:

    s1.join(s2)
      .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
      // trigger(?)
      .apply(...custom code..)

What I am missing is the right trigger à la "withEarlyFiring". Do I need to implement my own trigger for this, and if so, what functionality must it provide so that it does not break the session window semantics?

Thanks in advance,
Johannes
Hi Johannes,

EventTimeSessionWindows [1] uses the EventTimeTrigger [2] as its default trigger (see EventTimeSessionWindows.getDefaultTrigger()). I would take the EventTimeTrigger and extend it with early-firing functionality.

However, there are a few things to consider:

* You need to be aware that session windows can be merged. For example, two session windows A and B with gap 10, A [20, 25) and B [37, 45), will be merged when a record at 32 is received.
* Windows store all records in a list. For every firing, you need to iterate over the full list and also track which records you have already joined to avoid duplicates. Maybe you can migrate records from the window state into a custom state defined in a ProcessWindowFunction.

Best,
Fabian

2018-06-13 13:43 GMT+02:00 Johannes Schulte <[hidden email]>:
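To make the merging point concrete, here is a minimal plain-Java sketch (no Flink dependency) that reproduces the A/B example above. It is only a model under stated assumptions: the class `SessionMergeSketch` and the method `addRecord` are hypothetical names, and sessions are tracked as the extent of the records seen so far, which is how the numbers in the example read. Flink's own session assigner works with per-record windows that are merged afterwards, so treat this as an illustration of the effect, not of the internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SessionMergeSketch {

    /**
     * Adds a record at timestamp ts to a list of sessions, each stored as
     * {start, end} for the half-open interval [start, end).
     * Every session that lies closer than `gap` to the record is merged
     * into one session together with the record.
     */
    public static List<long[]> addRecord(List<long[]> sessions, long ts, long gap) {
        long start = ts;
        long end = ts + 1; // a lone record occupies [ts, ts + 1)
        List<long[]> result = new ArrayList<>();
        for (long[] s : sessions) {
            boolean withinGap = ts > s[0] - gap && ts < s[1] + gap;
            if (withinGap) {
                // The record bridges this session: absorb it.
                start = Math.min(start, s[0]);
                end = Math.max(end, s[1]);
            } else {
                result.add(s);
            }
        }
        result.add(new long[] {start, end});
        result.sort(Comparator.comparingLong((long[] w) -> w[0]));
        return result;
    }

    public static void main(String[] args) {
        List<long[]> sessions = new ArrayList<>();
        sessions.add(new long[] {20, 25}); // session A
        sessions.add(new long[] {37, 45}); // session B
        // A record at 32 is within gap 10 of both A and B, so all three merge.
        for (long[] w : addRecord(sessions, 32, 10)) {
            System.out.println(Arrays.toString(w));
        }
    }
}
```

The consequence for a custom trigger is that any per-window bookkeeping (timers, "already emitted" markers) has to survive such a merge, because A and B stop existing as separate windows once the bridging record arrives.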
Hi Fabian,

Thanks for the hints, though I somehow got the feeling that I am on the wrong track, given how much code I would need to write to implement a "blueprint" use case.

Would a join be simpler using the Table API? In the end it's the classical Order & OrderPosition example, where the output is an upsert stream. Would I get the expected behaviour (output elements on every update on either side of the input stream)?

I realize that my session window approach wasn't driven by the requirements but by operational aspects (state size), so using a concept like idle state retention time would be a more natural fit.

Thanks,
Johannes

On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske <[hidden email]> wrote:
Hi Johannes,

You are right. You should approach the problem with the semantics that you need before thinking about optimizations such as state size.

The Table API / SQL offers (in v1.5.0) two types of joins:

1) Windowed joins, where each record joins with records in a time range of the other stream, e.g. "(A.ts BETWEEN B.ts - 1 hour AND B.ts + 1 hour)".
2) Non-windowed joins, which support arbitrary join predicates but fully materialize both inputs.

As you mentioned, you can use idle state retention to remove records from state that have not been accessed for a certain time.

Best,
Fabian

2018-06-18 11:09 GMT+02:00 Johannes Schulte <[hidden email]>:
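As a rough illustration of what a non-windowed join with idle state retention does conceptually, the following plain-Java sketch (no Flink dependency) keeps both inputs per key, emits joined rows for every arriving record on either side, and drops a key's state once it has not been accessed for longer than the retention time. Everything here is an assumption made for illustration: the class `RetentionJoinSketch`, its methods, the string-based rows and the explicit `now` parameter are hypothetical, and Flink's actual cleanup is driven by its own timers rather than by the next arriving record.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class RetentionJoinSketch {
    private final long retentionMillis;
    private final Map<String, List<String>> left = new HashMap<>();   // e.g. orders
    private final Map<String, List<String>> right = new HashMap<>();  // e.g. order positions
    private final Map<String, Long> lastAccess = new HashMap<>();

    public RetentionJoinSketch(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** A record arrives on the left input; returns the newly joined rows. */
    public List<String> onLeft(String key, String value, long now) {
        return onRecord(left, right, key, value, now, true);
    }

    /** A record arrives on the right input; returns the newly joined rows. */
    public List<String> onRight(String key, String value, long now) {
        return onRecord(right, left, key, value, now, false);
    }

    private List<String> onRecord(Map<String, List<String>> own,
                                  Map<String, List<String>> other,
                                  String key, String value, long now, boolean isLeft) {
        expire(now);
        own.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        lastAccess.put(key, now);
        // Only pairs involving the new record are emitted, so nothing is repeated.
        List<String> out = new ArrayList<>();
        for (String o : other.getOrDefault(key, Collections.emptyList())) {
            out.add(isLeft ? value + "|" + o : o + "|" + value);
        }
        return out;
    }

    /** Drops all state for keys that were idle for longer than the retention time. */
    private void expire(long now) {
        Iterator<Map.Entry<String, Long>> it = lastAccess.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now - e.getValue() > retentionMillis) {
                left.remove(e.getKey());
                right.remove(e.getKey());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        RetentionJoinSketch join = new RetentionJoinSketch(1000);
        System.out.println(join.onLeft("o1", "order", 0));
        System.out.println(join.onRight("o1", "pos1", 10));
        System.out.println(join.onRight("o1", "pos2", 20));
        // Far beyond the retention time: the state for o1 is gone by now.
        System.out.println(join.onRight("o1", "pos3", 5000));
    }
}
```

The trade-off is visible in the last call: once the state for a key has been cleaned up, a late record for that key no longer finds its join partners, so the retention time has to be chosen with the expected lateness of updates in mind.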
Thanks Fabian! This seems to be the way to go.

On Tue, Jun 19, 2018 at 12:18 PM Fabian Hueske <[hidden email]> wrote: