How to customize trigger for Count Time Window

Soheil Pourbafrani
I want a time window that triggers data processing when either of the following conditions holds:
1 - The window contains 3 messages
2 - The window contains fewer than 3 messages and a timeout is reached

I know I need to extend the Trigger class:

public static class MyWindowTrigger<W extends Window> extends Trigger<Object, W> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        // Placeholder so the skeleton compiles; the count check and timeout are the open question.
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // Nothing to clean up yet.
    }
}
But I don't know how to check the number of messages in the window or how to set a timeout. Can someone help?

Re: How to customize trigger for Count Time Window

Rong Rong
Hi Soheil,

I don't think overriding the window trigger alone is sufficient, since your logic effectively changes how elements are assigned to a window.
Based on a quick scan, I think your use case might be able to reuse the DynamicGapSessionWindow [1]: you would have to create a customized session timeout extractor based on how many messages are currently in the window, and you should then be able to reuse the trigger.
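
To make the firing rule from the question concrete (3 messages, or fewer than 3 once a timeout expires, whichever comes first), here is a minimal framework-free sketch in plain Java. It is not Flink code: CountOrTimeoutBuffer, onElement, onTime and fire are hypothetical names chosen for illustration, and the caller passes the clock in explicitly. In a real Flink job the buffered count and the deadline would have to be kept in window or trigger state and driven by registered timers, which this sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Framework-free model of a "count or timeout" firing rule.
 * All names are hypothetical; none of them are Flink APIs.
 */
class CountOrTimeoutBuffer<T> {
    private final int maxCount;        // fire as soon as this many elements are buffered
    private final long timeoutMillis;  // fire this long after the first buffered element
    private final List<T> buffer = new ArrayList<>();
    private long deadline = -1L;       // -1 means no timeout is pending

    CountOrTimeoutBuffer(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    /** Buffers one element; returns the full batch if the count limit is hit, else an empty list. */
    List<T> onElement(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            // The first element of a new batch starts the timeout.
            deadline = nowMillis + timeoutMillis;
        }
        buffer.add(element);
        return buffer.size() >= maxCount ? fire() : new ArrayList<>();
    }

    /** Called as time advances; returns the partial batch if the deadline has passed. */
    List<T> onTime(long nowMillis) {
        boolean expired = deadline >= 0 && nowMillis >= deadline;
        return expired ? fire() : new ArrayList<>();
    }

    /** Emits the buffered elements and resets both the buffer and the pending timeout. */
    private List<T> fire() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        deadline = -1L;
        return batch;
    }
}
```

With maxCount = 3 and timeoutMillis = 1000, three calls to onElement return the batch on the third call, while a single element added at time 6000 is only returned by onTime once the clock reaches 7000. Note that each batch starts its own timeout, which is why this behaves more like a session than a fixed time window.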

Thanks.
Rong 



In reply to Soheil Pourbafrani's message of Sat, Jul 14, 2018, 10:08 PM.