Re: Process Function's timers "postponing"

Posted by Yun Tang on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Process-Function-s-timers-postponing-tp28401p28414.html

Hi Andrea

If my understanding is correct, you just want to know when the eventual timer would be deleted. When you register your timer into 'processingTimeTimersQueue' (where your timer stored) at [1], the 'SystemProcessingTimeService' would then schedule a runnable TriggerTask after the "postpone" delay at [2]. When the scheduled runnable is triggered, it would poll from the 'processingTimeTimersQueue' [3] which means the timer would finally be removed. Hope this could help you.

Best
Yun Tang

[1] https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
[2] https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
[3] https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237



From: Andrea Spina <[hidden email]>
Sent: Tuesday, June 25, 2019 2:06
To: user
Subject: Process Function's timers "postponing"
 
Dear Community,
I am using Flink (processing-time) timers along with a Process Function. What I would like to do is to "postpone" eventually registered timers for the given key: I would like to do it since I might process plenty of events in a row (think about it as a session) so that I will able to trigger the computation "just after" this session somehow stops.

I wondered about deleting eventual existing timers but AFAIU I need to know the previous timer triggering time, which I guess is not possible for me since I use processing-time timers.

I read also [1] but I am not really able to understand if it comes handy to me; for instance, I don't understand what "Since Flink maintains only one timer per key and timestamp...". Does this imply that a new PT timer will automatically overwrite an eventual previously existing one?

Thank you for your precious help,

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
--
Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT