Dynamic StreamingFileSink

Dynamic StreamingFileSink

Sidney Feiner
Hey,
I would like to create a dynamic StreamingFileSink for my Streaming pipeline.
By dynamic, I mean that it will write to a different directory based on the input. 
For example, route each row to a directory named after the first 2 characters of its content: if the content I'm writing starts with "XX", write it to /path/to/dir/XX, and if it starts with "YY", write it to /path/to/dir/YY.
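The routing rule described above can be sketched as a small, pure mapping from a record to its target directory. This is only an illustration under stated assumptions: the names PrefixBuckets, bucketIdFor, targetDir and the "_other" fallback for inputs shorter than 2 characters are hypothetical and not from the original post. In Flink, logic like this would typically sit in the getBucketId method of a custom BucketAssigner passed to StreamingFileSink, which writes each bucket to a subdirectory of the base path; the sketch leaves the Flink classes out so it runs on its own.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: maps a record to a directory named after its first 2 characters.
final class PrefixBuckets {
    static final int PREFIX_LENGTH = 2;
    // Assumption: records too short to have a 2-character prefix go to one fallback bucket.
    static final String FALLBACK_BUCKET = "_other";

    // Returns the bucket id (the directory name) for one record.
    public static String bucketIdFor(String element) {
        if (element == null || element.length() < PREFIX_LENGTH) {
            return FALLBACK_BUCKET;
        }
        return element.substring(0, PREFIX_LENGTH);
    }

    // Resolves the bucket id against the base directory, e.g. /path/to/dir + XX.
    public static Path targetDir(String baseDir, String element) {
        return Paths.get(baseDir, bucketIdFor(element));
    }

    public static void main(String[] args) {
        for (String row : new String[] {"XXfirst", "YYsecond", "Z"}) {
            System.out.println(row + " -> " + targetDir("/path/to/dir", row));
        }
    }
}
```

Note that a prefix containing a path separator would change the directory depth, so real input may need sanitizing before it is used as a directory name.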

I've tried implementing a DynamicFileSink that internally holds a map keyed by every 2-letter combination it encounters. The first time it meets a combination, it creates a StreamingFileSink for it, and it then calls that sink's invoke method.

Obviously, that didn't work because a StreamingFileSink has to be initialized completely differently.

I'm guessing I could implement this completely by myself, but I feel it'd be a waste if there was some way that could utilize the existing StreamingFileSink.

BTW, this is part of an existing architecture where every pipeline needs an actual Sink, so it isn't possible for me to manipulate the DataStream directly, e.g. use keyBy on the first 2 letters and then write each key's output to its own file.

Any help would be much appreciated :)

Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp
 

Re: Dynamic StreamingFileSink

Rafi Aroch
Hi Sidney,


Rafi


On Sat, Dec 26, 2020 at 11:48 PM Sidney Feiner <[hidden email]> wrote:
[...]

Re: Dynamic StreamingFileSink

Sidney Feiner-2
If anybody is interested, I've implemented a StreamingFileSink with dynamic paths:



From: Rafi Aroch <[hidden email]>
Sent: Sunday, December 27, 2020 8:25 AM
To: Sidney Feiner <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Dynamic StreamingFileSink
 
Hi Sidney,


Rafi

