Hi,

I have the following situation:
- I have clickstream data from a website that contains a session id.
- I then use something like EventTimeSessionWindows to group these events per session into a WindowedStream.

So I essentially end up with a stream of "finished sessions". So far I am able to do this fine.

I then want to put these "finished sessions" into (Parquet) files, where each file contains the sessions that ENDED (or whose gap timeout occurred) in a similar timeframe. So these files should be created every 5 minutes (or so) and contain all events of the sessions that ended/timed out in that 5-minute interval.

What I ran into is that the WindowedStream doesn't accept a sink, so simply creating a BucketingSink and writing the data doesn't work.

Can anyone please give me some pointers on how to do this correctly?

Thanks.

Best regards / Met vriendelijke groeten,
Niels Basjes
Looks like you are missing a window *function* that processes the window.

From [1]:

    stream
        .keyBy(...)           <- keyed versus non-keyed windows
        .window(...)          <- required: "assigner"
        [.trigger(...)]       <- optional: "trigger" (else default trigger)
        [.evictor(...)]       <- optional: "evictor" (else no evictor)
        [.allowedLateness()]  <- optional, else zero
        .reduce/fold/apply()  <- required: "function"

So for example:

    events.keyBy("carId")
        .window(EventTimeSessionWindows.withGap(Time.seconds(15)))
        .apply(...)
        .addSink(...)

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html
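To illustrate the session-window semantics behind `EventTimeSessionWindows.withGap(...)`, here is a minimal plain-JDK sketch (no Flink dependency; class and method names are illustrative only): events with sorted timestamps are collected into a session until a gap larger than the timeout is seen, at which point the finished session would be handed to the window function and sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of session-gap windowing, assuming a 15s gap
// as in the example above. This is a simulation of the concept,
// not Flink's actual implementation.
public class SessionGapSketch {
    static final long GAP_MS = 15_000;

    // Splits sorted event timestamps into sessions: a session ends
    // when the next event arrives more than GAP_MS after the last one.
    static List<List<Long>> sessionize(List<Long> sortedTimestamps) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > GAP_MS) {
                sessions.add(current);          // gap exceeded: session is finished
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);              // final session times out at end of input
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Gap between 12s and 40s exceeds 15s, so this splits into two sessions.
        List<List<Long>> sessions =
            sessionize(Arrays.asList(0L, 5_000L, 12_000L, 40_000L, 41_000L));
        System.out.println(sessions.size());
    }
}
```

In Flink the same grouping is done per key (session id) by the window assigner, and the `.apply(...)` function receives each finished session as an Iterable.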
Hi Niels,

If I understand your use case correctly, you'd like to hold back all events of a session until it ends/times out and then write all events out.

2017-06-30 13:29 GMT+02:00 Niels Basjes <[hidden email]>:
Hi Fabian,
On Fri, Jun 30, 2017 at 6:27 PM, Fabian Hueske <[hidden email]> wrote:
Yes, and I want to write the completed sessions into files, with no aggregation or filtering at all. The idea is that our data science people who want to analyze sessions then have the much easier task of knowing for certain that they have 'a set of complete sessions'.
That last one was the idea I had: have a window function that holds the window until it is finished, then output that with the event time of the 'end of the session' and use the BucketingSink to write those to disk.

The problem (in my mind) with this is that a single session with a LOT of events could bring the system to a halt because it can trigger OOM errors. How should I handle those?

Niels
You are right, building a large record might result in an OOME.

As far as I know, a SessionWindow stores all elements in an internal ListState. Before iterating over a ListState, the RocksDBStateBackend completely deserializes the list into an ArrayList, i.e., all records of a session would be on the heap already. But I think that would also happen with a regular SessionWindow, RocksDBStateBackend, and a WindowFunction that immediately ships the records it receives from the Iterable. If this is a situation that you have to avoid, I think the only way to do so is to implement the sessionization yourself with a ProcessFunction and MapState (keyed on timestamp). This would be a non-trivial task though.

Your version of the BucketingSink would need to ensure that files are not closed between checkpoints, i.e., a file may only be closed when a checkpoint barrier is received. Right now, files are closed on timer and/or file size. Since all records of a session are emitted by a single WindowFunction call, these records won't be interrupted by a barrier. Hence, you'll have a "consistent" state for all windows when a checkpoint is triggered.

I'm afraid I'm not aware of a simpler solution for this use case.

Hope it helps,
Fabian

2017-07-04 11:24 GMT+02:00 Niels Basjes <[hidden email]>:
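The bucketing goal described in this thread — all sessions that ended in the same 5-minute interval go into the same file — amounts to rounding each session's end timestamp down to a 5-minute boundary and using that as the bucket/file path. A minimal plain-JDK sketch of that bucket computation (class name, path format, and bucket size are illustrative assumptions, not from the thread):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch: derive a 5-minute bucket id from a session's end timestamp,
// so a bucketing sink can group completed sessions by when they ended.
public class SessionBucket {
    static final long BUCKET_MS = 5 * 60 * 1000;
    static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd--HH-mm").withZone(ZoneOffset.UTC);

    // Round the session end time down to the enclosing 5-minute boundary.
    static String bucketFor(long sessionEndMs) {
        long bucketStart = (sessionEndMs / BUCKET_MS) * BUCKET_MS;
        return FMT.format(Instant.ofEpochMilli(bucketStart));
    }
}
```

In an actual Flink job the window function would attach the window's end timestamp to each emitted session, and a BucketingSink (or a custom Bucketer) would apply logic like `bucketFor` to choose the target file.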
I think I understand. Since the entire session must fit into memory anyway, I'm going to try to create a new data structure which simply contains the 'entire session' and then use a Window/BucketingSink construct to ship them to files.

I do need to ensure no one can OOM the system, by capping the maximum number of events in a session at a value that is only reached by robots (for which the whole idea of sessions is bogus anyway).

Thanks for the tips.

Niels

On Tue, Jul 4, 2017 at 12:07 PM, Fabian Hueske <[hidden email]> wrote:

Best regards / Met vriendelijke groeten,
Niels Basjes
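The capping idea above can be sketched as a bounded session accumulator: once the event count hits a fixed maximum, further events are dropped and the session is flagged as truncated, so a single runaway (robot) session cannot exhaust the heap when the whole session is built as one record. The cap value and names here are illustrative assumptions, not from the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a session record with a hard cap on the number of events.
public class CappedSession {
    static final int MAX_EVENTS = 10_000; // assumed cap; only robots should hit it

    private final List<String> events = new ArrayList<>();
    private boolean truncated = false;

    // Returns false once the cap is reached; extra events are dropped.
    boolean add(String event) {
        if (events.size() >= MAX_EVENTS) {
            truncated = true;
            return false;
        }
        events.add(event);
        return true;
    }

    int size() { return events.size(); }
    boolean isTruncated() { return truncated; }
}
```

The `truncated` flag lets downstream consumers (e.g. the data science team reading the Parquet files) tell complete sessions apart from capped ones.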