"stepless" sliding windows?

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

"stepless" sliding windows?

Alex Cruise
Hey folks!

I have an application that wants to use "stepless" sliding windows, i.e. we produce aggregates on every event. The windows need to be of a fixed size, but to have their start and end times update continuously, and I'd like to trigger on every event. Is this a bad idea? I've googled and read the docs extensively and haven't been able to identify built-in functionality or examples that map cleanly to my requirements.

OK, I just found DeltaTrigger, which looks promising... Does it make sense to write a WindowAssigner that makes a new Window on every event, allocation rates aside?

Thanks!

-0xe1a
Reply | Threaded
Open this post in threaded view
|

Re: "stepless" sliding windows?

Alex Cruise
whoops.. as usual, posting led me to find some answers myself. Does this make sense given my requirements?

Thanks!
private class MyWindowAssigner(val windowSize: Time) : WindowAssigner<Record, TimeWindow>() {
private val trigger = CountTrigger.of<TimeWindow>(1) as Trigger<Record TimeWindow>

override fun assignWindows(
element: Record,
timestamp: Long,
context: WindowAssignerContext
): MutableCollection<TimeWindow> {
return mutableListOf(TimeWindow(timestamp - windowSize.toMilliseconds(), timestamp))
}

override fun getDefaultTrigger(env: StreamExecutionEnvironment?): Trigger<Record, TimeWindow> {
return trigger
}

override fun getWindowSerializer(executionConfig: ExecutionConfig?): TypeSerializer<TimeWindow> {
return TimeWindow.Serializer()
}

override fun isEventTime(): Boolean {
return true
}
}

On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise <[hidden email]> wrote:
Hey folks!

I have an application that wants to use "stepless" sliding windows, i.e. we produce aggregates on every event. The windows need to be of a fixed size, but to have their start and end times update continuously, and I'd like to trigger on every event. Is this a bad idea? I've googled and read the docs extensively and haven't been able to identify built-in functionality or examples that map cleanly to my requirements.

OK, I just found DeltaTrigger, which looks promising... Does it make sense to write a WindowAssigner that makes a new Window on every event, allocation rates aside?

Thanks!

-0xe1a
Reply | Threaded
Open this post in threaded view
|

Re: "stepless" sliding windows?

Danny Chan-2
The SLIDING window always triggers as of each step, what do you mean by "stepless" ?

Alex Cruise <[hidden email]> 于2020年10月21日周三 上午1:52写道:
whoops.. as usual, posting led me to find some answers myself. Does this make sense given my requirements?

Thanks!
private class MyWindowAssigner(val windowSize: Time) : WindowAssigner<Record, TimeWindow>() {
private val trigger = CountTrigger.of<TimeWindow>(1) as Trigger<Record TimeWindow>

override fun assignWindows(
element: Record,
timestamp: Long,
context: WindowAssignerContext
): MutableCollection<TimeWindow> {
return mutableListOf(TimeWindow(timestamp - windowSize.toMilliseconds(), timestamp))
}

override fun getDefaultTrigger(env: StreamExecutionEnvironment?): Trigger<Record, TimeWindow> {
return trigger
}

override fun getWindowSerializer(executionConfig: ExecutionConfig?): TypeSerializer<TimeWindow> {
return TimeWindow.Serializer()
}

override fun isEventTime(): Boolean {
return true
}
}

On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise <[hidden email]> wrote:
Hey folks!

I have an application that wants to use "stepless" sliding windows, i.e. we produce aggregates on every event. The windows need to be of a fixed size, but to have their start and end times update continuously, and I'd like to trigger on every event. Is this a bad idea? I've googled and read the docs extensively and haven't been able to identify built-in functionality or examples that map cleanly to my requirements.

OK, I just found DeltaTrigger, which looks promising... Does it make sense to write a WindowAssigner that makes a new Window on every event, allocation rates aside?

Thanks!

-0xe1a
Reply | Threaded
Open this post in threaded view
|

Re: "stepless" sliding windows?

Jacob Sevart
I think the issue is you have to specify a time interval for "step." It would be nice to consider the preceding N minutes as of every message. You can somewhat approximate that using a very small step.

On Thu, Oct 22, 2020 at 2:29 AM Danny Chan <[hidden email]> wrote:
The SLIDING window always triggers as of each step, what do you mean by "stepless" ?

Alex Cruise <[hidden email]> 于2020年10月21日周三 上午1:52写道:
whoops.. as usual, posting led me to find some answers myself. Does this make sense given my requirements?

Thanks!
private class MyWindowAssigner(val windowSize: Time) : WindowAssigner<Record, TimeWindow>() {
private val trigger = CountTrigger.of<TimeWindow>(1) as Trigger<Record TimeWindow>

override fun assignWindows(
element: Record,
timestamp: Long,
context: WindowAssignerContext
): MutableCollection<TimeWindow> {
return mutableListOf(TimeWindow(timestamp - windowSize.toMilliseconds(), timestamp))
}

override fun getDefaultTrigger(env: StreamExecutionEnvironment?): Trigger<Record, TimeWindow> {
return trigger
}

override fun getWindowSerializer(executionConfig: ExecutionConfig?): TypeSerializer<TimeWindow> {
return TimeWindow.Serializer()
}

override fun isEventTime(): Boolean {
return true
}
}

On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise <[hidden email]> wrote:
Hey folks!

I have an application that wants to use "stepless" sliding windows, i.e. we produce aggregates on every event. The windows need to be of a fixed size, but to have their start and end times update continuously, and I'd like to trigger on every event. Is this a bad idea? I've googled and read the docs extensively and haven't been able to identify built-in functionality or examples that map cleanly to my requirements.

OK, I just found DeltaTrigger, which looks promising... Does it make sense to write a WindowAssigner that makes a new Window on every event, allocation rates aside?

Thanks!

-0xe1a


--
Jacob Sevart
Software Engineer, Safety
Reply | Threaded
Open this post in threaded view
|

Re: "stepless" sliding windows?

Alex Cruise
On Thu, Oct 22, 2020 at 11:08 AM Jacob Sevart <[hidden email]> wrote:
I think the issue is you have to specify a time interval for "step." It would be nice to consider the preceding N minutes as of every message. You can somewhat approximate that using a very small step.

Indeed, I want the window to slide continuously, not based on a time interval. I think with the code I posted earlier I'd be creating too many windows, and double-counting events. I might need to go with global + evictor, since I want to age out each event.

-0xe1a


On Thu, Oct 22, 2020 at 2:29 AM Danny Chan <[hidden email]> wrote:
The SLIDING window always triggers as of each step, what do you mean by "stepless" ?

Alex Cruise <[hidden email]> 于2020年10月21日周三 上午1:52写道:
whoops.. as usual, posting led me to find some answers myself. Does this make sense given my requirements?

Thanks!
private class MyWindowAssigner(val windowSize: Time) : WindowAssigner<Record, TimeWindow>() {
private val trigger = CountTrigger.of<TimeWindow>(1) as Trigger<Record TimeWindow>

override fun assignWindows(
element: Record,
timestamp: Long,
context: WindowAssignerContext
): MutableCollection<TimeWindow> {
return mutableListOf(TimeWindow(timestamp - windowSize.toMilliseconds(), timestamp))
}

override fun getDefaultTrigger(env: StreamExecutionEnvironment?): Trigger<Record, TimeWindow> {
return trigger
}

override fun getWindowSerializer(executionConfig: ExecutionConfig?): TypeSerializer<TimeWindow> {
return TimeWindow.Serializer()
}

override fun isEventTime(): Boolean {
return true
}
}

On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise <[hidden email]> wrote:
Hey folks!

I have an application that wants to use "stepless" sliding windows, i.e. we produce aggregates on every event. The windows need to be of a fixed size, but to have their start and end times update continuously, and I'd like to trigger on every event. Is this a bad idea? I've googled and read the docs extensively and haven't been able to identify built-in functionality or examples that map cleanly to my requirements.

OK, I just found DeltaTrigger, which looks promising... Does it make sense to write a WindowAssigner that makes a new Window on every event, allocation rates aside?

Thanks!

-0xe1a


--
Jacob Sevart
Software Engineer, Safety