How to clear registered timers for a merged window?

How to clear registered timers for a merged window?

Yan Zhou [FDS Science]
Hi,

I am implementing a mergeable trigger and am having trouble clearing the registered timers for a merged window (that is, a window that has been merged into the merge result). In my implementation, the trigger registers multiple timers for each element in Trigger#onElement(). State is used to keep track of the registered event times, so that the timers can be removed later in Trigger#clear().
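
To make the bookkeeping pattern described above concrete, here is a minimal plain-Java sketch rather than actual Flink code. ToyTimers and ToyBookkeepingTrigger are hypothetical stand-ins for the timer service and the trigger's per-window state, and windows are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a timer service; not a Flink class. */
class ToyTimers {
    // Pending timers, stored as "window@timestamp" strings.
    final Set<String> pending = new HashSet<>();

    void register(String window, long time) { pending.add(window + "@" + time); }

    void delete(String window, long time) { pending.remove(window + "@" + time); }
}

/** Models a trigger that remembers, per window, which timers it registered. */
class ToyBookkeepingTrigger {
    private final ToyTimers timers;
    // window -> event times registered for that window (the "state")
    private final Map<String, List<Long>> registered = new HashMap<>();

    ToyBookkeepingTrigger(ToyTimers timers) { this.timers = timers; }

    /** Mirrors onElement(): register timers and record them in state. */
    void onElement(String window, long... fireTimes) {
        for (long t : fireTimes) {
            timers.register(window, t);
            registered.computeIfAbsent(window, w -> new ArrayList<>()).add(t);
        }
    }

    /** Mirrors clear(): delete every recorded timer, then drop the state. */
    void clear(String window) {
        List<Long> times = registered.remove(window);
        if (times == null) {
            return; // nothing recorded under this window, so nothing can be deleted
        }
        for (long t : times) {
            timers.delete(window, t);
        }
    }
}
```

In this model, calling onElement("w1", 10, 20) followed by clear("w1") leaves no pending timers for w1. The pattern only works while the recorded timestamps are still readable under the window's own name when clear() runs.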

However, clearing the registered timers this way doesn't work if the window has been merged, because the state of the original window is removed during merging: the method AbstractHeapMergingState#mergeNamespaces() removes the state of the merged window. I think the ContinuousEventTimeTrigger shipped with Flink would have the same issue.

My question is: is there a way to keep the state for a merged window? One way I can think of is to implement a custom heap state that adds the state back in the AbstractHeapMergingState#mergeState() method. Alternatively, is there a way to clear the timers without using state? Could I modify the internal timer source code to expose a method that removes all timers for a specified window?

Please advise and thank you for your help.

Best
Yan

Re: How to clear registered timers for a merged window?

Stefan Richter
Hi,

I think it is currently not possible to delete timers that have not fired yet, because some of the data structures used for timers do not support efficient random deletes. As for the second part of the question, about keeping the state of merged windows, I have CC'd Aljoscha, who might be able to provide more information on that topic.
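
As a generic illustration of why random deletes are awkward for a heap-ordered timer queue: java.util.PriorityQueue#remove(Object) takes linear time, because it has to scan for the element. One common workaround is lazy deletion, sketched below. This is an assumption-laden toy, not a description of how Flink's timer service works, and LazyDeleteTimerQueue is a hypothetical name.

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

/** Toy timer queue with lazy deletion; hypothetical, not Flink's timer service. */
class LazyDeleteTimerQueue {
    private final PriorityQueue<Long> heap = new PriorityQueue<>();
    // Timestamps that are registered and not deleted.
    private final Set<Long> live = new HashSet<>();

    void register(long time) {
        if (live.add(time)) {
            heap.add(time); // duplicate registrations of a live timer are ignored
        }
    }

    /** Only unmarks the timer; the stale heap entry is skipped when polled. */
    void delete(long time) {
        live.remove(time);
    }

    /** Returns the next live timer at or before the watermark, or null if none is due. */
    Long pollDue(long watermark) {
        while (!heap.isEmpty() && heap.peek() <= watermark) {
            long t = heap.poll();
            if (live.remove(t)) {
                return t;
            }
            // Otherwise the entry was deleted earlier and is dropped silently.
        }
        return null;
    }
}
```

The trade-off in this sketch is that deleted timers keep occupying heap space until their timestamp is reached, in exchange for a cheap delete().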

Best,
Stefan

> On 25.09.2017 at 22:59, Yan Zhou [FDS Science] <[hidden email]> wrote:
>
> Hi,
>
> I am implementing a mergeable trigger and am having trouble clearing the registered timers for a merged window (that is, a window that has been merged into the merge result). In my implementation, the trigger registers multiple timers for each element in Trigger#onElement(). State is used to keep track of the registered event times, so that the timers can be removed later in Trigger#clear().
>
> However, clearing the registered timers this way doesn't work if the window has been merged, because the state of the original window is removed during merging: the method AbstractHeapMergingState#mergeNamespaces() removes the state of the merged window. I think the ContinuousEventTimeTrigger shipped with Flink would have the same issue.
>
> My question is: is there a way to keep the state for a merged window? One way I can think of is to implement a custom heap state that adds the state back in the AbstractHeapMergingState#mergeState() method. Alternatively, is there a way to clear the timers without using state? Could I modify the internal timer source code to expose a method that removes all timers for a specified window?
>
> Please advise and thank you for your help.
>
> Best
> Yan

Re: How to clear registered timers for a merged window?

Aljoscha Krettek
Hi,

I think you should be able to use state for cleaning up your timers in Trigger.clear(). For this, you have to make sure to not clean up the state in Trigger.onMerge() and instead remove it in Trigger.clear().

I'm not sure whether this will be possible for your use case, though.

Best,
Aljoscha

> On 26. Sep 2017, at 11:17, Stefan Richter <[hidden email]> wrote:
>
> Hi,
>
> I think it is currently not possible to delete timers that have not fired yet, because some of the data structures used for timers do not support efficient random deletes. As for the second part of the question, about keeping the state of merged windows, I have CC'd Aljoscha, who might be able to provide more information on that topic.
>
> Best,
> Stefan

Re: How to clear registered timers for a merged window?

Yan Zhou [FDS Science]
Hi Aljoscha,

Thanks for the information. In my case, I don't clean up the state explicitly in Trigger.onMerge(). My implementation only calls OnMergeContext.mergePartitionedState(stateDesc), with the intention of copying the state from the merged windows and re-registering the timers.

However, that method effectively cuts and pastes the state from the merged windows into the result window. To my understanding, it should only copy the state and leave the cleanup to Trigger.clear(), because a user might want to release resources based on the data held in the state. In my case, those resources are the timers recorded in the state.
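
The difference between the two merge behaviours can be sketched in plain Java. This is only a toy model under stated assumptions, not Flink's implementation: ToyMergingState, mergeByMoving and mergeByCopying are hypothetical names, and windows are modeled as strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of window-namespaced list state; not Flink's implementation. */
class ToyMergingState {
    private final Map<String, List<Long>> byWindow = new HashMap<>();

    void add(String window, long time) {
        byWindow.computeIfAbsent(window, w -> new ArrayList<>()).add(time);
    }

    /** Returns a copy of the values stored for a window (empty if there are none). */
    List<Long> get(String window) {
        return new ArrayList<>(byWindow.getOrDefault(window, Collections.<Long>emptyList()));
    }

    /** "Cut and paste": the source windows lose their entries during the merge. */
    void mergeByMoving(String target, List<String> sources) {
        for (String source : sources) {
            List<Long> moved = byWindow.remove(source);
            if (moved != null) {
                byWindow.computeIfAbsent(target, w -> new ArrayList<>()).addAll(moved);
            }
        }
    }

    /** Copy only: the source windows keep their entries until they are cleared. */
    void mergeByCopying(String target, List<String> sources) {
        for (String source : sources) {
            List<Long> copied = byWindow.get(source);
            if (copied != null) {
                byWindow.computeIfAbsent(target, w -> new ArrayList<>()).addAll(copied);
            }
        }
    }
}
```

With mergeByMoving, a later lookup under a source window returns an empty list, so a clear() that reads the timestamps from that window's state has nothing to delete. With mergeByCopying, the timestamps are still there when the source window is cleared.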

Below is my implementation:

public class MyTrigger<T, W extends Window> extends Trigger<T, W> {

    ...
    ...
    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        // Moves the state of the merged windows into the result window.
        ctx.mergePartitionedState(stateDesc);
        reRegister(ctx);
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // Delete the timers whose timestamps were recorded in state, then drop the state.
        Iterable<Long> allRegisteredEventTimes = ctx.getPartitionedState(stateDesc).get();
        deleteTimers(allRegisteredEventTimes);
        ctx.getPartitionedState(stateDesc).clear();
    }
    ...
    ...
}

Best
Yan


On Tue, Sep 26, 2017 at 8:28 AM, Aljoscha Krettek <[hidden email]> wrote:
> Hi,
>
> I think you should be able to use state for cleaning up your timers in Trigger.clear(). For this, you have to make sure to not clean up the state in Trigger.onMerge() and instead remove it in Trigger.clear().
>
> I'm not sure whether this will be possible for your use case, though.
>
> Best,
> Aljoscha
