Re: "stepless" sliding windows?
Posted by
Danny Chan-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/stepless-sliding-windows-tp38822p38873.html
The SLIDING window always triggers as of each step, what do you mean by "stepless" ?
whoops.. as usual, posting led me to find some answers myself. Does this make sense given my requirements?
Thanks!
private class MyWindowAssigner(val windowSize: Time) : WindowAssigner<Record, TimeWindow>() {
private val trigger = CountTrigger.of<TimeWindow>(1) as Trigger<Record TimeWindow>
override fun assignWindows(
element: Record,
timestamp: Long,
context: WindowAssignerContext
): MutableCollection<TimeWindow> {
return mutableListOf(TimeWindow(timestamp - windowSize.toMilliseconds(), timestamp))
}
override fun getDefaultTrigger(env: StreamExecutionEnvironment?): Trigger<Record, TimeWindow> {
return trigger
}
override fun getWindowSerializer(executionConfig: ExecutionConfig?): TypeSerializer<TimeWindow> {
return TimeWindow.Serializer()
}
override fun isEventTime(): Boolean {
return true
}
}
Hey folks!
I have an application that wants to use "stepless" sliding windows, i.e. we produce aggregates on every event. The windows need to be of a fixed size, but to have their start and end times update continuously, and I'd like to trigger on every event. Is this a bad idea? I've googled and read the docs extensively and haven't been able to identify built-in functionality or examples that map cleanly to my requirements.
OK, I just found DeltaTrigger, which looks promising... Does it make sense to write a WindowAssigner that makes a new Window on every event, allocation rates aside?
Thanks!
-0xe1a