Window PURGE Behaviour 1.0.2 vs 1.1.3

classic Classic list List threaded Threaded
14 messages Options
Reply | Threaded
Open this post in threaded view
|

Window PURGE Behaviour 1.0.2 vs 1.1.3

snntr
Hi everyone,

I just migrated a streaming Job from 1.0.2 to 1.1.3 and stumbled across
a problem concerning one of our custom triggers.

The trigger basically FIRE_AND_PURGEs multiple times in onElement() and
the window is PURGEd onProcessingTimeTimer(), but it seems that the all
registered processing time timers are deleted everytime the window is
PURGEd.

clear() is the default implementation, i.e. no-op.

Just wanted to, if this is the expected behavior (processing time timers
being deleted on PURGE or FIRE_AND_PURGE) from Flink 1.1 on?

Cheers,

Konstantin

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



signature.asc (836 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

Aljoscha Krettek
Hi,
the timers are not actually deleted but the WindowOperator will check whether there is any window state associated with the window for which the timer fires. If there is no window state the timer will silently be ignored.

Is this a problem for you or did you just want to clarify? If yes, then we should work on finding a solution.

Cheers,
Aljoscha

On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf <[hidden email]> wrote:
Hi everyone,

I just migrated a streaming Job from 1.0.2 to 1.1.3 and stumbled across
a problem concerning one of our custom triggers.

The trigger basically FIRE_AND_PURGEs multiple times in onElement() and
the window is PURGEd onProcessingTimeTimer(), but it seems that the all
registered processing time timers are deleted everytime the window is
PURGEd.

clear() is the default implementation, i.e. no-op.

Just wanted to, if this is the expected behavior (processing time timers
being deleted on PURGE or FIRE_AND_PURGE) from Flink 1.1 on?

Cheers,

Konstantin

--
Konstantin Knauf * [hidden email] * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

snntr
Hi Aljoscha,

interesting, this explains it. Well, in our case the PURGE in the
onProcessingTimeTimer is only used to clear KeyValueStates*, and at this
point there are usually no records in the window state.

Any Ideas?

I do have a workaround with an evictor, but it seemed to be
unnecessarily complicated.

*We can not use clear()-callback for that, since this state should
survive the FIRE_AND_PURGEs in the onElement()-calls.

Cheers,

Konstantin


On 08.11.2016 18:31, Aljoscha Krettek wrote:

> Hi,
> the timers are not actually deleted but the WindowOperator will check
> whether there is any window state associated with the window for which
> the timer fires. If there is no window state the timer will silently be
> ignored.
>
> Is this a problem for you or did you just want to clarify? If yes, then
> we should work on finding a solution.
>
> Cheers,
> Aljoscha
>
> On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi everyone,
>
>     I just migrated a streaming Job from 1.0.2 to 1.1.3 and stumbled across
>     a problem concerning one of our custom triggers.
>
>     The trigger basically FIRE_AND_PURGEs multiple times in onElement() and
>     the window is PURGEd onProcessingTimeTimer(), but it seems that the all
>     registered processing time timers are deleted everytime the window is
>     PURGEd.
>
>     clear() is the default implementation, i.e. no-op.
>
>     Just wanted to, if this is the expected behavior (processing time timers
>     being deleted on PURGE or FIRE_AND_PURGE) from Flink 1.1 on?
>
>     Cheers,
>
>     Konstantin
>
>     --
>     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]> * +49-174-3413182
>     <tel:0174%203413182>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>
--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


signature.asc (836 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

Aljoscha Krettek
Hi,
exactly for this case I want to make a change to when Trigger.clear() is called: https://issues.apache.org/jira/browse/FLINK-4994

Right now, clear is called when the window is being garbage collected because we passed the allowed lateness (after this, nothing will ever be added to a window again) and also when the Trigger returns PURGE or FIRE_AND_PURGE.

I want to change it to only be called in the former case. We could possibly add an onPurge() callback to allow cleaning state on purge or require people to put the code that they want to run on PURGE in the Trigger method that returns the PURGE.

What do you think?

Cheers,
Aljoscha

On Tue, 8 Nov 2016 at 18:46 Konstantin Knauf <[hidden email]> wrote:
Hi Aljoscha,

interesting, this explains it. Well, in our case the PURGE in the
onProcessingTimeTimer is only used to clear KeyValueStates*, and at this
point there are usually no records in the window state.

Any Ideas?

I do have a workaround with an evictor, but it seemed to be
unnecessarily complicated.

*We can not use clear()-callback for that, since this state should
survive the FIRE_AND_PURGEs in the onElement()-calls.

Cheers,

Konstantin


On 08.11.2016 18:31, Aljoscha Krettek wrote:
> Hi,
> the timers are not actually deleted but the WindowOperator will check
> whether there is any window state associated with the window for which
> the timer fires. If there is no window state the timer will silently be
> ignored.
>
> Is this a problem for you or did you just want to clarify? If yes, then
> we should work on finding a solution.
>
> Cheers,
> Aljoscha
>
> On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi everyone,
>
>     I just migrated a streaming Job from 1.0.2 to 1.1.3 and stumbled across
>     a problem concerning one of our custom triggers.
>
>     The trigger basically FIRE_AND_PURGEs multiple times in onElement() and
>     the window is PURGEd onProcessingTimeTimer(), but it seems that the all
>     registered processing time timers are deleted everytime the window is
>     PURGEd.
>
>     clear() is the default implementation, i.e. no-op.
>
>     Just wanted to, if this is the expected behavior (processing time timers
>     being deleted on PURGE or FIRE_AND_PURGE) from Flink 1.1 on?
>
>     Cheers,
>
>     Konstantin
>
>     --
>     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>

--
Konstantin Knauf * [hidden email] * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Reply | Threaded
Open this post in threaded view
|

AW: Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

snntr
Sounds good Aljoscha.

sent from my phone. Plz excuse brevity and tpyos.
---
Konstantin Knauf *[hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke

---- Aljoscha Krettek schrieb ----


Hi,
exactly for this case I want to make a change to when Trigger.clear() is called: https://issues.apache.org/jira/browse/FLINK-4994

Right now, clear is called when the window is being garbage collected because we passed the allowed lateness (after this, nothing will ever be added to a window again) and also when the Trigger returns PURGE or FIRE_AND_PURGE.

I want to change it to only be called in the former case. We could possibly add an onPurge() callback to allow cleaning state on purge or require people to put the code that they want to run on PURGE in the Trigger method that returns the PURGE.

What do you think?

Cheers,
Aljoscha

On Tue, 8 Nov 2016 at 18:46 Konstantin Knauf <[hidden email]> wrote:
Hi Aljoscha,

interesting, this explains it. Well, in our case the PURGE in the
onProcessingTimeTimer is only used to clear KeyValueStates*, and at this
point there are usually no records in the window state.

Any Ideas?

I do have a workaround with an evictor, but it seemed to be
unnecessarily complicated.

*We can not use clear()-callback for that, since this state should
survive the FIRE_AND_PURGEs in the onElement()-calls.

Cheers,

Konstantin


On 08.11.2016 18:31, Aljoscha Krettek wrote:
> Hi,
> the timers are not actually deleted but the WindowOperator will check
> whether there is any window state associated with the window for which
> the timer fires. If there is no window state the timer will silently be
> ignored.
>
> Is this a problem for you or did you just want to clarify? If yes, then
> we should work on finding a solution.
>
> Cheers,
> Aljoscha
>
> On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi everyone,
>
>     I just migrated a streaming Job from 1.0.2 to 1.1.3 and stumbled across
>     a problem concerning one of our custom triggers.
>
>     The trigger basically FIRE_AND_PURGEs multiple times in onElement() and
>     the window is PURGEd onProcessingTimeTimer(), but it seems that the all
>     registered processing time timers are deleted everytime the window is
>     PURGEd.
>
>     clear() is the default implementation, i.e. no-op.
>
>     Just wanted to, if this is the expected behavior (processing time timers
>     being deleted on PURGE or FIRE_AND_PURGE) from Flink 1.1 on?
>
>     Cheers,
>
>     Konstantin
>
>     --
>     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>

--
Konstantin Knauf * [hidden email] * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Reply | Threaded
Open this post in threaded view
|

Re: Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

Aljoscha Krettek
Could you go into some detail of why you need to keep the trigger state?

Just the basics because you probably cannot (should not) talk about your internal stuff.

On Wed, 9 Nov 2016 at 13:16 Konstantin Knauf <[hidden email]> wrote:
Sounds good Aljoscha.

sent from my phone. Plz excuse brevity and tpyos.
---
Konstantin Knauf *[hidden email] * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke

---- Aljoscha Krettek schrieb ----


Hi,
exactly for this case I want to make a change to when Trigger.clear() is called: https://issues.apache.org/jira/browse/FLINK-4994

Right now, clear is called when the window is being garbage collected because we passed the allowed lateness (after this, nothing will ever be added to a window again) and also when the Trigger returns PURGE or FIRE_AND_PURGE.

I want to change it to only be called in the former case. We could possibly add an onPurge() callback to allow cleaning state on purge or require people to put the code that they want to run on PURGE in the Trigger method that returns the PURGE.

What do you think?

Cheers,
Aljoscha

On Tue, 8 Nov 2016 at 18:46 Konstantin Knauf <[hidden email]> wrote:
Hi Aljoscha,

interesting, this explains it. Well, in our case the PURGE in the
onProcessingTimeTimer is only used to clear KeyValueStates*, and at this
point there are usually no records in the window state.

Any Ideas?

I do have a workaround with an evictor, but it seemed to be
unnecessarily complicated.

*We can not use clear()-callback for that, since this state should
survive the FIRE_AND_PURGEs in the onElement()-calls.

Cheers,

Konstantin


On 08.11.2016 18:31, Aljoscha Krettek wrote:
> Hi,
> the timers are not actually deleted but the WindowOperator will check
> whether there is any window state associated with the window for which
> the timer fires. If there is no window state the timer will silently be
> ignored.
>
> Is this a problem for you or did you just want to clarify? If yes, then
> we should work on finding a solution.
>
> Cheers,
> Aljoscha
>
> On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi everyone,
>
>     I just migrated a streaming Job from 1.0.2 to 1.1.3 and stumbled across
>     a problem concerning one of our custom triggers.
>
>     The trigger basically FIRE_AND_PURGEs multiple times in onElement() and
>     the window is PURGEd onProcessingTimeTimer(), but it seems that the all
>     registered processing time timers are deleted everytime the window is
>     PURGEd.
>
>     clear() is the default implementation, i.e. no-op.
>
>     Just wanted to, if this is the expected behavior (processing time timers
>     being deleted on PURGE or FIRE_AND_PURGE) from Flink 1.1 on?
>
>     Cheers,
>
>     Konstantin
>
>     --
>     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>

--
Konstantin Knauf * [hidden email] * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

snntr
Hi Aljoscha,

as it turns out the "workaround" I was thinking was functionally
working, but had a so to say memory leak. I was under the impression
that evicted elements will be removed from the window state...

Anyway, I think that this (triggers not being evaluated when the window
state is null) turns out to be blocker for us.

Why is this check done? Since a user can do basically whatever she likes
in onProcessingTimeTimer() the comment

// if we have no state, there is nothing to do

is, well, just not true in some cases (e.g. state updates in our case).

Cheers,

Konstantin

On 09.11.2016 14:17, Aljoscha Krettek wrote:

> Could you go into some detail of why you need to keep the trigger state?
>
> Just the basics because you probably cannot (should not) talk about your
> internal stuff.
>
> On Wed, 9 Nov 2016 at 13:16 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Sounds good Aljoscha.
>
>     sent from my phone. Plz excuse brevity and tpyos.
>     ---
>     Konstantin Knauf *[hidden email]
>     <mailto:[hidden email]> * +49-174-3413182
>     <tel:0174%203413182>
>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>
>     ---- Aljoscha Krettek schrieb ----
>
>
>     Hi,
>     exactly for this case I want to make a change to when
>     Trigger.clear() is
>     called: https://issues.apache.org/jira/browse/FLINK-4994
>
>     Right now, clear is called when the window is being garbage
>     collected because we passed the allowed lateness (after this,
>     nothing will ever be added to a window again) and also when the
>     Trigger returns PURGE or FIRE_AND_PURGE.
>
>     I want to change it to only be called in the former case. We could
>     possibly add an onPurge() callback to allow cleaning state on purge
>     or require people to put the code that they want to run on PURGE in
>     the Trigger method that returns the PURGE.
>
>     What do you think?
>
>     Cheers,
>     Aljoscha
>
>     On Tue, 8 Nov 2016 at 18:46 Konstantin Knauf
>     <[hidden email] <mailto:[hidden email]>>
>     wrote:
>
>         Hi Aljoscha,
>
>         interesting, this explains it. Well, in our case the PURGE in the
>         onProcessingTimeTimer is only used to clear KeyValueStates*, and
>         at this
>         point there are usually no records in the window state.
>
>         Any Ideas?
>
>         I do have a workaround with an evictor, but it seemed to be
>         unnecessarily complicated.
>
>         *We can not use clear()-callback for that, since this state should
>         survive the FIRE_AND_PURGEs in the onElement()-calls.
>
>         Cheers,
>
>         Konstantin
>
>
>         On 08.11.2016 18:31, Aljoscha Krettek wrote:
>         > Hi,
>         > the timers are not actually deleted but the WindowOperator
>         will check
>         > whether there is any window state associated with the window
>         for which
>         > the timer fires. If there is no window state the timer will
>         silently be
>         > ignored.
>         >
>         > Is this a problem for you or did you just want to clarify? If
>         yes, then
>         > we should work on finding a solution.
>         >
>         > Cheers,
>         > Aljoscha
>         >
>         > On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
>         > <[hidden email]
>         <mailto:[hidden email]>
>         <mailto:[hidden email]
>         <mailto:[hidden email]>>> wrote:
>         >
>         >     Hi everyone,
>         >
>         >     I just migrated a streaming Job from 1.0.2 to 1.1.3 and
>         stumbled across
>         >     a problem concerning one of our custom triggers.
>         >
>         >     The trigger basically FIRE_AND_PURGEs multiple times in
>         onElement() and
>         >     the window is PURGEd onProcessingTimeTimer(), but it seems
>         that the all
>         >     registered processing time timers are deleted everytime
>         the window is
>         >     PURGEd.
>         >
>         >     clear() is the default implementation, i.e. no-op.
>         >
>         >     Just wanted to, if this is the expected behavior
>         (processing time timers
>         >     being deleted on PURGE or FIRE_AND_PURGE) from Flink 1.1 on?
>         >
>         >     Cheers,
>         >
>         >     Konstantin
>         >
>         >     --
>         >     Konstantin Knauf * [hidden email]
>         <mailto:[hidden email]>
>         >     <mailto:[hidden email]
>         <mailto:[hidden email]>> * +49-174-3413182
>         <tel:0174%203413182>
>         >     <tel:0174%203413182>
>         >     TNG Technology Consulting GmbH, Betastr. 13a, 85774
>         Unterföhring
>         >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr.
>         Robert Dahlke
>         >     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>         >
>         >
>
>         --
>         Konstantin Knauf * [hidden email]
>         <mailto:[hidden email]> * +49-174-3413182
>         <tel:0174%203413182>
>         TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>         Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>         Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


signature.asc (836 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

Aljoscha Krettek
Hi Konstantin,
evicting elements not being evicted is a bug that should be fixed for Flink 1.2: https://issues.apache.org/jira/browse/FLINK-4369.

The check about non-existing window state when triggering was introduced because otherwise a Trigger could return FIRE and then there would be nothing to fire. I guess if we did indeed fire the trigger even with non-existing state then some people might wonder why no emission is triggered when their trigger returns FIRE. I see your point though, that the omitted firing is problematic for some cases.

I think having clear() as proposed in https://issues.apache.org/jira/browse/FLINK-4994 would solve your case. You were using your own cleanup timer as a workaround because clear() is currently also called on PURGE. With clear() only being called at the very end this should work, correct?

Cheers,
Aljoscha

On Wed, 9 Nov 2016 at 19:53 Konstantin Knauf <[hidden email]> wrote:
Hi Aljoscha,

as it turns out the "workaround" I was thinking was functionally
working, but had a so to say memory leak. I was under the impression
that evicted elements will be removed from the window state...

Anyway, I think that this (triggers not being evaluated when the window
state is null) turns out to be blocker for us.

Why is this check done? Since a user can do basically whatever she likes
in onProcessingTimeTimer() the comment

// if we have no state, there is nothing to do

is, well, just not true in some cases (e.g. state updates in our case).

Cheers,

Konstantin

On 09.11.2016 14:17, Aljoscha Krettek wrote:
> Could you go into some detail of why you need to keep the trigger state?
>
> Just the basics because you probably cannot (should not) talk about your
> internal stuff.
>
> On Wed, 9 Nov 2016 at 13:16 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Sounds good Aljoscha.
>
>     sent from my phone. Plz excuse brevity and tpyos.
>     ---
>     Konstantin Knauf *[hidden email]
>     <mailto:[hidden email]> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>
>     ---- Aljoscha Krettek schrieb ----
>
>
>     Hi,
>     exactly for this case I want to make a change to when
>     Trigger.clear() is
>     called: https://issues.apache.org/jira/browse/FLINK-4994
>
>     Right now, clear is called when the window is being garbage
>     collected because we passed the allowed lateness (after this,
>     nothing will ever be added to a window again) and also when the
>     Trigger returns PURGE or FIRE_AND_PURGE.
>
>     I want to change it to only be called in the former case. We could
>     possibly add an onPurge() callback to allow cleaning state on purge
>     or require people to put the code that they want to run on PURGE in
>     the Trigger method that returns the PURGE.
>
>     What do you think?
>
>     Cheers,
>     Aljoscha
>
>     On Tue, 8 Nov 2016 at 18:46 Konstantin Knauf
>     <[hidden email] <mailto:[hidden email]>>
>     wrote:
>
>         Hi Aljoscha,
>
>         interesting, this explains it. Well, in our case the PURGE in the
>         onProcessingTimeTimer is only used to clear KeyValueStates*, and
>         at this
>         point there are usually no records in the window state.
>
>         Any Ideas?
>
>         I do have a workaround with an evictor, but it seemed to be
>         unnecessarily complicated.
>
>         *We can not use clear()-callback for that, since this state should
>         survive the FIRE_AND_PURGEs in the onElement()-calls.
>
>         Cheers,
>
>         Konstantin
>
>
>         On 08.11.2016 18:31, Aljoscha Krettek wrote:
>         > Hi,
>         > the timers are not actually deleted but the WindowOperator
>         will check
>         > whether there is any window state associated with the window
>         for which
>         > the timer fires. If there is no window state the timer will
>         silently be
>         > ignored.
>         >
>         > Is this a problem for you or did you just want to clarify? If
>         yes, then
>         > we should work on finding a solution.
>         >
>         > Cheers,
>         > Aljoscha
>         >
>         > On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
>         > <[hidden email]
>         <mailto:[hidden email]>
>         <mailto:[hidden email]
>         <mailto:[hidden email]>>> wrote:
>         >
>         >     Hi everyone,
>         >
>         >     I just migrated a streaming Job from 1.0.2 to 1.1.3 and
>         stumbled across
>         >     a problem concerning one of our custom triggers.
>         >
>         >     The trigger basically FIRE_AND_PURGEs multiple times in
>         onElement() and
>         >     the window is PURGEd onProcessingTimeTimer(), but it seems
>         that the all
>         >     registered processing time timers are deleted everytime
>         the window is
>         >     PURGEd.
>         >
>         >     clear() is the default implementation, i.e. no-op.
>         >
>         >     Just wanted to, if this is the expected behavior
>         (processing time timers
>         >     being deleted on PURGE or FIRE_AND_PURGE) from Flink 1.1 on?
>         >
>         >     Cheers,
>         >
>         >     Konstantin
>         >
>         >     --
>         >     Konstantin Knauf * [hidden email]
>         <mailto:[hidden email]>
>         >     <mailto:[hidden email]
>         <mailto:[hidden email]>> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>         <tel:0174%203413182>
>         >     <tel:0174%203413182>
>         >     TNG Technology Consulting GmbH, Betastr. 13a, 85774
>         Unterföhring
>         >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr.
>         Robert Dahlke
>         >     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>         >
>         >
>
>         --
>         Konstantin Knauf * [hidden email]
>         <mailto:[hidden email]> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>         <tel:0174%203413182>
>         TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>         Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>         Sitz: Unterföhring * Amtsgericht München * HRB 135082
>

--
Konstantin Knauf * [hidden email] * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

snntr
Hi Aljoscha,

unfortunately, I think, FLINK-4994 would not solve our issue. What does
"on the very end" mean in case of a GlobalWindow?

FLINK-4369 would fix my workaround though. Is there already a timeline
for Flink 1.2?

Cheers,


Konst

On 10.11.2016 10:19, Aljoscha Krettek wrote:

> Hi Konstantin,
> evicting elements not being evicted is a bug that should be fixed for
> Flink 1.2: https://issues.apache.org/jira/browse/FLINK-4369.
>
> The check about non-existing window state when triggering was introduced
> because otherwise a Trigger could return FIRE and then there would be
> nothing to fire. I guess if we did indeed fire the trigger even with
> non-existing state then some people might wonder why no emission is
> triggered when their trigger returns FIRE. I see your point though, that
> the omitted firing is problematic for some cases.
>
> I think having clear() as proposed
> in https://issues.apache.org/jira/browse/FLINK-4994 would solve your
> case. You were using your own cleanup timer as a workaround because
> clear() is currently also called on PURGE. With clear() only being
> called at the very end this should work, correct?
>
> Cheers,
> Aljoscha
>
> On Wed, 9 Nov 2016 at 19:53 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi Aljoscha,
>
>     as it turns out the "workaround" I was thinking was functionally
>     working, but had a so to say memory leak. I was under the impression
>     that evicted elements will be removed from the window state...
>
>     Anyway, I think that this (triggers not being evaluated when the window
>     state is null) turns out to be blocker for us.
>
>     Why is this check done? Since a user can do basically whatever she likes
>     in onProcessingTimeTimer() the comment
>
>     // if we have no state, there is nothing to do
>
>     is, well, just not true in some cases (e.g. state updates in our case).
>
>     Cheers,
>
>     Konstantin
>
>     On 09.11.2016 14:17, Aljoscha Krettek wrote:
>     > Could you go into some detail of why you need to keep the trigger
>     state?
>     >
>     > Just the basics because you probably cannot (should not) talk
>     about your
>     > internal stuff.
>     >
>     > On Wed, 9 Nov 2016 at 13:16 Konstantin Knauf
>     > <[hidden email]
>     <mailto:[hidden email]>
>     <mailto:[hidden email]
>     <mailto:[hidden email]>>> wrote:
>     >
>     >     Sounds good Aljoscha.
>     >
>     >     sent from my phone. Plz excuse brevity and tpyos.
>     >     ---
>     >     Konstantin Knauf *[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>> * +49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >
>     >     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert
>     Dahlke
>     >
>     >     ---- Aljoscha Krettek schrieb ----
>     >
>     >
>     >     Hi,
>     >     exactly for this case I want to make a change to when
>     >     Trigger.clear() is
>     >     called: https://issues.apache.org/jira/browse/FLINK-4994
>     >
>     >     Right now, clear is called when the window is being garbage
>     >     collected because we passed the allowed lateness (after this,
>     >     nothing will ever be added to a window again) and also when the
>     >     Trigger returns PURGE or FIRE_AND_PURGE.
>     >
>     >     I want to change it to only be called in the former case. We could
>     >     possibly add an onPurge() callback to allow cleaning state on
>     purge
>     >     or require people to put the code that they want to run on
>     PURGE in
>     >     the Trigger method that returns the PURGE.
>     >
>     >     What do you think?
>     >
>     >     Cheers,
>     >     Aljoscha
>     >
>     >     On Tue, 8 Nov 2016 at 18:46 Konstantin Knauf
>     >     <[hidden email]
>     <mailto:[hidden email]>
>     <mailto:[hidden email]
>     <mailto:[hidden email]>>>
>     >     wrote:
>     >
>     >         Hi Aljoscha,
>     >
>     >         interesting, this explains it. Well, in our case the PURGE
>     in the
>     >         onProcessingTimeTimer is only used to clear
>     KeyValueStates*, and
>     >         at this
>     >         point there are usually no records in the window state.
>     >
>     >         Any Ideas?
>     >
>     >         I do have a workaround with an evictor, but it seemed to be
>     >         unnecessarily complicated.
>     >
>     >         *We can not use clear()-callback for that, since this
>     state should
>     >         survive the FIRE_AND_PURGEs in the onElement()-calls.
>     >
>     >         Cheers,
>     >
>     >         Konstantin
>     >
>     >
>     >         On 08.11.2016 18:31, Aljoscha Krettek wrote:
>     >         > Hi,
>     >         > the timers are not actually deleted but the WindowOperator
>     >         will check
>     >         > whether there is any window state associated with the window
>     >         for which
>     >         > the timer fires. If there is no window state the timer will
>     >         silently be
>     >         > ignored.
>     >         >
>     >         > Is this a problem for you or did you just want to
>     clarify? If
>     >         yes, then
>     >         > we should work on finding a solution.
>     >         >
>     >         > Cheers,
>     >         > Aljoscha
>     >         >
>     >         > On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
>     >         > <[hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>     >         >
>     >         >     Hi everyone,
>     >         >
>     >         >     I just migrated a streaming Job from 1.0.2 to 1.1.3 and
>     >         stumbled across
>     >         >     a problem concerning one of our custom triggers.
>     >         >
>     >         >     The trigger basically FIRE_AND_PURGEs multiple times in
>     >         onElement() and
>     >         >     the window is PURGEd onProcessingTimeTimer(), but it
>     seems
>     >         that the all
>     >         >     registered processing time timers are deleted everytime
>     >         the window is
>     >         >     PURGEd.
>     >         >
>     >         >     clear() is the default implementation, i.e. no-op.
>     >         >
>     >         >     Just wanted to, if this is the expected behavior
>     >         (processing time timers
>     >         >     being deleted on PURGE or FIRE_AND_PURGE) from Flink
>     1.1 on?
>     >         >
>     >         >     Cheers,
>     >         >
>     >         >     Konstantin
>     >         >
>     >         >     --
>     >         >     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >         >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>>> * +49-174-3413182
>     <tel:0174%203413182>
>     >         <tel:0174%203413182>
>     >         >     <tel:0174%203413182>
>     >         >     TNG Technology Consulting GmbH, Betastr. 13a, 85774
>     >         Unterföhring
>     >         >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr.
>     >         Robert Dahlke
>     >         >     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>     >         >
>     >         >
>     >
>     >         --
>     >         Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>> * +49-174-3413182
>     <tel:0174%203413182>
>     >         <tel:0174%203413182>
>     >         TNG Technology Consulting GmbH, Betastr. 13a, 85774
>     Unterföhring
>     >         Geschäftsführer: Henrik Klagges, Christoph Stock, Dr.
>     Robert Dahlke
>     >         Sitz: Unterföhring * Amtsgericht München * HRB 135082
>     >
>
>     --
>     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]> * +49-174-3413182
>     <tel:0174%203413182>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


signature.asc (836 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

Aljoscha Krettek
Hi,
there were some discussions on the ML and it seems that the consensus is to aim for a release this year.

Let me think a bit more and get back to you on the other issues.

Cheers,
Aljoscha 

On Thu, 10 Nov 2016 at 11:47 Konstantin Knauf <[hidden email]> wrote:
Hi Aljoscha,

unfortunately, I think, FLINK-4994 would not solve our issue. What does
"on the very end" mean in case of a GlobalWindow?

FLINK-4369 would fix my workaround though. Is there already a timeline
for Flink 1.2?

Cheers,


Konst

On 10.11.2016 10:19, Aljoscha Krettek wrote:
> Hi Konstantin,
> evicting elements not being evicted is a bug that should be fixed for
> Flink 1.2: https://issues.apache.org/jira/browse/FLINK-4369.
>
> The check about non-existing window state when triggering was introduced
> because otherwise a Trigger could return FIRE and then there would be
> nothing to fire. I guess if we did indeed fire the trigger even with
> non-existing state then some people might wonder why no emission is
> triggered when their trigger returns FIRE. I see your point though, that
> the omitted firing is problematic for some cases.
>
> I think having clear() as proposed
> in https://issues.apache.org/jira/browse/FLINK-4994 would solve your
> case. You were using your own cleanup timer as a workaround because
> clear() is currently also called on PURGE. With clear() only being
> called at the very end this should work, correct?
>
> Cheers,
> Aljoscha
>
> On Wed, 9 Nov 2016 at 19:53 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi Aljoscha,
>
>     as it turns out the "workaround" I was thinking was functionally
>     working, but had a so to say memory leak. I was under the impression
>     that evicted elements will be removed from the window state...
>
>     Anyway, I think that this (triggers not being evaluated when the window
>     state is null) turns out to be blocker for us.
>
>     Why is this check done? Since a user can do basically whatever she likes
>     in onProcessingTimeTimer() the comment
>
>     // if we have no state, there is nothing to do
>
>     is, well, just not true in some cases (e.g. state updates in our case).
>
>     Cheers,
>
>     Konstantin
>
>     On 09.11.2016 14:17, Aljoscha Krettek wrote:
>     > Could you go into some detail of why you need to keep the trigger
>     state?
>     >
>     > Just the basics because you probably cannot (should not) talk
>     about your
>     > internal stuff.
>     >
>     > On Wed, 9 Nov 2016 at 13:16 Konstantin Knauf
>     > <[hidden email]
>     <mailto:[hidden email]>
>     <mailto:[hidden email]
>     <mailto:[hidden email]>>> wrote:
>     >
>     >     Sounds good Aljoscha.
>     >
>     >     sent from my phone. Plz excuse brevity and tpyos.
>     >     ---
>     >     Konstantin Knauf *[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >
>     >     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert
>     Dahlke
>     >
>     >     ---- Aljoscha Krettek schrieb ----
>     >
>     >
>     >     Hi,
>     >     exactly for this case I want to make a change to when
>     >     Trigger.clear() is
>     >     called: https://issues.apache.org/jira/browse/FLINK-4994
>     >
>     >     Right now, clear is called when the window is being garbage
>     >     collected because we passed the allowed lateness (after this,
>     >     nothing will ever be added to a window again) and also when the
>     >     Trigger returns PURGE or FIRE_AND_PURGE.
>     >
>     >     I want to change it to only be called in the former case. We could
>     >     possibly add an onPurge() callback to allow cleaning state on
>     purge
>     >     or require people to put the code that they want to run on
>     PURGE in
>     >     the Trigger method that returns the PURGE.
>     >
>     >     What do you think?
>     >
>     >     Cheers,
>     >     Aljoscha
>     >
>     >     On Tue, 8 Nov 2016 at 18:46 Konstantin Knauf
>     >     <[hidden email]
>     <mailto:[hidden email]>
>     <mailto:[hidden email]
>     <mailto:[hidden email]>>>
>     >     wrote:
>     >
>     >         Hi Aljoscha,
>     >
>     >         interesting, this explains it. Well, in our case the PURGE
>     in the
>     >         onProcessingTimeTimer is only used to clear
>     KeyValueStates*, and
>     >         at this
>     >         point there are usually no records in the window state.
>     >
>     >         Any Ideas?
>     >
>     >         I do have a workaround with an evictor, but it seemed to be
>     >         unnecessarily complicated.
>     >
>     >         *We can not use clear()-callback for that, since this
>     state should
>     >         survive the FIRE_AND_PURGEs in the onElement()-calls.
>     >
>     >         Cheers,
>     >
>     >         Konstantin
>     >
>     >
>     >         On 08.11.2016 18:31, Aljoscha Krettek wrote:
>     >         > Hi,
>     >         > the timers are not actually deleted but the WindowOperator
>     >         will check
>     >         > whether there is any window state associated with the window
>     >         for which
>     >         > the timer fires. If there is no window state the timer will
>     >         silently be
>     >         > ignored.
>     >         >
>     >         > Is this a problem for you or did you just want to
>     clarify? If
>     >         yes, then
>     >         > we should work on finding a solution.
>     >         >
>     >         > Cheers,
>     >         > Aljoscha
>     >         >
>     >         > On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
>     >         > <[hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>     >         >
>     >         >     Hi everyone,
>     >         >
>     >         >     I just migrated a streaming Job from 1.0.2 to 1.1.3 and
>     >         stumbled across
>     >         >     a problem concerning one of our custom triggers.
>     >         >
>     >         >     The trigger basically FIRE_AND_PURGEs multiple times in
>     >         onElement() and
>     >         >     the window is PURGEd onProcessingTimeTimer(), but it
>     seems
>     >         that the all
>     >         >     registered processing time timers are deleted everytime
>     >         the window is
>     >         >     PURGEd.
>     >         >
>     >         >     clear() is the default implementation, i.e. no-op.
>     >         >
>     >         >     Just wanted to, if this is the expected behavior
>     >         (processing time timers
>     >         >     being deleted on PURGE or FIRE_AND_PURGE) from Flink
>     1.1 on?
>     >         >
>     >         >     Cheers,
>     >         >
>     >         >     Konstantin
>     >         >
>     >         >     --
>     >         >     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >         >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>>> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     >         <tel:0174%203413182>
>     >         >     <tel:0174%203413182>
>     >         >     TNG Technology Consulting GmbH, Betastr. 13a, 85774
>     >         Unterföhring
>     >         >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr.
>     >         Robert Dahlke
>     >         >     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>     >         >
>     >         >
>     >
>     >         --
>     >         Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]>
>     >         <mailto:[hidden email]
>     <mailto:[hidden email]>> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     >         <tel:0174%203413182>
>     >         TNG Technology Consulting GmbH, Betastr. 13a, 85774
>     Unterföhring
>     >         Geschäftsführer: Henrik Klagges, Christoph Stock, Dr.
>     Robert Dahlke
>     >         Sitz: Unterföhring * Amtsgericht München * HRB 135082
>     >
>
>     --
>     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>

--
Konstantin Knauf * [hidden email] * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

snntr
Hi Aljoscha,

alright, for the time being I have modified the WindowOperator and built
flink-streaming-java for our team. When you only change the
WindowOperator class, is it safe to just bundle it with the job? I.e.
does this class have precedence over the class in the binary bundle of
flink?

Cheers,

Konstantin

On 10.11.2016 14:48, Aljoscha Krettek wrote:

> Hi,
> there were some discussions on the ML and it seems that the consensus is
> to aim for a release this year.
>
> Let me think a bit more and get back to you on the other issues.
>
> Cheers,
> Aljoscha
>
> On Thu, 10 Nov 2016 at 11:47 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi Aljoscha,
>
>     unfortunately, I think, FLINK-4994 would not solve our issue. What does
>     "on the very end" mean in case of a GlobalWindow?
>
>     FLINK-4369 would fix my workaround though. Is there already a timeline
>     for Flink 1.2?
>
>     Cheers,
>
>
>     Konst
>
>     On 10.11.2016 10:19, Aljoscha Krettek wrote:
>     > Hi Konstantin,
>     > evicting elements not being evicted is a bug that should be fixed for
>     > Flink 1.2: https://issues.apache.org/jira/browse/FLINK-4369.
>     >
>     > The check about non-existing window state when triggering was
>     introduced
>     > because otherwise a Trigger could return FIRE and then there would be
>     > nothing to fire. I guess if we did indeed fire the trigger even with
>     > non-existing state then some people might wonder why no emission is
>     > triggered when their trigger returns FIRE. I see your point
>     though, that
>     > the omitted firing is problematic for some cases.
>     >
>     > I think having clear() as proposed
>     > in https://issues.apache.org/jira/browse/FLINK-4994 would solve your
>     > case. You were using your own cleanup timer as a workaround because
>     > clear() is currently also called on PURGE. With clear() only being
>     > called at the very end this should work, correct?
>     >
>     > Cheers,
>     > Aljoscha
>     >
>     > On Wed, 9 Nov 2016 at 19:53 Konstantin Knauf
>     > <[hidden email]
>     <mailto:[hidden email]>
>     <mailto:[hidden email]
>     <mailto:[hidden email]>>> wrote:
>     >
>     >     Hi Aljoscha,
>     >
>     >     as it turns out the "workaround" I was thinking was functionally
>     >     working, but had a so to say memory leak. I was under the
>     impression
>     >     that evicted elements will be removed from the window state...
>     >
>     >     Anyway, I think that this (triggers not being evaluated when
>     the window
>     >     state is null) turns out to be blocker for us.
>     >
>     >     Why is this check done? Since a user can do basically whatever
>     she likes
>     >     in onProcessingTimeTimer() the comment
>     >
>     >     // if we have no state, there is nothing to do
>     >
>     >     is, well, just not true in some cases (e.g. state updates in
>     our case).
>     >
>     >     Cheers,
>     >
>     >     Konstantin
>     >
>     >     On 09.11.2016 14:17, Aljoscha Krettek wrote:
>     >     > Could you go into some detail of why you need to keep the
>     trigger
>     >     state?
>     >     >
>     >     > Just the basics because you probably cannot (should not) talk
>     >     about your
>     >     > internal stuff.
>     >     >
>     >     > On Wed, 9 Nov 2016 at 13:16 Konstantin Knauf
>     >     > <[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>     >     >
>     >     >     Sounds good Aljoscha.
>     >     >
>     >     >     sent from my phone. Plz excuse brevity and tpyos.
>     >     >     ---
>     >     >     Konstantin Knauf *[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>> * +49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >     >     <tel:0174%203413182>
>     >     >
>     >     >     TNG Technology Consulting GmbH, Betastr. 13a, 85774
>     Unterföhring
>     >     >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert
>     >     Dahlke
>     >     >
>     >     >     ---- Aljoscha Krettek schrieb ----
>     >     >
>     >     >
>     >     >     Hi,
>     >     >     exactly for this case I want to make a change to when
>     >     >     Trigger.clear() is
>     >     >     called: https://issues.apache.org/jira/browse/FLINK-4994
>     >     >
>     >     >     Right now, clear is called when the window is being garbage
>     >     >     collected because we passed the allowed lateness (after
>     this,
>     >     >     nothing will ever be added to a window again) and also
>     when the
>     >     >     Trigger returns PURGE or FIRE_AND_PURGE.
>     >     >
>     >     >     I want to change it to only be called in the former
>     case. We could
>     >     >     possibly add an onPurge() callback to allow cleaning
>     state on
>     >     purge
>     >     >     or require people to put the code that they want to run on
>     >     PURGE in
>     >     >     the Trigger method that returns the PURGE.
>     >     >
>     >     >     What do you think?
>     >     >
>     >     >     Cheers,
>     >     >     Aljoscha
>     >     >
>     >     >     On Tue, 8 Nov 2016 at 18:46 Konstantin Knauf
>     >     >     <[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>>
>     >     >     wrote:
>     >     >
>     >     >         Hi Aljoscha,
>     >     >
>     >     >         interesting, this explains it. Well, in our case the
>     PURGE
>     >     in the
>     >     >         onProcessingTimeTimer is only used to clear
>     >     KeyValueStates*, and
>     >     >         at this
>     >     >         point there are usually no records in the window state.
>     >     >
>     >     >         Any Ideas?
>     >     >
>     >     >         I do have a workaround with an evictor, but it
>     seemed to be
>     >     >         unnecessarily complicated.
>     >     >
>     >     >         *We can not use clear()-callback for that, since this
>     >     state should
>     >     >         survive the FIRE_AND_PURGEs in the onElement()-calls.
>     >     >
>     >     >         Cheers,
>     >     >
>     >     >         Konstantin
>     >     >
>     >     >
>     >     >         On 08.11.2016 18:31, Aljoscha Krettek wrote:
>     >     >         > Hi,
>     >     >         > the timers are not actually deleted but the
>     WindowOperator
>     >     >         will check
>     >     >         > whether there is any window state associated with
>     the window
>     >     >         for which
>     >     >         > the timer fires. If there is no window state the
>     timer will
>     >     >         silently be
>     >     >         > ignored.
>     >     >         >
>     >     >         > Is this a problem for you or did you just want to
>     >     clarify? If
>     >     >         yes, then
>     >     >         > we should work on finding a solution.
>     >     >         >
>     >     >         > Cheers,
>     >     >         > Aljoscha
>     >     >         >
>     >     >         > On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
>     >     >         > <[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>>> wrote:
>     >     >         >
>     >     >         >     Hi everyone,
>     >     >         >
>     >     >         >     I just migrated a streaming Job from 1.0.2 to
>     1.1.3 and
>     >     >         stumbled across
>     >     >         >     a problem concerning one of our custom triggers.
>     >     >         >
>     >     >         >     The trigger basically FIRE_AND_PURGEs multiple
>     times in
>     >     >         onElement() and
>     >     >         >     the window is PURGEd onProcessingTimeTimer(),
>     but it
>     >     seems
>     >     >         that the all
>     >     >         >     registered processing time timers are deleted
>     everytime
>     >     >         the window is
>     >     >         >     PURGEd.
>     >     >         >
>     >     >         >     clear() is the default implementation, i.e. no-op.
>     >     >         >
>     >     >         >     Just wanted to, if this is the expected behavior
>     >     >         (processing time timers
>     >     >         >     being deleted on PURGE or FIRE_AND_PURGE) from
>     Flink
>     >     1.1 on?
>     >     >         >
>     >     >         >     Cheers,
>     >     >         >
>     >     >         >     Konstantin
>     >     >         >
>     >     >         >     --
>     >     >         >     Konstantin Knauf *
>     [hidden email] <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>
>     >     >         >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> * +49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >     >         <tel:0174%203413182>
>     >     >         >     <tel:0174%203413182>
>     >     >         >     TNG Technology Consulting GmbH, Betastr. 13a,
>     85774
>     >     >         Unterföhring
>     >     >         >     Geschäftsführer: Henrik Klagges, Christoph
>     Stock, Dr.
>     >     >         Robert Dahlke
>     >     >         >     Sitz: Unterföhring * Amtsgericht München * HRB
>     135082
>     >     >         >
>     >     >         >
>     >     >
>     >     >         --
>     >     >         Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>> * +49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >     >         <tel:0174%203413182>
>     >     >         TNG Technology Consulting GmbH, Betastr. 13a, 85774
>     >     Unterföhring
>     >     >         Geschäftsführer: Henrik Klagges, Christoph Stock, Dr.
>     >     Robert Dahlke
>     >     >         Sitz: Unterföhring * Amtsgericht München * HRB 135082
>     >     >
>     >
>     >     --
>     >     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>> * +49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert
>     Dahlke
>     >     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>     >
>
>     --
>     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]> * +49-174-3413182
>     <tel:0174%203413182>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


signature.asc (836 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

Aljoscha Krettek
Hi,
I think the jar files in the lib folder are checked first so shipping the WindowOperator with the job should not work.

Cheers,
Aljoscha 

On Thu, 10 Nov 2016 at 17:48 Konstantin Knauf <[hidden email]> wrote:
Hi Aljoscha,

alright, for the time being I have modified the WindowOperator and built
flink-streaming-java for our team. When you only change the
WindowOperator class, is it safe to just bundle it with the job? I.e.
does this class have precedence over the class in the binary bundle of
flink?

Cheers,

Konstantin

On 10.11.2016 14:48, Aljoscha Krettek wrote:
> Hi,
> there were some discussions on the ML and it seems that the consensus is
> to aim for a release this year.
>
> Let me think a bit more and get back to you on the other issues.
>
> Cheers,
> Aljoscha
>
> On Thu, 10 Nov 2016 at 11:47 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi Aljoscha,
>
>     unfortunately, I think, FLINK-4994 would not solve our issue. What does
>     "on the very end" mean in case of a GlobalWindow?
>
>     FLINK-4369 would fix my workaround though. Is there already a timeline
>     for Flink 1.2?
>
>     Cheers,
>
>
>     Konst
>
>     On 10.11.2016 10:19, Aljoscha Krettek wrote:
>     > Hi Konstantin,
>     > evicting elements not being evicted is a bug that should be fixed for
>     > Flink 1.2: https://issues.apache.org/jira/browse/FLINK-4369.
>     >
>     > The check about non-existing window state when triggering was
>     introduced
>     > because otherwise a Trigger could return FIRE and then there would be
>     > nothing to fire. I guess if we did indeed fire the trigger even with
>     > non-existing state then some people might wonder why no emission is
>     > triggered when their trigger returns FIRE. I see your point
>     though, that
>     > the omitted firing is problematic for some cases.
>     >
>     > I think having clear() as proposed
>     > in https://issues.apache.org/jira/browse/FLINK-4994 would solve your
>     > case. You were using your own cleanup timer as a workaround because
>     > clear() is currently also called on PURGE. With clear() only being
>     > called at the very end this should work, correct?
>     >
>     > Cheers,
>     > Aljoscha
>     >
>     > On Wed, 9 Nov 2016 at 19:53 Konstantin Knauf
>     > <[hidden email]
>     <mailto:[hidden email]>
>     <mailto:[hidden email]
>     <mailto:[hidden email]>>> wrote:
>     >
>     >     Hi Aljoscha,
>     >
>     >     as it turns out the "workaround" I was thinking was functionally
>     >     working, but had a so to say memory leak. I was under the
>     impression
>     >     that evicted elements will be removed from the window state...
>     >
>     >     Anyway, I think that this (triggers not being evaluated when
>     the window
>     >     state is null) turns out to be blocker for us.
>     >
>     >     Why is this check done? Since a user can do basically whatever
>     she likes
>     >     in onProcessingTimeTimer() the comment
>     >
>     >     // if we have no state, there is nothing to do
>     >
>     >     is, well, just not true in some cases (e.g. state updates in
>     our case).
>     >
>     >     Cheers,
>     >
>     >     Konstantin
>     >
>     >     On 09.11.2016 14:17, Aljoscha Krettek wrote:
>     >     > Could you go into some detail of why you need to keep the
>     trigger
>     >     state?
>     >     >
>     >     > Just the basics because you probably cannot (should not) talk
>     >     about your
>     >     > internal stuff.
>     >     >
>     >     > On Wed, 9 Nov 2016 at 13:16 Konstantin Knauf
>     >     > <[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> wrote:
>     >     >
>     >     >     Sounds good Aljoscha.
>     >     >
>     >     >     sent from my phone. Plz excuse brevity and tpyos.
>     >     >     ---
>     >     >     Konstantin Knauf *[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >     >     <tel:0174%203413182>
>     >     >
>     >     >     TNG Technology Consulting GmbH, Betastr. 13a, 85774
>     Unterföhring
>     >     >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert
>     >     Dahlke
>     >     >
>     >     >     ---- Aljoscha Krettek schrieb ----
>     >     >
>     >     >
>     >     >     Hi,
>     >     >     exactly for this case I want to make a change to when
>     >     >     Trigger.clear() is
>     >     >     called: https://issues.apache.org/jira/browse/FLINK-4994
>     >     >
>     >     >     Right now, clear is called when the window is being garbage
>     >     >     collected because we passed the allowed lateness (after
>     this,
>     >     >     nothing will ever be added to a window again) and also
>     when the
>     >     >     Trigger returns PURGE or FIRE_AND_PURGE.
>     >     >
>     >     >     I want to change it to only be called in the former
>     case. We could
>     >     >     possibly add an onPurge() callback to allow cleaning
>     state on
>     >     purge
>     >     >     or require people to put the code that they want to run on
>     >     PURGE in
>     >     >     the Trigger method that returns the PURGE.
>     >     >
>     >     >     What do you think?
>     >     >
>     >     >     Cheers,
>     >     >     Aljoscha
>     >     >
>     >     >     On Tue, 8 Nov 2016 at 18:46 Konstantin Knauf
>     >     >     <[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>>
>     >     >     wrote:
>     >     >
>     >     >         Hi Aljoscha,
>     >     >
>     >     >         interesting, this explains it. Well, in our case the
>     PURGE
>     >     in the
>     >     >         onProcessingTimeTimer is only used to clear
>     >     KeyValueStates*, and
>     >     >         at this
>     >     >         point there are usually no records in the window state.
>     >     >
>     >     >         Any Ideas?
>     >     >
>     >     >         I do have a workaround with an evictor, but it
>     seemed to be
>     >     >         unnecessarily complicated.
>     >     >
>     >     >         *We can not use clear()-callback for that, since this
>     >     state should
>     >     >         survive the FIRE_AND_PURGEs in the onElement()-calls.
>     >     >
>     >     >         Cheers,
>     >     >
>     >     >         Konstantin
>     >     >
>     >     >
>     >     >         On 08.11.2016 18:31, Aljoscha Krettek wrote:
>     >     >         > Hi,
>     >     >         > the timers are not actually deleted but the
>     WindowOperator
>     >     >         will check
>     >     >         > whether there is any window state associated with
>     the window
>     >     >         for which
>     >     >         > the timer fires. If there is no window state the
>     timer will
>     >     >         silently be
>     >     >         > ignored.
>     >     >         >
>     >     >         > Is this a problem for you or did you just want to
>     >     clarify? If
>     >     >         yes, then
>     >     >         > we should work on finding a solution.
>     >     >         >
>     >     >         > Cheers,
>     >     >         > Aljoscha
>     >     >         >
>     >     >         > On Tue, 8 Nov 2016 at 18:18 Konstantin Knauf
>     >     >         > <[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>>> wrote:
>     >     >         >
>     >     >         >     Hi everyone,
>     >     >         >
>     >     >         >     I just migrated a streaming Job from 1.0.2 to
>     1.1.3 and
>     >     >         stumbled across
>     >     >         >     a problem concerning one of our custom triggers.
>     >     >         >
>     >     >         >     The trigger basically FIRE_AND_PURGEs multiple
>     times in
>     >     >         onElement() and
>     >     >         >     the window is PURGEd onProcessingTimeTimer(),
>     but it
>     >     seems
>     >     >         that the all
>     >     >         >     registered processing time timers are deleted
>     everytime
>     >     >         the window is
>     >     >         >     PURGEd.
>     >     >         >
>     >     >         >     clear() is the default implementation, i.e. no-op.
>     >     >         >
>     >     >         >     Just wanted to, if this is the expected behavior
>     >     >         (processing time timers
>     >     >         >     being deleted on PURGE or FIRE_AND_PURGE) from
>     Flink
>     >     1.1 on?
>     >     >         >
>     >     >         >     Cheers,
>     >     >         >
>     >     >         >     Konstantin
>     >     >         >
>     >     >         >     --
>     >     >         >     Konstantin Knauf *
>     [hidden email] <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>
>     >     >         >     <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>>> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >     >         <tel:0174%203413182>
>     >     >         >     <tel:0174%203413182>
>     >     >         >     TNG Technology Consulting GmbH, Betastr. 13a,
>     85774
>     >     >         Unterföhring
>     >     >         >     Geschäftsführer: Henrik Klagges, Christoph
>     Stock, Dr.
>     >     >         Robert Dahlke
>     >     >         >     Sitz: Unterföhring * Amtsgericht München * HRB
>     135082
>     >     >         >
>     >     >         >
>     >     >
>     >     >         --
>     >     >         Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>
>     >     >         <mailto:[hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>>> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >     >         <tel:0174%203413182>
>     >     >         TNG Technology Consulting GmbH, Betastr. 13a, 85774
>     >     Unterföhring
>     >     >         Geschäftsführer: Henrik Klagges, Christoph Stock, Dr.
>     >     Robert Dahlke
>     >     >         Sitz: Unterföhring * Amtsgericht München * HRB 135082
>     >     >
>     >
>     >     --
>     >     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]>
>     >     <mailto:[hidden email]
>     <mailto:[hidden email]>> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     >     <tel:0174%203413182>
>     >     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     >     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert
>     Dahlke
>     >     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>     >
>
>     --
>     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]> * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
>     <tel:0174%203413182>
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>

--
Konstantin Knauf * [hidden email] * <a href="tel:0174%203413182" value="+491743413182" class="gmail_msg" target="_blank">+49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

Ufuk Celebi
On 11 November 2016 at 18:19:30, Aljoscha Krettek ([hidden email]) wrote:
> Hi,
> I think the jar files in the lib folder are checked first so shipping the
> WindowOperator with the job should not work.

The WindowOperator is instantiated on the client side and shipped as user code to the cluster, so it should be OK to only have the modified WindowOperator code locally.

That should work, Aljoscha? Or am I overlooking something here.

– Ufuk


Reply | Threaded
Open this post in threaded view
|

Re: Window PURGE Behaviour 1.0.2 vs 1.1.3

Aljoscha Krettek
Not sure, because there's going to be two WindwoOperator classes on the class path when deserialising this on the cluster.

On Mon, 14 Nov 2016 at 10:07 Ufuk Celebi <[hidden email]> wrote:
On 11 November 2016 at 18:19:30, Aljoscha Krettek ([hidden email]) wrote:
> Hi,
> I think the jar files in the lib folder are checked first so shipping the
> WindowOperator with the job should not work.

The WindowOperator is instantiated on the client side and shipped as user code to the cluster, so it should be OK to only have the modified WindowOperator code locally.

That should work, Aljoscha? Or am I overlooking something here.

– Ufuk