Hi everyone,
we were testing a Flink streaming job (1.0.0) with a GlobalWindow on a KeyedStream and a custom Trigger. On each element, the trigger registers a new processing-time timer and deletes the currently registered one. So we are registering a lot of timers, but also deleting most of them right away. The desired behavior is that the window is purged (and all state is set to null) after a timeout (time of the last event for this key + timeout).

The performance tests showed that after a short time (5 minutes or so) all the time went into garbage collection. From the heap dumps we can tell that the problem was retained TriggerTasks (each with a reference to the TriggerContext) for all the registered processing-time timers. The problem seems to be that when a timer is deleted, the corresponding Callable is not removed from the queue: the deleteProcessingTimeTimer method only removes the timer from the set/queues of the TriggerContext itself, but not from the RuntimeContext.

Is this a bug? Are we using processing-time timers in a fundamentally wrong way? If so, is there any other way to achieve the desired functionality?

We have a workaround in place now (basically just a timeout starting with the first element in the window instead of the last element in the window).

Cheers,
Konstantin

-- 
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082
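To make the leak concrete, here is a minimal, self-contained sketch in plain Java. It does not use Flink's API: it assumes the low-level timer service behaves like a JDK ScheduledThreadPoolExecutor, and the names TimerLeakDemo and queueSizeAfter are hypothetical. Removing a timer only from the trigger's own bookkeeping leaves every scheduled task (and whatever it references) in the executor queue until its deadline; cancelling the ScheduledFuture with setRemoveOnCancelPolicy(true) releases it immediately.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not Flink code: the JDK executor stands in for the
// low-level timer service, the HashSet for the trigger's own timer bookkeeping.
public class TimerLeakDemo {

    /** Simulates n elements; returns how many tasks are still queued in the executor. */
    public static int queueSizeAfter(int n, boolean cancelInExecutor) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Only affects cancelled tasks: they leave the queue at once instead of at their deadline.
        executor.setRemoveOnCancelPolicy(true);
        Set<Long> triggerTimers = new HashSet<>();
        ScheduledFuture<?> current = null;
        try {
            for (long element = 0; element < n; element++) {
                if (current != null) {
                    // "Delete" the previous timer from the trigger's view only...
                    triggerTimers.remove(element - 1);
                    // ...and optionally also cancel the task queued in the executor.
                    if (cancelInExecutor) {
                        current.cancel(false);
                    }
                }
                triggerTimers.add(element);
                // One hour ahead, so nothing fires while we measure.
                current = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            }
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("delete in trigger only:  " + queueSizeAfter(10_000, false));
        System.out.println("also cancel in executor: " + queueSizeAfter(10_000, true));
    }
}
```

Running main prints 10000 for the first case and 1 for the second: without the cancel, every "deleted" timer is still held by the executor.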
Hi,
you are right, this is a problem. In an earlier version we only set very few actual timers using the RuntimeContext, because a firing timer triggers all the timers with a lower timestamp that we have stored in the trigger queue. We have to change the lower-level trigger service (in StreamTask) to only store one timer per very short time window, so that if the window operator registers thousands of timers for, say, 15:30:03, it actually only saves one timer.

I created a Jira issue: https://issues.apache.org/jira/browse/FLINK-3669

Cheers,
Aljoscha

On Thu, 24 Mar 2016 at 11:30 Konstantin Knauf <[hidden email]> wrote:
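The coalescing idea from Aljoscha's reply can be sketched as follows. This is a hypothetical model, not the StreamTask code and not necessarily the eventual fix for FLINK-3669: the class name, the bucketMillis parameter and the round-up-to-bucket policy are assumptions. Logical timers are grouped into buckets and only one physical timer exists per bucket; when it fires, every logical timer in the due buckets fires, and all of those have timestamps at or below the firing time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.LongConsumer;

// Hypothetical sketch of timer coalescing, not Flink's StreamTask implementation.
public class CoalescingTimerService {

    private static final class Timer {
        final long timestamp;
        final LongConsumer callback;

        Timer(long timestamp, LongConsumer callback) {
            this.timestamp = timestamp;
            this.callback = callback;
        }
    }

    private final long bucketMillis;
    // Bucket deadline -> logical timers sharing that one physical timer.
    private final TreeMap<Long, List<Timer>> buckets = new TreeMap<>();

    public CoalescingTimerService(long bucketMillis) {
        this.bucketMillis = bucketMillis;
    }

    /** Registers a logical timer; only a new bucket would need a new physical timer. */
    public void registerTimer(long timestamp, LongConsumer callback) {
        // Round up (for non-negative timestamps) so a timer never fires early.
        long deadline = ((timestamp + bucketMillis - 1) / bucketMillis) * bucketMillis;
        buckets.computeIfAbsent(deadline, k -> new ArrayList<>()).add(new Timer(timestamp, callback));
    }

    /** Number of physical timers the low-level service would have to schedule. */
    public int physicalTimerCount() {
        return buckets.size();
    }

    /** Fires every logical timer whose bucket deadline is <= now; returns how many fired. */
    public int fire(long now) {
        // Copy and detach the due buckets first, so callbacks may register new timers safely.
        List<List<Timer>> due = new ArrayList<>(buckets.headMap(now, true).values());
        buckets.headMap(now, true).clear();
        int fired = 0;
        for (List<Timer> bucket : due) {
            for (Timer timer : bucket) {
                timer.callback.accept(timer.timestamp);
                fired++;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        CoalescingTimerService service = new CoalescingTimerService(1000);
        int[] fired = {0};
        // 1000 timers between 15:30:02.001 and 15:30:03.000 (ms since midnight).
        for (int i = 0; i < 1000; i++) {
            service.registerTimer(55_802_001L + i, ts -> fired[0]++);
        }
        System.out.println("physical timers: " + service.physicalTimerCount());
        System.out.println("fired at 15:30:03: " + service.fire(55_803_000L));
    }
}
```

Rounding up means a timer can fire up to bucketMillis late but never early, which is usually the acceptable direction for timeouts.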
Hi Aljoscha,
thanks for looking into it. I have moved the discussion to the issue.

Cheers,
Konstantin

On 27.03.2016 09:35, Aljoscha Krettek wrote:
Hi,

by the way, from looking at your email I gather that you want to do some kind of session windowing. Is that correct? I have a pull request that should make it into the next version and adds proper support for session windows. Right now this is only implemented for event time, since that is the hard part, but support for processing time will be trivial to add. The PR is here: https://github.com/apache/flink/pull/1802

Cheers,
Aljoscha

On Wed, 30 Mar 2016 at 09:51 Konstantin Knauf <[hidden email]> wrote:
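For reference, gap-based session windowing can be sketched like this in plain Java. It is not the code from the PR and does not use Flink's API; the class name and the merge rule (an element at time t opens the window [t, t + gap), and overlapping windows are merged) are assumptions for illustration. Each session ends gap after its last element, which matches the "last event for this key + timeout" behavior Konstantin described.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of gap-based session windows, not the implementation from the PR.
public class SessionWindowSketch {

    /** Returns sessions as {start, endExclusive} pairs for the given timestamps. */
    public static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t < last[1]) {
                // [t, t + gap) overlaps the current session: extend it.
                // Input is sorted, so t + gap is the new end.
                last[1] = t + gap;
            } else {
                // Gap exceeded (or first element): start a new session.
                result.add(new long[] {t, t + gap});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] w : sessions(new long[] {1, 2, 10, 11, 30}, 5)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With timestamps 1, 2, 10, 11, 30 and a gap of 5 this prints [1, 7), [10, 16) and [30, 35).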