Flink window triggering and timing on connected streams

Andrew Roberts
Hello,

I’m trying to implement session windows over a set of connected streams (event time), with some custom triggering behavior. Essentially, I allow very long session gaps, but I have an “end session” event that I want to cause the window to fire and purge. I’m assigning timestamps and watermarks using BoundedOutOfOrdernessTimestampExtractor, with a 1 minute delay for the watermark. I have things mostly wired up, but I have some confusion about how I can ensure that my streams stay “in sync” relative to time.
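For reference, here is what a bounded-out-of-orderness watermark with a 1 minute bound computes. This is a plain-Java model written for illustration, not Flink's `BoundedOutOfOrdernessTimestampExtractor` class. It assumes the watermark is "highest timestamp seen so far minus the bound", which is the idea behind that extractor. The class name and method names below are hypothetical.

```java
// Hypothetical plain-Java model of a bounded-out-of-orderness watermark generator.
// It is not Flink's class; it only illustrates "max timestamp seen minus the bound".
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMs;
    // Highest event timestamp seen so far; initialised so the first watermark is Long.MIN_VALUE.
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Called once per element; returns the element's own timestamp unchanged.
    long extractTimestamp(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return eventTimestampMs;
    }

    // Called periodically: the watermark trails the highest timestamp by the bound.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel gen = new BoundedOutOfOrdernessModel(60_000L); // 1 minute
        gen.extractTimestamp(100_000L);
        gen.extractTimestamp(90_000L); // an out-of-order element does not move the max
        System.out.println(gen.currentWatermark()); // 40000
    }
}
```

With this bound, an element stamped t’ only lets the watermark reach t’ after some element with a timestamp of at least t’ + 1 minute has been seen on that stream.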

Let’s say I am connecting streams A and B. Stream A is where the “end session” event always comes from. If I have a session involving events from time t to t’ in stream A, and then at t’ I get an “end session”, I want to ensure that the window doesn’t fire until stream B has also processed events (added events to the window) up to time t’. My understanding is that this is what the trailing watermark is for, and that in connected streams, the lowest (earliest) watermark of the input streams is what is seen as the watermark downstream.

Currently, I’m setting a timer for the current time + 1 when I see my “end event”, with the idea that that timer will fire when the WATERMARK passes that time, i.e., all streams have progressed at least as far as that end event. However, the implementation of EventTimeTrigger doesn’t really look like that’s what’s going on.
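As a point of reference for the timer question: event-time timers fire when the operator's watermark advances to or past the timer's timestamp, not when an individual element with a later timestamp arrives. The plain-Java model below illustrates that behaviour; it is not Flink's timer service API. Registering the timer at the end event's own timestamp t’ (instead of "current time + 1") is a hypothetical choice made here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical plain-Java model of an event-time timer queue (not Flink's API).
// Timers fire only when the watermark reaches their timestamp.
class EventTimeTimerModel {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Elements never fire timers by themselves, however large their timestamp is.
    void processElement(long elementTimestamp) {
        // intentionally no timer handling here
    }

    // Advancing the watermark fires every timer with timestamp <= watermark, in order.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerModel svc = new EventTimeTimerModel();
        long endEventTs = 1_000_000L;            // t': timestamp of the "end session" event
        svc.registerEventTimeTimer(endEventTs);  // hypothetical: timer at t' itself
        svc.processElement(2_000_000L);          // a later element alone fires nothing
        System.out.println(svc.advanceWatermark(999_999L));   // []
        System.out.println(svc.advanceWatermark(1_000_000L)); // [1000000]
    }
}
```

Because the watermark seen by a two-input operator is the minimum of both inputs, a timer at t’ in this model would only fire once stream B's watermark has also reached t’.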

Can anyone clear up how these concepts interact? Is there a good model for this “session end event” concept that I can take a look at?

Thanks,

Andrew
--
*Confidentiality Notice: The information contained in this e-mail and any
attachments may be confidential. If you are not an intended recipient, you
are hereby notified that any dissemination, distribution or copying of this
e-mail is strictly prohibited. If you have received this e-mail in error,
please notify the sender and permanently delete the e-mail and any
attachments immediately. You should not retain, copy or use this e-mail or
any attachment for any purpose, nor disclose all or any part of the
contents to any other person. Thank you.*

Re: Flink window triggering and timing on connected streams

Hequn Cheng
Hi Andrew,

>  I have an “end session” event that I want to cause the window to fire and purge.
Do you want to fire the window only on the 'end session' event? I see one option to solve the problem: you can use a tumbling window (say, 5 s) and set your timestamp to t’ + 5 s each time you receive an 'end session' event in your user-defined `AssignerWithPeriodicWatermarks`.
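A plain-Java sketch of why a jump to t’ + 5 s would close the window containing t’. It assumes the intent is that the watermark emitted by the assigner jumps to t’ + 5 s, that windows use offset 0 and non-negative timestamps, and that an event-time trigger fires once the watermark reaches the window's last timestamp (end - 1). `TumblingWindowModel` and its methods are hypothetical, not Flink's API.

```java
// Hypothetical plain-Java model of tumbling event-time windows with offset 0 (not Flink's API).
class TumblingWindowModel {
    // Start of the window of length sizeMs containing timestamp (non-negative timestamps assumed).
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    // Last timestamp that still belongs to the window, i.e. start + size - 1.
    static long windowMaxTimestamp(long timestamp, long sizeMs) {
        return windowStart(timestamp, sizeMs) + sizeMs - 1;
    }

    // An event-time trigger fires once the watermark reaches the window's max timestamp.
    static boolean fires(long windowMaxTimestamp, long watermark) {
        return watermark >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long size = 5_000L;                                 // 5 s tumbling windows
        long endEventTs = 12_300L;                          // t': the "end session" event
        long maxTs = windowMaxTimestamp(endEventTs, size);  // window [10000, 15000) -> 14999
        long jumpedWatermark = endEventTs + size;           // t' + 5 s = 17300
        System.out.println(maxTs);                          // 14999
        System.out.println(fires(maxTs, jumpedWatermark));  // true
    }
}
```

Since the window containing t’ always ends at or before t’ + 5 s, the jumped watermark is always past its last timestamp.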

> My understanding is that this is what the trailing watermark is for, and that in connected streams, the lowest (earliest) watermark of the input streams is what is seen as the watermark downstream.
Yes, and we can make use of this to make the window fire only on the 'end session' event, using the solution above.

Best, Hequn


On Tue, Feb 26, 2019 at 5:54 AM Andrew Roberts <[hidden email]> wrote:

Re: Flink window triggering and timing on connected streams

Andrew Roberts
I’m not sure that approach will work for me, as I have many concurrent sessions that can overlap. I also need sessions to time out if they never receive an end event. Do you know offhand whether a timer fires when any element's timestamp passes that time, or when the watermark passes it?
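One way to picture the requirements here (per-key sessions that close on an end event or on an inactivity timeout) is the plain-Java model below. It is a hypothetical design sketch, not Flink's API: it assumes one open session per key, treats both inputs as feeding the same keyed state, and closes a session when the watermark reaches either the end event's timestamp t’ or the last event's timestamp plus the gap. Keys such as `user-1` and all class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (not Flink's API): one open session per key, closed either
// by an "end session" event once the watermark reaches it, or by an inactivity timeout.
class KeyedSessionModel {
    static final class Session {
        long lastEventTs = Long.MIN_VALUE;
        long endEventTs = Long.MAX_VALUE; // stays MAX_VALUE until an end event arrives
        int count;
    }

    private final long gapMs;
    // TreeMap keeps the order of closed sessions deterministic.
    private final Map<String, Session> sessions = new TreeMap<>();

    KeyedSessionModel(long gapMs) {
        this.gapMs = gapMs;
    }

    // Elements from either input (stream A or B) are added to the key's session.
    void onEvent(String key, long ts, boolean isEndEvent) {
        Session s = sessions.computeIfAbsent(key, k -> new Session());
        s.lastEventTs = Math.max(s.lastEventTs, ts);
        s.count++;
        if (isEndEvent) {
            s.endEventTs = Math.min(s.endEventTs, ts);
        }
    }

    // Returns "key:count" for each session whose end event or timeout is <= watermark.
    List<String> onWatermark(long watermark) {
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<String, Session>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Session> e = it.next();
            Session s = e.getValue();
            long timeoutAt = s.lastEventTs + gapMs;
            if (s.endEventTs <= watermark || timeoutAt <= watermark) {
                closed.add(e.getKey() + ":" + s.count);
                it.remove();
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        KeyedSessionModel m = new KeyedSessionModel(3_600_000L); // 1 h inactivity gap
        m.onEvent("user-1", 1_000L, false); // stream A
        m.onEvent("user-2", 2_000L, false); // stream B, another key
        m.onEvent("user-1", 5_000L, true);  // stream A: end session at t' = 5000
        m.onEvent("user-1", 4_500L, false); // stream B, out of order but <= t'
        System.out.println(m.onWatermark(4_999L));     // []
        System.out.println(m.onWatermark(5_000L));     // [user-1:3]
        System.out.println(m.onWatermark(3_602_000L)); // [user-2:1]
    }
}
```

The late stream B element for `user-1` is still counted because the session only closes once the watermark, not the end event itself, reaches t’.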


On Feb 25, 2019, at 9:08 PM, Hequn Cheng <[hidden email]> wrote:

Re: Flink window triggering and timing on connected streams

Till Rohrmann
Hi Andrew,

If you use connected streams (e.g. a CoFlatMapFunction or CoMapFunction), the watermarks are synchronized across both inputs. Concretely, the operator always emits the minimum of the watermarks arriving on input channels 1 and 2. Take a look at AbstractStreamOperator.java:773-804.
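The combining rule described above can be sketched in plain Java. This is a simplified model written for illustration, not a copy of `AbstractStreamOperator`: it tracks the latest watermark per input and forwards the minimum of the two, only when that minimum advances. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of watermark combination in a two-input operator.
class TwoInputWatermarkModel {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long wm) {
        input1Watermark = wm;
        maybeEmit();
    }

    void processWatermark2(long wm) {
        input2Watermark = wm;
        maybeEmit();
    }

    // Forward the minimum of both inputs, but only when it moves forward.
    private void maybeEmit() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(newMin);
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel op = new TwoInputWatermarkModel();
        op.processWatermark1(5_000L); // stream A reached t' = 5000; B has sent nothing yet
        op.processWatermark2(3_000L); // combined watermark becomes 3000
        op.processWatermark2(6_000L); // combined watermark becomes 5000
        op.processWatermark1(7_000L); // combined watermark becomes 6000
        System.out.println(op.emitted()); // [3000, 5000, 6000]
    }
}
```

Stream A reaching t’ on its own does not move the combined watermark; it only passes t’ once stream B has caught up.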

Cheers,
Till

On Tue, Feb 26, 2019 at 4:27 AM Andrew Roberts <[hidden email]> wrote:

Re: Flink window triggering and timing on connected streams

Rong Rong
Hi Andrew,

To add to the answers Till and Hequn already provided: the WindowOperator operates on a per-key-group basis, so as long as you have only one open session per partition key group, you should be able to manage the windowing using the watermark strategy Hequn mentioned.
As Till mentioned, the watermark is the minimum of the connected streams' watermarks, so you should be able to just use a "session window with a long timeout" as you described.
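To make the "session window with a long timeout" idea concrete, here is a plain-Java model of how event-time session windows group one key's elements. It assumes the rule that each element opens a window [ts, ts + gap) and that overlapping or touching windows merge; it is an illustration, not Flink's `EventTimeSessionWindows` implementation, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java model of event-time session windows for a single key.
class SessionMergeModel {
    // Half-open window [start, end) in milliseconds.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Each element opens [ts, ts + gap); windows that overlap or touch are merged.
    static List<Window> sessions(List<Long> timestamps, long gapMs) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Window> result = new ArrayList<>();
        Window current = null;
        for (long ts : sorted) {
            Window w = new Window(ts, ts + gapMs);
            if (current != null && w.start <= current.end) {
                current = new Window(current.start, Math.max(current.end, w.end));
            } else {
                if (current != null) {
                    result.add(current);
                }
                current = w;
            }
        }
        if (current != null) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Window> ws = sessions(Arrays.asList(1_000L, 4_000L, 20_000L), 5_000L);
        System.out.println(ws); // [[1000, 9000), [20000, 25000)]
    }
}
```

With a very long gap, a session window in this model only closes once the combined watermark passes its end, which is why an explicit end event still needs a custom trigger.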

One thought: have you looked at Flink CEP [1]? This use case seems to fit it pretty well if you can do the co-stream function as a first stage.

--
Rong


On Tue, Feb 26, 2019 at 2:31 AM Till Rohrmann <[hidden email]> wrote: