Timer coalescing necessary?

Timer coalescing necessary?

Kien Truong
Hi,

We have a streaming job in which we use timers to implement key timeouts for stateful functions. Should we implement coalescing logic to reduce the number of timer triggers, or is that unnecessary with Flink?

Best regards,
Kien

Re: Timer coalescing necessary?

Aljoscha Krettek
Hi,

If you have multiple timers per key, then coalescing can make sense to reduce the burden on the timer system. Coalescing them across different keys would not be possible right now.
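
A minimal sketch of what per-key coalescing could look like, written as a toy Python model rather than Flink code. It assumes that timers sharing a key and a timestamp are stored only once (the deduplication described later in this thread). The names `coalesce` and `registered_timers`, and the 1000 ms granularity, are hypothetical choices for illustration.

```python
def coalesce(timestamp_ms, granularity_ms=1000):
    """Round a timestamp up to the next multiple of granularity_ms."""
    return -(-timestamp_ms // granularity_ms) * granularity_ms


def registered_timers(key, timestamps_ms, granularity_ms=None):
    """Return the distinct (key, timestamp) timers that end up registered."""
    timers = set()  # a set models deduplication of identical timers
    for ts in timestamps_ms:
        if granularity_ms is not None:
            ts = coalesce(ts, granularity_ms)
        timers.add((key, ts))
    return timers


requested = [1001, 1250, 1999]
print(len(registered_timers("user-1", requested)))        # 3 timers without coalescing
print(len(registered_timers("user-1", requested, 1000)))  # 1 timer, at 2000
```

Rounding up means a timeout never fires earlier than requested; rounding down is equally possible if firing slightly early is acceptable. Timers for different keys stay separate either way.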

Best,
Aljoscha

> On 13. Oct 2017, at 06:37, Kien Truong <[hidden email]> wrote:
>
> Hi,
>
> We have a streaming job in which we use timers to implement key timeouts for stateful functions. Should we implement coalescing logic to reduce the number of timer triggers, or is that unnecessary with Flink?
>
> Best regards,
> Kien

Re: Timer coalescing necessary?

Kien Truong

Hi Aljoscha,

Could you clarify how the timer system works right now?

For example, let's say I have a function F with 3 keys, each of which has a timer registered to fire at processing time T.

Would Flink maintain a single internal timer at time T and run the callback for all 3 keys when it fires? Or would there be 3 internal timers that are triggered separately at time T?


Best regards,

Kien

On 10/13/2017 6:43 PM, Aljoscha Krettek wrote:
> Hi,
>
> If you have multiple timers per key, then coalescing can make sense to reduce the burden on the timer system. Coalescing them across different keys would not be possible right now.
>
> Best,
> Aljoscha

Re: Timer coalescing necessary?

Aljoscha Krettek
Hi,

This is slightly different for processing-time and event-time triggers.

First, event-time triggers. There are two data structures: a PriorityQueue (implemented as a heap) of timers sorted by timestamp, and a set of registered timers that is used for deduplication. When adding a timer, we first check whether it already exists (using the set) and then add it to the queue. Whenever we receive a watermark, we poll from the timer queue as long as the timestamp of the top timer is <= the watermark. For each polled timer, we remove it from the set and call the user callback.
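
The event-time procedure above can be sketched as a toy Python model. This is not Flink's implementation: the class name `EventTimeTimers` and its methods are hypothetical, and timers are modelled as plain `(timestamp, key, namespace)` tuples with string keys so that the heap can order them.

```python
import heapq


class EventTimeTimers:
    """Toy model: a min-heap of timers plus a set used for deduplication."""

    def __init__(self, callback):
        self._queue = []          # heap ordered by (timestamp, key, namespace)
        self._registered = set()  # timers currently in the queue
        self._callback = callback

    def register(self, timestamp, key, namespace=""):
        timer = (timestamp, key, namespace)
        if timer in self._registered:  # already exists: nothing to add
            return False
        self._registered.add(timer)
        heapq.heappush(self._queue, timer)
        return True

    def advance_watermark(self, watermark):
        # Poll while the earliest timer is at or before the watermark.
        while self._queue and self._queue[0][0] <= watermark:
            timer = heapq.heappop(self._queue)
            self._registered.remove(timer)
            self._callback(*timer)

    def pending(self):
        return len(self._queue)


fired = []
timers = EventTimeTimers(lambda ts, key, ns: fired.append((ts, key)))
timers.register(10, "a")
timers.register(10, "a")  # duplicate, ignored
timers.register(10, "b")
timers.register(20, "a")
timers.advance_watermark(10)
print(fired)              # [(10, 'a'), (10, 'b')]
print(timers.pending())   # 1
```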

For processing-time triggers it's very similar, except that we use a ProcessingTimeService instead of the watermark for advancing time. We always have one "physical" processing-time timer set at the ProcessingTimeService. When this fires we follow the same procedure as for event-time and then register a new "physical" timer for the next lowest processing-time timer.

In your case this would mean 3 separate internal timers, but a timer is only a timestamp and a key (and a namespace).
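
As a rough illustration of the processing-time case, here is a toy Python model in which three keys register a timer for the same time T. It is a sketch under stated assumptions, not Flink code: `ProcessingTimeTimers`, `physical_timer`, and `run_until_idle` are hypothetical names, and the single "physical" timer is modelled as one stored timestamp.

```python
import heapq


class ProcessingTimeTimers:
    """Toy model: many logical timers, one pending 'physical' timer."""

    def __init__(self, callback):
        self._queue = []            # heap of (timestamp, key, namespace)
        self._registered = set()
        self._callback = callback
        self.physical_timer = None  # timestamp of the one physical timer
        self.physical_firings = []  # recorded only for illustration

    def register(self, timestamp, key, namespace=""):
        timer = (timestamp, key, namespace)
        if timer in self._registered:
            return
        self._registered.add(timer)
        heapq.heappush(self._queue, timer)
        # Keep the physical timer at the earliest logical timestamp.
        if self.physical_timer is None or timestamp < self.physical_timer:
            self.physical_timer = timestamp

    def pending(self):
        return len(self._queue)

    def on_physical_timer(self, now):
        self.physical_firings.append(now)
        # Same procedure as for event time, with `now` as the bound.
        while self._queue and self._queue[0][0] <= now:
            timer = heapq.heappop(self._queue)
            self._registered.remove(timer)
            self._callback(*timer)
        # Re-register the physical timer for the next lowest timestamp.
        self.physical_timer = self._queue[0][0] if self._queue else None

    def run_until_idle(self):
        while self.physical_timer is not None:
            self.on_physical_timer(self.physical_timer)


T = 50
calls = []
service = ProcessingTimeTimers(lambda ts, key, ns: calls.append(key))
for key in ("k1", "k2", "k3"):
    service.register(T, key)
print(service.pending())          # 3 internal timers
service.run_until_idle()
print(calls)                      # ['k1', 'k2', 'k3']
print(service.physical_firings)   # [50]
```

In this model the three keys produce three internal entries, yet only one physical firing at T drives all three callbacks.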

Best,
Aljoscha


> On 13. Oct 2017, at 13:56, Kien Truong <[hidden email]> wrote:
>
> Hi Aljoscha,
>
> Could you clarify how the timer system works right now?
>
> For example, let's say I have a function F with 3 keys, each of which has a timer registered to fire at processing time T.
> Would Flink maintain a single internal timer at time T and run the callback for all 3 keys when it fires? Or would there be 3 internal timers that are triggered separately at time T?
>
> Best regards,
>
> Kien

Re: Timer coalescing necessary?

Kien Truong

Hi,

Thanks for the explanation.

Because timer callbacks and normal execution are not guaranteed to be safe to run concurrently: if we have multiple timers with the same timestamp, do all of them run before normal execution resumes, or are they interleaved with normal execution?

Also, may I ask how often the ProcessingTimeService fires?


Best regards,

Kien

On 10/13/2017 7:48 PM, Aljoscha Krettek wrote:
> Hi,
>
> This is slightly different for processing-time and event-time triggers.
>
> First, event-time triggers. There are two data structures: a PriorityQueue (implemented as a heap) of timers sorted by timestamp, and a set of registered timers that is used for deduplication. When adding a timer, we first check whether it already exists (using the set) and then add it to the queue. Whenever we receive a watermark, we poll from the timer queue as long as the timestamp of the top timer is <= the watermark. For each polled timer, we remove it from the set and call the user callback.
>
> For processing-time triggers it's very similar, except that we use a ProcessingTimeService instead of the watermark for advancing time. We always have one "physical" processing-time timer set at the ProcessingTimeService. When this fires we follow the same procedure as for event-time and then register a new "physical" timer for the next lowest processing-time timer.
>
> In your case this would mean 3 separate internal timers, but a timer is only a timestamp and a key (and a namespace).
>
> Best,
> Aljoscha


Re: Timer coalescing necessary?

Aljoscha Krettek
Hi,

Because of how they are triggered by the watermark, all event-time triggers with the same timestamp will be triggered in one go, without interleaving other calls. The same is true for processing-time triggers because they "piggyback" on the one "physical" processing-time service trigger.

Regarding how often the ProcessingTimeService fires: as often as needed. For example, suppose we have a bunch of timers for T = 100 and some timers for T = 900. Then the processing-time service fires once at 100 and once at 900.
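
The T = 100 / T = 900 example can be sketched in a few lines of Python. This is only an illustration of the grouping, not Flink code; `firing_schedule` is a hypothetical helper and the keys are made up.

```python
from itertools import groupby


def firing_schedule(timers):
    """Group (timestamp, key) timers into one batch per distinct timestamp."""
    ordered = sorted(timers)
    return [
        (ts, [key for _, key in group])
        for ts, group in groupby(ordered, key=lambda timer: timer[0])
    ]


timers = [(100, "a"), (900, "c"), (100, "b"), (900, "a")]
for ts, keys in firing_schedule(timers):
    print(ts, keys)
# 100 ['a', 'b']
# 900 ['a', 'c']
```

Each batch corresponds to one firing of the service; all callbacks in a batch run back to back before anything else is processed.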

Best,
Aljoscha

> On 13. Oct 2017, at 15:36, Kien Truong <[hidden email]> wrote:
>
> Hi,
>
> Thanks for the explanation.
>
> Because timer callbacks and normal execution are not guaranteed to be safe to run concurrently: if we have multiple timers with the same timestamp, do all of them run before normal execution resumes, or are they interleaved with normal execution?
>
> Also, may I ask how often the ProcessingTimeService fires?
>
> Best regards,
>
> Kien
