Stream Join With Early firings


Johannes Schulte
Hi,

I am joining two streams with a session window and want to emit a joined (early) result for every element arriving on one of the streams.

Currently the code looks like this:

s1.join(s2)
.where(left -> left.id)        // KeySelector for the first stream
.equalTo(right -> right.id)    // KeySelector for the second stream
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
// .trigger(?)
.apply(...custom code...);

What I am missing is the right trigger, à la "withEarlyFiring". Do I need to implement my own trigger for this, and if so, what functionality must it provide so that it does not break the session window semantics?
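To make the requested output concrete, here is a plain-Java model (no Flink classes; all names are hypothetical) of a join that emits results as soon as an element arrives on either side. It keeps every element seen so far per key and joins each new arrival only against the other side, so each pair is produced exactly once, at the moment its later element arrives. It illustrates the desired behaviour only; it is not a Flink trigger.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink classes) of a symmetric join that emits results
// as soon as an element arrives on either side. Hypothetical names throughout.
class IncrementalJoin {
    // All elements seen so far, per join key, for each side.
    private final Map<String, List<String>> leftByKey = new HashMap<>();
    private final Map<String, List<String>> rightByKey = new HashMap<>();

    /** Stores a left element and returns its pairs with all right elements seen so far. */
    List<String> onLeft(String key, String left) {
        leftByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(left);
        List<String> out = new ArrayList<>();
        for (String right : rightByKey.getOrDefault(key, Collections.emptyList())) {
            out.add(left + "|" + right);
        }
        return out;
    }

    /** Stores a right element and returns its pairs with all left elements seen so far. */
    List<String> onRight(String key, String right) {
        rightByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(right);
        List<String> out = new ArrayList<>();
        for (String left : leftByKey.getOrDefault(key, Collections.emptyList())) {
            out.add(left + "|" + right);
        }
        return out;
    }
}
```

Because only the new element is joined, there is no need to re-join the whole window on every firing and then filter out pairs that were already emitted.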

Thanks in advance,

Johannes


Re: Stream Join With Early firings

Fabian Hueske
Hi Johannes,

EventTimeSessionWindows [1] uses EventTimeTrigger [2] as its default trigger (see EventTimeSessionWindows.getDefaultTrigger()).

I would take the EventTimeTrigger and extend it with early-firing functionality.
However, there are a few things to consider:
* Session windows can be merged. For example, with a gap of 10, two sessions A and B containing records in [20, 25] and [37, 45] are merged into one session when a record with timestamp 32 is received. Your trigger therefore has to support merging (return true from canMerge() and implement onMerge()), as EventTimeTrigger does.
* Windows store all records in a list. For every firing, you need to iterate over the full list and also track which records you have already joined to avoid duplicates. Maybe you can migrate records from the window state into a custom state defined in a ProcessWindowFunction.
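The merging case can be illustrated with a minimal plain-Java model. It assumes, as with Flink's event-time session windows, that a record at time t opens the window [t, t + gap) and that overlapping windows are merged into one. It also tracks the end-of-window timer that an event-time trigger keeps at window.end - 1 (Flink's maxTimestamp()): when windows merge, the timers of the absorbed windows become obsolete and a single timer for the merged window must exist, which is what a trigger's onMerge() has to take care of. Class and field names are hypothetical; this is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of event-time session windows (assumption: a record at
// time t opens [t, t + gap); windows that overlap or touch are merged).
class SessionModel {
    private final long gap;
    // Current session windows as {start, end} pairs, end exclusive.
    private final List<long[]> windows = new ArrayList<>();
    // Pending final-firing timers, one per window, at end - 1 (maxTimestamp).
    final TreeSet<Long> timers = new TreeSet<>();

    SessionModel(long gap) { this.gap = gap; }

    /** Adds a record and returns the (possibly merged) session window it lands in. */
    long[] add(long ts) {
        long start = ts;
        long end = ts + gap;
        List<long[]> remaining = new ArrayList<>();
        for (long[] w : windows) {
            boolean intersects = start <= w[1] && end >= w[0];
            if (intersects) {
                // Absorb the existing window; its timer is no longer valid.
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
                timers.remove(w[1] - 1);
            } else {
                remaining.add(w);
            }
        }
        long[] merged = {start, end};
        remaining.add(merged);
        windows.clear();
        windows.addAll(remaining);
        // An early-firing trigger would FIRE here for the element itself and
        // keep exactly one timer for the final firing of the merged window.
        timers.add(end - 1);
        return merged;
    }

    int windowCount() { return windows.size(); }
}
```

Feeding it records at 20, 25, 37 and 45 with a gap of 10 yields two sessions, [20, 35) and [37, 55); a record at 32 then merges them into [20, 55), leaving a single timer at 54.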

Best, Fabian

2018-06-13 13:43 GMT+02:00 Johannes Schulte <[hidden email]>:

Re: Stream Join With Early firings

Johannes Schulte
Hi Fabian,

Thanks for the hints, though I have the feeling that I am on the wrong track, given how much code I would need to write to implement a "blueprint" use case.

Would a join be simpler with the Table API? In the end, it's the classic Order & OrderPosition example, where the output is an upsert stream. Would I get the expected behaviour (an output element for every update on either side of the input)? I realize that my session window approach wasn't driven by the requirements but by operational concerns (state size), so a concept like idle state retention time would be a more natural fit.
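For reference, the behaviour described for the Order & OrderPosition case can be modeled in plain Java as a non-windowed join: both sides are kept in full, and every insert or update on either side emits the affected joined rows, keyed by (orderId, positionId) so that a later row replaces an earlier one downstream. The class, the method names and the string row format are hypothetical; this models the semantics and is not the Table API. Note that the state grows without bound unless something like idle state retention removes it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a non-windowed Order/OrderPosition join with upsert output.
// Both inputs are fully materialized (never removed in this sketch).
class OrderJoin {
    private final Map<String, String> orders = new HashMap<>();                 // orderId -> order
    private final Map<String, Map<String, String>> positions = new HashMap<>(); // orderId -> (positionId -> position)

    /** Upserts an order; re-emits one joined row per known position of that order. */
    List<String> upsertOrder(String orderId, String order) {
        orders.put(orderId, order);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> p : positions.getOrDefault(orderId, Collections.emptyMap()).entrySet()) {
            out.add(row(orderId, p.getKey(), order, p.getValue()));
        }
        return out;
    }

    /** Upserts a position; emits its joined row if the order is already known. */
    List<String> upsertPosition(String orderId, String positionId, String position) {
        positions.computeIfAbsent(orderId, k -> new LinkedHashMap<>()).put(positionId, position);
        List<String> out = new ArrayList<>();
        String order = orders.get(orderId);
        if (order != null) {
            out.add(row(orderId, positionId, order, position));
        }
        return out;
    }

    // The upsert key of an output row is its "orderId/positionId" prefix.
    private static String row(String orderId, String positionId, String order, String position) {
        return orderId + "/" + positionId + ": " + order + " + " + position;
    }
}
```

An update to the order re-emits every joined row of that order, while an update to a position only re-emits that position's row.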

Thanks,

Johannes

On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske <[hidden email]> wrote:

Re: Stream Join With Early firings

Fabian Hueske
Hi Johannes,

You are right. You should approach the problem starting from the semantics you need before thinking about optimizations such as state size.

In v1.5.0, the Table API / SQL offers two types of joins:
1) Windowed joins, where each record joins with the records of the other stream that fall within a time range, e.g. "A.ts BETWEEN B.ts - INTERVAL '1' HOUR AND B.ts + INTERVAL '1' HOUR".
2) Non-windowed joins, which support arbitrary join predicates but fully materialize both inputs. As you mentioned, you can use idle state retention to remove records from state that have not been accessed for a certain time.
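The two join types can be illustrated with a small plain-Java sketch (hypothetical names, no Flink dependency): a time-bound predicate corresponding to the BETWEEN condition of 1), and a keyed store that drops entries not accessed for longer than a retention time, approximating idle state retention for 2). As a simplification it uses a single retention value and evicts eagerly on read; Flink's setting takes a minimum and a maximum retention time.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Plain-Java models (no Flink dependency) of the two join types.
class JoinModels {

    // (1) Time-bounded join predicate:
    //     A.ts BETWEEN B.ts - bound AND B.ts + bound (inclusive, like SQL BETWEEN).
    static boolean withinBounds(long aTs, long bTs, long boundMillis) {
        return aTs >= bTs - boundMillis && aTs <= bTs + boundMillis;
    }

    // (2) Join state with a simplified idle state retention: a key that has not
    //     been accessed for more than retentionMillis is dropped.
    static class IdleRetentionState {
        private final long retentionMillis;
        private final Map<String, String> values = new HashMap<>();
        private final Map<String, Long> lastAccess = new HashMap<>();

        IdleRetentionState(long retentionMillis) { this.retentionMillis = retentionMillis; }

        void put(String key, String value, long now) {
            values.put(key, value);
            lastAccess.put(key, now);
        }

        /** Reads a key and refreshes its access time; null if absent or already evicted. */
        String get(String key, long now) {
            evictIdle(now);
            String v = values.get(key);
            if (v != null) {
                lastAccess.put(key, now);
            }
            return v;
        }

        /** Removes every key whose last access is older than the retention time. */
        void evictIdle(long now) {
            Iterator<Map.Entry<String, Long>> it = lastAccess.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> e = it.next();
                if (now - e.getValue() > retentionMillis) {
                    values.remove(e.getKey());
                    it.remove();
                }
            }
        }

        int size() { return values.size(); }
    }
}
```

The trade-off is visible in the model: a key that stays idle too long is forgotten, so a late update for it no longer finds its join partner.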

Best, Fabian

2018-06-18 11:09 GMT+02:00 Johannes Schulte <[hidden email]>:
Re: Stream Join With Early firings

Johannes Schulte
Thanks, Fabian! This seems to be the way to go.

On Tue, Jun 19, 2018 at 12:18 PM Fabian Hueske <[hidden email]> wrote: