When does Trigger.clear() get called?

Andrew Danks
Hello,

I see that the clear() function is implemented for various types of Triggers in the Flink API. For example:

I am working on a custom Trigger for my application and have implemented clear() in a similar way.

However, having put a breakpoint in this function, it doesn’t seem to get called when I expect. The source code says that it is called "when a window is purged" [1], but when my Trigger emits a PURGE this function never seems to get called. I am on Flink 1.3.

Hoping someone can shed more light on the purpose of clear() and how/when it gets called.

Thanks!
Andrew



Re: When does Trigger.clear() get called?

Hequn Cheng
Hi Andrew,

Are you using a CountWindow? You could switch to a TimeWindow to test this.
I'm not very familiar with windows, but I checked the code and found that clear() is called only when the timer is triggered, i.e., at the end of the time window.
Hope this helps.

Best, Hequn

On Fri, Oct 12, 2018 at 6:23 AM Andrew Danks <[hidden email]> wrote:
> [...]

Re: When does Trigger.clear() get called?

Fabian Hueske-2
Hi Andrew,

The PURGE action of a window removes the window state (i.e., the collected events or computed aggregate), but the window metadata, including the Trigger, remains.
The Trigger.clear() method is called when the window is completely discarded (i.e., including all metadata). This happens when the time (wall-clock time for processing-time windows or the watermark for event-time windows) exceeds the window's end timestamp.
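
To make the difference between PURGE and clear() concrete, here is a minimal toy model in plain Java. It does not use Flink and is not how Flink implements windows; the class, field, and method names are all hypothetical, and the rule "discard once time passes the window end" is a simplification of the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single window's lifecycle. It is NOT Flink's window operator;
// every name in this class is hypothetical.
class WindowLifecycleSketch {
    enum Action { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    final long windowEnd;                             // end timestamp of the window
    final List<String> contents = new ArrayList<>();  // window state: buffered events
    int triggerCount = 0;                             // custom state kept by the trigger
    int clearCalls = 0;                               // how many times clear() has run
    boolean discarded = false;                        // true once the window is gone

    WindowLifecycleSketch(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    // Buffers an event, updates the trigger state, then applies the trigger's decision.
    void onElement(String event, Action action) {
        contents.add(event);
        triggerCount++;
        if (action == Action.PURGE || action == Action.FIRE_AND_PURGE) {
            contents.clear(); // PURGE drops the contents only; triggerCount is untouched
        }
    }

    // Advances the clock (wall-clock time or watermark, depending on the window type).
    void advanceTimeTo(long time) {
        if (!discarded && time > windowEnd) {
            contents.clear();
            clear();          // only now is the trigger asked to clean up
            discarded = true;
        }
    }

    // Stand-in for Trigger.clear(): removes the trigger's custom state.
    void clear() {
        triggerCount = 0;
        clearCalls++;
    }

    public static void main(String[] args) {
        WindowLifecycleSketch w = new WindowLifecycleSketch(100L);
        w.onElement("a", Action.CONTINUE);
        w.onElement("b", Action.PURGE);
        // After PURGE: contents are empty, trigger state survives, clear() has not run.
        System.out.println(w.contents.size() + " " + w.triggerCount + " " + w.clearCalls); // 0 2 0
        w.advanceTimeTo(101L);
        // After the window end has passed: clear() has run exactly once.
        System.out.println(w.contents.size() + " " + w.triggerCount + " " + w.clearCalls); // 0 0 1
    }
}
```

In this model a breakpoint inside clear() is hit only when the clock passes the window end, never on PURGE, which matches the behavior reported in the original question.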

Best, Fabian

On Fri, Oct 12, 2018 at 05:25, Hequn Cheng <[hidden email]> wrote:
> [...]

Re: When does Trigger.clear() get called?

Averell
Hello Fabian,

So can I assume the following?

1. Neither PURGE nor clear() removes the States (so the States must be
explicitly cleared by the user).
2. When an event for a window arrives after PURGE has been called, it is
still processed, and is treated as the first event of that window.

And one related question: for keyed streams, if I know that some keys will
never receive new events, should/could I remove the streams
corresponding to those keys so that I can save some memory allocated to the
metadata?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: When does Trigger.clear() get called?

Hequn Cheng
Hi Averell,

> 1. Neither PURGE nor clear() removes the States (so the States must be explicitly cleared by the user).
Both PURGE and clear() remove state. The PURGE action removes the window state, i.e., the aggregate value. clear() removes the window metadata, including the state held by the Trigger.

> 2. When an event for a window arrives after PURGE has been called, it is still processed, and is treated as the first event of that window.
In most cases, the answer is yes. However, the trigger may not treat the event as the first one, since PURGE clears the window state while the window metadata, including the Trigger, remains.

> if I know that some keys will never receive new events, should/could I remove the streams corresponding to those keys
Yes. I think we can return FIRE_AND_PURGE.

Best, Hequn



On Sun, Oct 14, 2018 at 7:30 AM Averell <[hidden email]> wrote:
> [...]

Re: When does Trigger.clear() get called?

Averell
Hello Hequn,

Thanks for the answers.
Regarding question no.2, I am now clear.
Regarding question no.1, does your answer apply to those custom states as
well? This concern of mine came from Flink's implementation of CountTrigger,
in which a custom state is being cleared explicitly in Trigger.clear():

        // Explicitly removes the trigger's custom per-window state.
        public void clear(W window, TriggerContext ctx) throws Exception {
                ctx.getPartitionedState(stateDesc).clear();
        }

My 3rd question was about ordinary, non-windowed keyed streams, where I don't
see any mention of using a Trigger in Flink's documentation, so how can I clear
those streams?

Thank you very much for your help.
Regards,
Averell
 



Re: When does Trigger.clear() get called?

Fabian Hueske-2
Hi,

Re Q1: The main purpose of the Trigger.clear() method is to remove all custom state of the Trigger. State must be explicitly removed; otherwise, the program leaks memory.
Re Q3: If you are using a keyed stream, you need to manually clean up the state by calling State.clear(). If you are using a ProcessFunction, you can do that in processElement() or register a timer and clean up in onTimer().
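
The timer-based cleanup for Q3 can be sketched with a small toy model in plain Java. It does not use Flink's ProcessFunction or timer service; the class name, the IDLE_TIMEOUT value, and the per-key maps are assumptions made for illustration. The toy also overwrites a pending timer directly, which is a simplification of how timers are managed in a real job.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy model of per-key state with timer-based cleanup. It only mimics the shape of
// processElement()/onTimer(); none of this is Flink API and all names are hypothetical.
class KeyedStateCleanupSketch {
    static final long IDLE_TIMEOUT = 60_000L; // assumed idle period before a key is cleaned up

    final Map<String, Long> countPerKey = new HashMap<>();   // the keyed state
    final Map<String, Long> cleanupTimers = new HashMap<>(); // one pending timer per key

    // Updates the state for the key and pushes its cleanup timer further out.
    void processElement(String key, long timestamp) {
        countPerKey.merge(key, 1L, Long::sum);
        cleanupTimers.put(key, timestamp + IDLE_TIMEOUT);
    }

    // Advances the clock and fires every timer that is due.
    void advanceTimeTo(long time) {
        Iterator<Map.Entry<String, Long>> it = cleanupTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> timer = it.next();
            if (timer.getValue() <= time) {
                onTimer(timer.getKey());
                it.remove(); // the timer has fired, so drop it
            }
        }
    }

    // Stand-in for calling State.clear() for one key.
    void onTimer(String key) {
        countPerKey.remove(key);
    }

    public static void main(String[] args) {
        KeyedStateCleanupSketch s = new KeyedStateCleanupSketch();
        s.processElement("a", 0L);
        s.processElement("b", 30_000L);
        s.processElement("a", 40_000L); // "a" is still active, so its timer moves to 100_000
        s.advanceTimeTo(90_000L);       // only the timer for "b" is due
        System.out.println(s.countPerKey.containsKey("b")); // false
        System.out.println(s.countPerKey.get("a"));         // 2
    }
}
```

The point of the design is that state for a key disappears only because the program removes it; a key that simply stops receiving events would otherwise keep its entry forever.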

Best, Fabian

On Sun, Oct 14, 2018 at 06:06, Averell <[hidden email]> wrote:
> [...]

Re: When does Trigger.clear() get called?

Averell
Thank you Fabian.

All my doubts are cleared now.

Best regards,
Averell



Re: When does Trigger.clear() get called?

Andrew Danks
In reply to this post by Fabian Hueske-2
Hi Fabian & Hequn,

Thank you for your responses. I am only responding now as I was out of the office for the last few days.

You mentioned that clear() is called when the time exceeds the window’s end timestamp. For my application I am using a GlobalWindow on a keyed stream -- would clear() get called at all in this case or should I be calling it manually?


Andrew

On Oct 12, 2018, at 12:48 AM, Fabian Hueske <[hidden email]> wrote:
> [...]

Re: When does Trigger.clear() get called?

Hequn Cheng
Hi Andrew,

You should call it manually, as the global window does not have a natural end.
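
One way to read "call it manually" is that the trigger resets its own state at the moment it fires, since no end-of-window cleanup will do it. The following is a minimal sketch of that idea in plain Java, not Flink code; the class and its members are hypothetical, and the boolean return value stands in for a fire decision.

```java
// Toy count-based trigger for a window that never ends. This is NOT Flink API;
// the class name, fields, and methods are hypothetical.
class GlobalWindowCountTriggerSketch {
    final int maxCount; // fire after this many elements
    int count = 0;      // trigger state that no end-of-window cleanup will ever remove

    GlobalWindowCountTriggerSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    // Returns true when the window should fire for this element.
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            clear(); // manual cleanup: the window has no end timestamp to wait for
            return true;
        }
        return false;
    }

    // Stand-in for Trigger.clear(): removes the trigger's custom state.
    void clear() {
        count = 0;
    }

    public static void main(String[] args) {
        GlobalWindowCountTriggerSketch trigger = new GlobalWindowCountTriggerSketch(3);
        System.out.println(trigger.onElement()); // false
        System.out.println(trigger.onElement()); // false
        System.out.println(trigger.onElement()); // true
        System.out.println(trigger.count);       // 0
    }
}
```

Resetting inside the firing path keeps the state bounded per key even though the surrounding window is never discarded.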

Best, Hequn

On Wed, Oct 17, 2018 at 2:47 AM Andrew Danks <[hidden email]> wrote:
> [...]