Memory Leak using ProcessingTimeTimers?

snntr
Hi everyone,

we were testing a Flink streaming job (1.0.0) with a GlobalWindow on a
KeyedStream with a custom Trigger.

On each element, the trigger registers a new processing-time timer and
deletes the currently registered one. So we are registering a lot of
timers, but also deleting most of them right away.

The desired behavior is that the window is purged (and all state is set
to null) after a timeout, i.e. at the time of the last event for this key
plus the timeout.
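The intended semantics can be sketched independently of Flink: per key, only the most recent deadline (last element + timeout) matters, so each new element can simply overwrite the previous deadline. InactivityTimeout and its methods are hypothetical names, and processing time is passed in explicitly instead of coming from a clock.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch (not Flink code): a key's state is purged at
// (time of last element for that key) + timeout.
class InactivityTimeout {
    private final long timeout;
    private final Map<String, Long> deadlineByKey = new HashMap<>();
    private final Map<String, Integer> countByKey = new HashMap<>();

    InactivityTimeout(long timeout) {
        this.timeout = timeout;
    }

    // Each element pushes its key's deadline forward; only the latest
    // deadline per key matters, so the older one is simply overwritten.
    void onElement(String key, long now) {
        deadlineByKey.put(key, now + timeout);
        countByKey.merge(key, 1, Integer::sum);
    }

    // Purges every key whose deadline has been reached; returns how many.
    int onProcessingTime(long now) {
        int purged = 0;
        Iterator<Map.Entry<String, Long>> it = deadlineByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= now) {
                countByKey.remove(e.getKey()); // drop the key's state
                it.remove();
                purged++;
            }
        }
        return purged;
    }

    // Number of elements buffered for a key, or null once purged.
    Integer count(String key) {
        return countByKey.get(key);
    }
}
```

With a timeout of 100, elements for key "a" at 0 and 50 keep "a" alive until 150, while key "b" with a single element at 10 is purged at 110.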

The performance tests showed that after a short time (5 minutes or so)
all the time went to garbage collection. From the heap dumps, we can tell
that the problem was retained TriggerTasks (each holding a reference to
the TriggerContext) for all the registered processing-time timers.

The problem seems to be that when the TriggerTasks are deleted, the
corresponding Callables are not removed from the queue: the
deleteProcessingTimeTimer method only removes the timer from the
set/queues of the TriggerContext itself, but not from the RuntimeContext.
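A simplified model of the two timer layers described above may make the leak easier to see. This is a sketch under stated assumptions, not Flink's code: TimerLeakModel, LowLevelService, ContextModel and Result are hypothetical names, and the lower layer is reduced to a queue of timestamps with no cancel operation, mirroring the behavior reported here.

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

// Simplified model of two timer layers. These are NOT Flink's classes;
// all names here are hypothetical.
class TimerLeakModel {

    // Stand-in for the lower-level (task) timer service: each registration
    // enqueues a task, and there is no way to cancel it.
    static final class LowLevelService {
        final PriorityQueue<Long> scheduled = new PriorityQueue<>();

        void schedule(long timestamp) {
            scheduled.add(timestamp);
        }
    }

    // Stand-in for the trigger context: tracks which timers the trigger
    // still considers active and forwards registrations to the lower layer.
    static final class ContextModel {
        final Set<Long> active = new HashSet<>();
        final LowLevelService service;

        ContextModel(LowLevelService service) {
            this.service = service;
        }

        void registerProcessingTimeTimer(long time) {
            if (active.add(time)) {
                service.schedule(time);
            }
        }

        void deleteProcessingTimeTimer(long time) {
            // Only the upper-layer bookkeeping changes; the task already
            // handed to the lower layer stays queued (the reported leak).
            active.remove(time);
        }
    }

    static final class Result {
        final int active;
        final int queued;

        Result(int active, int queued) {
            this.active = active;
            this.queued = queued;
        }
    }

    // Feeds n elements, one per millisecond, re-arming a timeout timer on
    // each: delete the previous timer, register a new one at now + timeout.
    static Result simulate(int n, long timeout) {
        LowLevelService service = new LowLevelService();
        ContextModel ctx = new ContextModel(service);
        Long current = null;
        for (long now = 0; now < n; now++) {
            if (current != null) {
                ctx.deleteProcessingTimeTimer(current);
            }
            current = now + timeout;
            ctx.registerProcessingTimeTimer(current);
        }
        return new Result(ctx.active.size(), service.scheduled.size());
    }
}
```

With simulate(1000, 5000), only one timer is logically active at the end, yet all 1000 registrations are still queued in the lower layer, which is the kind of growth the heap dumps pointed to.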

Is this a bug? Are we using ProcessingTimeTimers in a fundamentally
wrong way? If so, is there any other way to achieve the desired
functionality?

We have a workaround in place now (basically just a timeout starting
with the first element in the window instead of the last one).
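A rough sketch of the workaround's semantics, assuming a single key whose element arrival times are known and sorted: the timer is registered only for the first element of each window, so there is one registration per window rather than one per element. FirstElementTimeout and purgeTimes are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the timer is registered only for the first element
// of a window, so the window is purged at (first element + timeout)
// regardless of later elements.
class FirstElementTimeout {

    // Returns the purge times for one key, given its element arrival times
    // in ascending order. Each returned value corresponds to one registration.
    static List<Long> purgeTimes(long[] sortedArrivals, long timeout) {
        List<Long> purges = new ArrayList<>();
        boolean open = false;
        long deadline = 0;
        for (long t : sortedArrivals) {
            if (open && t >= deadline) {
                // The timer fired before this element arrived: window purged.
                purges.add(deadline);
                open = false;
            }
            if (!open) {
                deadline = t + timeout; // the only registration for this window
                open = true;
            }
            // Later elements in the same window register nothing.
        }
        if (open) {
            purges.add(deadline);
        }
        return purges;
    }
}
```

For arrivals at 0, 10, 20, 120 and 130 with a timeout of 100, this yields purges at 100 and 220 from two registrations. The trade-off is that a key which keeps receiving elements still has its window cut every timeout interval, unlike the last-element semantics.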

Cheers,

Konstantin

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Memory Leak using ProcessingTimeTimers?

Aljoscha Krettek
Hi,
you are right, this is a problem. In an earlier version we set only very few actual timers using the RuntimeContext, because a firing timer triggers all the timers with a lower timestamp that we have stored in the trigger queue. We have to change the lower-level trigger service (in StreamTask) to store only one timer per very short time window, so that if the window operator registers thousands of timers for, say, 15:30:03, it actually saves only one timer.
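The proposed change (one stored timer per very short time window) might look roughly like this. It is a hypothetical sketch, not the actual StreamTask code: CoalescingTimerService, the bucket granularity, and rounding up to the end of the bucket (so a physical timer never fires before a logical one is due) are all assumptions, and deleting logical timers is left out.

```java
import java.util.PriorityQueue;
import java.util.TreeSet;

// Hypothetical sketch: many logical timers, at most one physical timer
// per time bucket of `granularity` milliseconds.
class CoalescingTimerService {
    private final long granularity;
    private final TreeSet<Long> physicalTimers = new TreeSet<>();
    private final PriorityQueue<Long> logicalTimers = new PriorityQueue<>();

    CoalescingTimerService(long granularity) {
        this.granularity = granularity;
    }

    void register(long timestamp) {
        logicalTimers.add(timestamp);
        // Round up to the bucket end so the physical timer is never early.
        long bucket = Math.floorDiv(timestamp + granularity - 1, granularity) * granularity;
        physicalTimers.add(bucket); // no-op if this bucket already has a timer
    }

    int physicalCount() {
        return physicalTimers.size();
    }

    int logicalCount() {
        return logicalTimers.size();
    }

    // Fires every physical timer due by `now`; each firing releases all
    // logical timers with a timestamp up to that physical timer.
    int advanceTo(long now) {
        int fired = 0;
        while (!physicalTimers.isEmpty() && physicalTimers.first() <= now) {
            long t = physicalTimers.pollFirst();
            while (!logicalTimers.isEmpty() && logicalTimers.peek() <= t) {
                logicalTimers.poll();
                fired++;
            }
        }
        return fired;
    }
}
```

Registering 1,000 logical timers between 15:30:03.001 and 15:30:04.000 (as milliseconds since midnight, granularity 1000 ms) leaves a single physical timer, and its one firing releases all 1,000, relying on the property described above that a firing timer triggers all stored timers with a lower timestamp.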


Cheers,
Aljoscha

On Thu, 24 Mar 2016 at 11:30 Konstantin Knauf <[hidden email]> wrote:

Re: Memory Leak using ProcessingTimeTimers?

snntr
Hi Aljoscha,

thanks for looking into it. I have moved the discussion to the issue.

Cheers,

Konstantin

On 27.03.2016 09:35, Aljoscha Krettek wrote:

> I created a Jira Issue: https://issues.apache.org/jira/browse/FLINK-3669

Re: Memory Leak using ProcessingTimeTimers?

Aljoscha Krettek
Hi,
by the way, from looking at your email I gather that you want to do some kind of session windowing. Is that correct? I have a pull request that should make it into the next version and adds proper support for session windows. Right now this is only implemented for event time, since that is the hard part, but support for processing time will be trivial to add.
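Session windows match the original requirement: a session for a key ends at the time of its last element plus the gap, which is exactly "last event for this key + timeout". The sketch below is not Flink's session-window implementation; it is a hypothetical illustration (GapSessions is an invented name) that gives each element the window [t, t + gap) and merges windows that strictly overlap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of gap-based sessions for a single key.
class GapSessions {

    // Takes timestamps in ascending order and returns sessions as
    // {start, end} pairs, where end = last element in the session + gap.
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long t : sortedTimestamps) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t < last[1]) {
                // [t, t + gap) overlaps the current session: extend it.
                last[1] = Math.max(last[1], t + gap);
            } else {
                // Gap exceeded (or first element): start a new session.
                result.add(new long[] {t, t + gap});
            }
        }
        return result;
    }
}
```

For timestamps 0, 10, 200 and 250 with a gap of 100, this produces the sessions [0, 110) and [200, 350).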


Cheers,
Aljoscha

On Wed, 30 Mar 2016 at 09:51 Konstantin Knauf <[hidden email]> wrote: