Cannot use WindowedStream.fold with EventTimeSessionWindows


Jack Huang-2
Hi all,

I want to window a series of events using session windows and use a fold function to incrementally aggregate the result:
events
    .keyBy(_.id)
    .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
    .fold(new Session)(eventFolder)
However, I get:
java.lang.UnsupportedOperationException: Fold cannot be used with a merging WindowAssigner.

Does anyone have a workaround?



Thanks,
Jack



Re: Cannot use WindowedStream.fold with EventTimeSessionWindows

Till Rohrmann
Hi Jack,

The problem with combining session windows and a fold operation, which is incremental, is that there is no way to combine the partial fold results when windows are merged. As a workaround, you have to specify a window function, which gives you an iterator over all the elements of the window, and perform the fold manually inside that function.
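
A rough sketch of that manual fold, written in plain Scala so that it runs without Flink on the classpath. The Event and Session case classes and the body of eventFolder are placeholders, since the thread does not show the real ones. The commented wiring at the end assumes the Scala WindowedStream.apply overload that takes a (key, window, elements, collector) function.

```scala
// Hypothetical event and aggregate types; the thread does not show the real ones.
final case class Event(id: String, timestamp: Long)
final case class Session(count: Int = 0,
                         first: Long = Long.MaxValue,
                         last: Long = Long.MinValue)

object ManualFold {
  // Stand-in for the poster's eventFolder: folds one event into the running aggregate.
  val eventFolder: (Session, Event) => Session = (s, e) =>
    Session(s.count + 1, math.min(s.first, e.timestamp), math.max(s.last, e.timestamp))

  // Body of the window function: folds over every element buffered for the window.
  // It is meant to be called when the (already merged) session window fires.
  def foldWindow(elements: Iterable[Event]): Session =
    elements.foldLeft(Session())(eventFolder)

  // With Flink available, the wiring would look roughly like this (untested here):
  //   events
  //     .keyBy(_.id)
  //     .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
  //     .apply { (key: String, window: TimeWindow,
  //               elements: Iterable[Event], out: Collector[Session]) =>
  //       out.collect(ManualFold.foldWindow(elements))
  //     }
}
```

Note the trade-off: the window has to buffer all of its elements until it fires, so the aggregation is no longer incremental.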

Cheers,
Till

On Wed, Aug 17, 2016 at 3:21 AM, Jack Huang <[hidden email]> wrote:
Hi all,

I want to window a series of events using session windows and use a fold function to incrementally aggregate the result:
events
    .keyBy(_.id)
    .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
    .fold(new Session)(eventFolder)
However, I get:
java.lang.UnsupportedOperationException: Fold cannot be used with a merging WindowAssigner.

Does anyone have a workaround?



Thanks,
Jack




Re: Cannot use WindowedStream.fold with EventTimeSessionWindows

Jack Huang-2
Hi Till,

The sessions I am dealing with do not have a reliable "end-of-session" event. A session could stop sending events all of a sudden, or it could keep sending events forever. I need to be able to determine when a session expires due to inactivity, and to kill off a session if it lives longer than it should. Therefore I need to perform incremental aggregation.

If I can assume ascending timestamps for the incoming events, would there be a workaround?
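
To make the requirement concrete, here is a minimal plain-Scala sketch of the logic described above: events of one key arrive in ascending timestamp order, a session closes after an inactivity gap, and it is cut off once it exceeds a maximum lifetime. This is not a Flink API and not an answer given in the thread. TimedEvent, SessionSpan, and maxLifeMs are made-up names and values for illustration; only the one-minute gap comes from the original post.

```scala
// Hypothetical types for illustration only.
final case class TimedEvent(id: String, timestamp: Long)
final case class SessionSpan(start: Long, last: Long, count: Int)

object AscendingSessions {
  val gapMs: Long = 60000L      // inactivity gap (1 minute, as in the original post)
  val maxLifeMs: Long = 600000L // assumed cap on session lifetime (10 minutes)

  // Folds the events of ONE key, assumed sorted by ascending timestamp.
  // The fold state is (closed sessions, currently open session).
  def sessionize(events: Seq[TimedEvent]): Vector[SessionSpan] = {
    val start = (Vector.empty[SessionSpan], Option.empty[SessionSpan])
    val (closed, open) = events.foldLeft(start) {
      case ((done, None), e) =>
        (done, Some(SessionSpan(e.timestamp, e.timestamp, 1)))
      case ((done, Some(s)), e) =>
        val expired = e.timestamp - s.last > gapMs      // idle for too long
        val tooOld  = e.timestamp - s.start > maxLifeMs // alive for too long
        if (expired || tooOld) (done :+ s, Some(SessionSpan(e.timestamp, e.timestamp, 1)))
        else (done, Some(s.copy(last = e.timestamp, count = s.count + 1)))
    }
    // Emit the still-open session at end of input.
    closed ++ open.toList
  }
}
```

In this sketch a session is only closed when the next event arrives or the input ends. On a live stream, an idle session would additionally need some timer to close it.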

Thanks,
Jack



On Wed, Aug 17, 2016 at 2:17 AM, Till Rohrmann <[hidden email]> wrote:
Hi Jack,

The problem with combining session windows and a fold operation, which is incremental, is that there is no way to combine the partial fold results when windows are merged. As a workaround, you have to specify a window function, which gives you an iterator over all the elements of the window, and perform the fold manually inside that function.

Cheers,
Till
