How to best create a bounded session window ?

Posted by Vishal Santoshi on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/How-to-best-create-a-bounded-session-window-tp16634.html

I am implementing a bounded session window but I require to short circuit the session if the session length ( in count of events or time ) go beyond a configured limit , a very reasonable scenario ( bot etc ) . I am using the approach as listed. I am not sure though if the Window itself is being terminated and if that is even feasible. Any other approach or advise ?  

public class BoundedEventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
long maxSessionTime;

ValueState<Boolean> doneState;
private final ValueStateDescriptor<Boolean> cleanupStateDescriptor =
new ValueStateDescriptor<>("done", Boolean.class );

private BoundedEventTimeTrigger(long maxSessionTime) {
this.maxSessionTime = maxSessionTime;
}

/**
* Creates an event-time trigger that fires once the watermark passes the end of the window.
* <p>
* <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
* trigger window evaluation with just this one element.
*/
public static BoundedEventTimeTrigger create(long maxSessionLengh) {
return new BoundedEventTimeTrigger(maxSessionLengh);
}

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if(cleanupState!=null && cleanupState.value()!=null && cleanupState.value()) {
return TriggerResult.CONTINUE;
}
if(timestamp - window.getStart() > maxSessionTime){
System.out.println(new Date(timestamp) + "\t" + new Date(window.getStart()));
try {
doneState = ctx.getPartitionedState(cleanupStateDescriptor);
doneState.update(true);
return TriggerResult.FIRE_AND_PURGE;
} catch (IOException e) {
throw new RuntimeException("Failed to update state", e);
}
}

if (window.maxTimestamp() <= ctx.getCurrentWatermark() ) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}

@Override
public boolean canMerge() {
return true;
}

@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
ctx.registerEventTimeTimer(window.maxTimestamp());
}

@Override
public String toString() {
return "EventTimeTrigger()";
}
}