Process Function's timers "postponing"


Andrea Spina
Dear Community,
I am using Flink processing-time timers with a ProcessFunction. I would like to "postpone" any timer already registered for a given key: I may process many events in a row (think of it as a session), and I want to trigger the computation just after that session stops.

I considered deleting any existing timer, but AFAIU I would need to know the previous timer's trigger time, which I guess is not available to me since I use processing-time timers.

I also read [1], but I cannot tell whether it helps in my case; for instance, I don't understand the sentence "Since Flink maintains only one timer per key and timestamp...". Does this imply that a new PT timer automatically overwrites a previously existing one?

Thank you for your precious help,
--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT

Re: Process Function's timers "postponing"

Yun Tang
Hi Andrea

If my understanding is correct, you just want to know when the registered timer is eventually deleted. When you register a timer, it is stored in 'processingTimeTimersQueue' at [1], and the 'SystemProcessingTimeService' then schedules a runnable TriggerTask to run after the "postpone" delay at [2]. When the scheduled runnable is triggered, it polls the timer from the 'processingTimeTimersQueue' [3], which means the timer is finally removed. Hope this helps.

Best
Yun Tang




From: Andrea Spina <[hidden email]>
Sent: Tuesday, June 25, 2019 2:06
To: user
Subject: Process Function's timers "postponing"
 

Re: Process Function's timers "postponing"

Andrea Spina
Hi Yun, thank you for your answer. I'm not sure I got your point. My question is:
for the same key K, I process two records, R1 at t1 and R2 at t2.
When I process R1, I set a timer to fire at T1, where T1 > t2.
When I process R2, I set a timer to fire at T2, where T2 > T1; to do that, I want to remove the previous timer T1 so that the triggering is "postponed".

In other words, I would like just one timer to be active for a single key; if a new timer is requested, the old one should be deleted.

Thank you,

On Tue, 25 Jun 2019 at 17:31, Yun Tang <[hidden email]> wrote:

Re: Process Function's timers "postponing"

Yun Tang
If you are using processing time, one possible way is to track the last registered timer timestamp in a separate ValueState<Long>. When you register a new timer and find that the previous timer stored in the ValueState has a smaller timestamp (T1) than the new one (T2), you can call #deleteProcessingTimeTimer(T1). Once that processing-time timer is deleted, T1 will not trigger any action. You could refer to [1] and its usage for similar ideas.
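
The delete-then-register idea can be sketched as a plain-Java model. This is not Flink code and not anyone's actual implementation: the class `PostponedTimerSketch`, its methods, and the 5-second gap are hypothetical, and the two maps only stand in for a keyed ValueState<Long> and for the timer service. In a real keyed process function the marked lines would correspond to `ctx.timerService().deleteProcessingTimeTimer(...)` and `ctx.timerService().registerProcessingTimeTimer(...)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of "delete the old timer, then register the new one".
// Hypothetical names throughout; the maps stand in for Flink state and timers.
class PostponedTimerSketch {
    // Stand-in for a keyed ValueState<Long>: last registered timer per key.
    private final Map<String, Long> lastTimer = new HashMap<>();
    // Stand-in for the timer service: timestamp -> keys with a timer at that time.
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();

    // Called once per record; in Flink this logic would sit in processElement.
    void onEvent(String key, long now, long gapMs) {
        long newTime = now + gapMs;
        Long previous = lastTimer.get(key);
        if (previous != null) {
            if (previous >= newTime) {
                return; // the existing timer already fires late enough
            }
            deleteTimer(key, previous); // Flink: deleteProcessingTimeTimer(previous)
        }
        registerTimer(key, newTime);    // Flink: registerProcessingTimeTimer(newTime)
        lastTimer.put(key, newTime);
    }

    // Advances the simulated clock and returns the keys whose timers fire.
    List<String> advanceTo(long time) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            for (String key : timers.pollFirstEntry().getValue()) {
                fired.add(key);
                lastTimer.remove(key); // Flink: clear the state in onTimer
            }
        }
        return fired;
    }

    private void registerTimer(String key, long time) {
        timers.computeIfAbsent(time, t -> new TreeSet<>()).add(key);
    }

    private void deleteTimer(String key, long time) {
        TreeSet<String> keys = timers.get(time);
        if (keys != null) {
            keys.remove(key);
            if (keys.isEmpty()) {
                timers.remove(time);
            }
        }
    }

    public static void main(String[] args) {
        PostponedTimerSketch s = new PostponedTimerSketch();
        s.onEvent("K", 1_000L, 5_000L); // R1 at t1: timer T1 = 6000
        s.onEvent("K", 3_000L, 5_000L); // R2 at t2: T1 deleted, timer T2 = 8000
        System.out.println(s.advanceTo(6_000L)); // prints []
        System.out.println(s.advanceTo(8_000L)); // prints [K]
    }
}
```

The cost of this approach is one extra Long of state per key, which is what makes the timestamp of the previous timer available for deletion.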




From: Andrea Spina <[hidden email]>
Sent: Tuesday, June 25, 2019 23:40
To: Yun Tang
Cc: user
Subject: Re: Process Function's timers "postponing"
 

Re: Process Function's timers "postponing"

Andrea Spina
Hi Yun, thank you so much. That was an option, but I wanted to avoid storing additional state for it. In the end, I went for timer coalescing as the documentation suggests, so that I have just one timer per interval. What I initially missed in the documentation is that Flink retains only one timer per key and timestamp, i.e. if I set two timers to trigger at the same time T, Flink fires the timer only once.
I can then accept having one coalesced timer per interval.
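
As a rough illustration of coalescing, the sketch below rounds each target time down to the start of its interval, so that several events for one key end up requesting the same timestamp. It is a plain-Java model, not Flink code: `CoalescedTimerSketch`, `coalesce`, and the 5-second timeout with a 1-second interval are assumptions made for the example, and the `Set` only mimics the "one timer per key and timestamp" behaviour described above.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java model of timer coalescing; all names and values are hypothetical.
class CoalescedTimerSketch {
    // Rounds the target time (now + timeout) down to a multiple of intervalMs.
    static long coalesce(long now, long timeoutMs, long intervalMs) {
        return ((now + timeoutMs) / intervalMs) * intervalMs;
    }

    public static void main(String[] args) {
        // Stand-in for the timers of a single key: equal timestamps collapse.
        Set<Long> timersForKey = new HashSet<>();
        long timeoutMs = 5_000L;
        long intervalMs = 1_000L;
        for (long now : new long[] {10_100L, 10_400L, 10_900L, 11_200L}) {
            // In Flink the rounded value would be passed to registerProcessingTimeTimer.
            timersForKey.add(coalesce(now, timeoutMs, intervalMs));
        }
        System.out.println(timersForKey.size()); // prints 2
    }
}
```

Four events produce only two distinct timers here (15000 and 16000). The timers are not postponed, though: each interval that saw an event still fires once.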

Thank you again for your support!

On Tue, 25 Jun 2019 at 19:14, Yun Tang <[hidden email]> wrote: