Writing groups of Windows to files

Writing groups of Windows to files

Niels Basjes
Hi,

I have the following situation:
- I have click stream data from a website that contains a session id.
- I then use something like EventTimeSessionWindows to group these events per session into a WindowedStream

So I essentially end up with a stream of "finished sessions"

So far I am able to do this fine.

I then want to put these "finished sessions" into (Parquet) files, where each file contains the sessions that ENDED (or whose session gap timed out) in a similar timeframe.

So these files should be created every 5 minutes (or so) and contain all events of the sessions that ended/timed out in that 5-minute interval.

What I ran into is that the WindowedStream doesn't accept a sink, so simply creating a BucketingSink and writing the data doesn't work.

Can anyone please give me some pointers on how to do this correctly?

Thanks.

--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Writing groups of Windows to files

Nico Kruber
Looks like you are missing a window *function* that processes the window.
From [1]:

stream
       .keyBy(...)          <-  keyed versus non-keyed windows
       .window(...)         <-  required: "assigner"
      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
      [.allowedLateness()]  <-  optional, else zero
       .reduce/fold/apply() <-  required: "function"

so for example:

events.keyBy("carId")
        .window(EventTimeSessionWindows.withGap(Time.seconds(15)))
        .apply(...)
        .addSink(...)
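
Filled in, a rough sketch might look like the following (ClickEvent, its sessionId field, and the output path are placeholders for your own types and setup, so treat this as a sketch rather than copy-paste code):

events.keyBy("sessionId")
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .apply(new WindowFunction<ClickEvent, ClickEvent, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window,
                              Iterable<ClickEvent> input,
                              Collector<ClickEvent> out) {
                // no aggregation: just forward every event of the finished session
                for (ClickEvent event : input) {
                    out.collect(event);
                }
            }
        })
        .addSink(new BucketingSink<ClickEvent>("hdfs:///sessions"));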


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html


Re: Writing groups of Windows to files

Fabian Hueske-2
Hi Niels,

If I understand your use case correctly, you'd like to hold back all events of a session until it ends/times out and then write all events out.
So, instead of aggregating per session (the common use case), you'd just like to collect the events.

I would implement a simple WindowFunction that just forwards all events that it receives from the Iterable. Conceptually, the window will just collect the events and emit them when the session has ended/timed out.
Then you can add a BucketingSink which writes out the events. I'm not sure if the BucketingSink supports buckets based on event time, though. Maybe you would need to adapt it a bit to guarantee that all rows of the same session are written to the same file.
Alternatively, the WindowFunction could also emit one large record which is a List or Array of the events belonging to the same session.
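
A minimal sketch of that last variant could look like this (ClickEvent is just a placeholder for your event type):

public class CollectSessionFunction
        implements WindowFunction<ClickEvent, List<ClickEvent>, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window,
                      Iterable<ClickEvent> input,
                      Collector<List<ClickEvent>> out) {
        // collect all events of the finished session into one output record
        List<ClickEvent> session = new ArrayList<>();
        for (ClickEvent event : input) {
            session.add(event);
        }
        out.collect(session);
    }
}

You would then use .apply(new CollectSessionFunction()) and attach the sink to the resulting stream.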

Best,
Fabian


Re: Writing groups of Windows to files

Niels Basjes
Hi Fabian,

On Fri, Jun 30, 2017 at 6:27 PM, Fabian Hueske <[hidden email]> wrote:
> If I understand your use case correctly, you'd like to hold back all events of a session until it ends/times out and then write all events out.
> So, instead of aggregating per session (the common use case), you'd just like to collect the events.

Yes, and I want to write the completed sessions into files. No aggregation or filtering at all.
The idea is that our data science guys who want to analyze sessions then know for certain that they have 'a set of complete sessions'.

> I would implement a simple WindowFunction that just forwards all events that it receives from the Iterable. Conceptually, the window will just collect the events and emit them when the session has ended/timed out.
> Then you can add a BucketingSink which writes out the events. I'm not sure if the BucketingSink supports buckets based on event time, though. Maybe you would need to adapt it a bit to guarantee that all rows of the same session are written to the same file.
> Alternatively, the WindowFunction could also emit one large record which is a List or Array of the events belonging to the same session.

That last one was the idea I had.
Have a window function that collects the whole window when the session is finished, then output that as one record with the event time of the 'end of the session', and use the BucketingSink to write those to disk.
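
Roughly what I have in mind is a custom Bucketer on the BucketingSink that buckets on the session end time (SessionRecord is a placeholder for my 'whole session' record type, and I'm assuming here that the Bucketer interface from flink-connector-filesystem is the one to implement):

public class SessionEndBucketer implements Bucketer<SessionRecord> {

    private static final long BUCKET_SIZE_MS = 5 * 60 * 1000; // 5 minute buckets

    @Override
    public Path getBucketPath(Clock clock, Path basePath, SessionRecord session) {
        // bucket on the event time at which the session ended/timed out
        long bucketStart = session.getEndTimestamp() - (session.getEndTimestamp() % BUCKET_SIZE_MS);
        return new Path(basePath, String.valueOf(bucketStart));
    }
}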

The problem I see with this is that a single session with a LOT of events could bring the system to a halt because it can trigger an OOM.

How should I handle those?

Niels

Re: Writing groups of Windows to files

Fabian Hueske-2
You are right. Building a large record might result in an OOME.
But I think that would also happen with a regular session window, the RocksDBStateBackend, and a WindowFunction that immediately ships the records it receives from the Iterable.
As far as I know, a session window stores all elements in an internal ListState. Before iterating over a ListState, the RocksDBStateBackend completely deserializes the whole list into an ArrayList, i.e., all records of a session would be on the heap anyway.

If this is a situation that you have to avoid, I think the only way to do this is to implement the sessionization yourself with a ProcessFunction and MapState (keyed on timestamp). This would be a non-trivial task though.
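
Just to illustrate the idea, a very rough skeleton could look like the one below (ClickEvent is a placeholder, and a real implementation would need to handle duplicate timestamps, late data, and missing timestamps more carefully):

public class Sessionizer extends ProcessFunction<ClickEvent, ClickEvent> {

    private static final long GAP_MS = 30 * 60 * 1000; // session gap, e.g. 30 minutes

    // buffered events of the current session, keyed on their event timestamp,
    // so the whole session never has to live in a single state value
    private transient MapState<Long, ClickEvent> events;
    // event time of the last element seen for this key
    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        events = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("events", Long.class, ClickEvent.class));
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    @Override
    public void processElement(ClickEvent event, Context ctx, Collector<ClickEvent> out) throws Exception {
        long ts = ctx.timestamp(); // simplification: assumes timestamps are assigned and unique per key
        events.put(ts, event);
        Long last = lastSeen.value();
        if (last == null || ts > last) {
            lastSeen.update(ts);
        }
        // register a timer for the gap after this element; stale timers are ignored in onTimer()
        ctx.timerService().registerEventTimeTimer(ts + GAP_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ClickEvent> out) throws Exception {
        Long last = lastSeen.value();
        if (last != null && timestamp >= last + GAP_MS) {
            // the session gap has expired: emit all buffered events and clear the state
            for (ClickEvent event : events.values()) {
                out.collect(event);
            }
            events.clear();
            lastSeen.clear();
        }
    }
}

This would be used on a keyed stream, e.g. clicks.keyBy("sessionId").process(new Sessionizer()), and as far as I know the RocksDB MapState can be iterated incrementally, which is what keeps the heap usage bounded.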

In case the SessionWindow + WindowFunction is OK for you (or you implemented your own sessionization logic, e.g., in a ProcessFunction), you could just forward the elements to a modified BucketingSink.
Your version of the BucketingSink would need to ensure that files are not closed between checkpoints, i.e., a file may only be closed once a checkpoint barrier has been received. Right now, files are closed based on a timer and/or file size.
Since all records of a session are emitted by a single WindowFunction call, these records won't be interrupted by a barrier. Hence, you'll have a "consistent" state for all windows when a checkpoint is triggered.

I'm afraid I'm not aware of a simpler solution for this use case.

Hope it helps, Fabian


Re: Writing groups of Windows to files

Niels Basjes
I think I understand.

Since the entire session must fit into memory anyway, I'm going to try to create a new data structure which simply contains the 'entire session' and use a Window/BucketingSink construct to ship them to files.
I do need to ensure no one can OOM the system, by capping the maximum number of events in a session at a value that is only reached by robots (for which the whole idea of sessions is bogus anyway).
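
Roughly the data structure I have in mind (all names are placeholders, and the cap is deliberately so high that only robots ever hit it):

public class SessionRecord {

    public static final int MAX_EVENTS = 5000; // only robot-like "sessions" ever get this big

    public String sessionId;
    public long endTimestamp; // event time at which the session ended / timed out
    public List<ClickEvent> events = new ArrayList<>();

    /** Adds an event unless the cap has been reached; returns false if the event was dropped. */
    public boolean add(ClickEvent event) {
        if (events.size() >= MAX_EVENTS) {
            return false;
        }
        events.add(event);
        return true;
    }
}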

Thanks for the tips.

Niels

--
Best regards / Met vriendelijke groeten,

Niels Basjes