Discarding bad data in Stream


Niclas Hedhman
Hi again,

Something I can't (easily) find in the documentation is the recommended way to discard data from a stream.

On one hand, I could always use flatMap(), even though the function is "per message", since that lets me emit zero or one objects.

    DataStream<MyType> stream =
        env.addSource( source )
           .flatMap( new MyFunction() );

But that seems a bit misleading: the casual observer will get the idea that MyFunction 'branches' out, when it doesn't.
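
For readers following along, a minimal sketch of the zero-or-one flatMap pattern described above. It is not the poster's MyFunction: `ParseOrDiscard`, `FlatMapDiscardDemo` and the string-to-integer parsing are hypothetical, and the two interfaces are local stand-ins that only imitate the shape of Flink's `Collector` and `FlatMapFunction` so the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins (assumption): simplified copies of the shape of
// org.apache.flink.util.Collector and
// org.apache.flink.api.common.functions.FlatMapFunction.
interface Collector<T> {
    void collect(T record);
}

interface FlatMapFunction<I, O> {
    void flatMap(I value, Collector<O> out) throws Exception;
}

// Hypothetical function: emits one Integer per parseable line, nothing otherwise.
class ParseOrDiscard implements FlatMapFunction<String, Integer> {
    @Override
    public void flatMap(String value, Collector<Integer> out) {
        try {
            out.collect(Integer.valueOf(value.trim()));
        } catch (NumberFormatException e) {
            // Bad record: emit nothing, so it simply disappears from the output.
        }
    }
}

class FlatMapDiscardDemo {
    // Feeds every input through the function and gathers whatever it emits.
    static List<Integer> run(List<String> inputs) {
        List<Integer> results = new ArrayList<>();
        ParseOrDiscard fn = new ParseOrDiscard();
        for (String input : inputs) {
            fn.flatMap(input, results::add);
        }
        return results;
    }
}
```

A class name that states the discard behaviour (rather than a generic MyFunction) may go some way towards the readability concern, since the reader no longer has to guess whether the function fans out.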

The other "obvious" choice is to return null and follow with a filter...

    DataStream<MyType> stream =
        env.addSource( source )
           .map( new MyFunction() )
           .filter( Objects::nonNull );

BUT that doesn't work with a Java 8 method reference like the one above, so I have to write my own filter to give Flink the correct type information:

    DataStream<MyType> stream =
        env.addSource( source )
           .map( new MyFunction() )
           .filter( new DiscardNullFilter<>() );
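
The post does not show DiscardNullFilter itself, so the following is only a guess at what such a filter might look like. `FilterFunction` here is a local stand-in imitating the shape of Flink's interface of the same name, and `NullFilterDemo` is a hypothetical driver so the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in (assumption): imitates the shape of
// org.apache.flink.api.common.functions.FilterFunction.
interface FilterFunction<T> {
    boolean filter(T value) throws Exception;
}

// A guess at the filter named in the post: keep a record only if it is not null.
class DiscardNullFilter<T> implements FilterFunction<T> {
    @Override
    public boolean filter(T value) {
        return value != null;
    }
}

class NullFilterDemo {
    // Applies the filter to each record, the way a filter step would in a pipeline.
    static <T> List<T> keepNonNull(List<T> records) {
        DiscardNullFilter<T> filter = new DiscardNullFilter<>();
        List<T> kept = new ArrayList<>();
        for (T record : records) {
            if (filter.filter(record)) {
                kept.add(record);
            }
        }
        return kept;
    }
}
```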

In my opinion, that ends up looking ugly: streams/pipelines (I'm not used to the terminology yet) quickly accumulate many transformations and branches, and a null check after each one seems to put the burden of knowledge in the wrong spot ("Can this function return null?").

Throwing an exception shuts down the entire stream, which seems overly aggressive for many data-related discards.
 
Any other choices?

Cheers
--
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java
Re: Discarding bad data in Stream

Fabian Hueske-2
Hi Niclas,

I'd add a Filter to discard bad records directly. That should make the behavior explicit.
If you need to do complex transformations that you don't want to do twice, the FlatMap approach would be the most efficient.
If you'd like to keep the bad records, you can implement a ProcessFunction and add a side output [1] that collects bad records.
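
A rough sketch of the side-output idea, for illustration only. None of the types below are the Flink classes: `OutputTag`, `Collector`, `Context` and `ProcessFunction` are simplified local stand-ins that imitate their shape so the example runs without Flink, and `ParseOrDivert`, `SideOutputDemo` and the "bad-records" tag are hypothetical. The real API is described at [1].

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins (assumption), NOT the Flink classes of the same names.
final class OutputTag<T> {
    final String id;
    OutputTag(String id) { this.id = id; }
}

interface Collector<T> { void collect(T record); }

interface Context { <X> void output(OutputTag<X> tag, X value); }

abstract class ProcessFunction<I, O> {
    abstract void processElement(I value, Context ctx, Collector<O> out);
}

class SideOutputDemo {
    static final OutputTag<String> BAD_RECORDS = new OutputTag<>("bad-records");

    // Good records go to the main output; unparseable ones go to the side output.
    static class ParseOrDivert extends ProcessFunction<String, Integer> {
        @Override
        void processElement(String value, Context ctx, Collector<Integer> out) {
            try {
                out.collect(Integer.valueOf(value.trim()));
            } catch (NumberFormatException e) {
                ctx.output(BAD_RECORDS, value);
            }
        }
    }

    final List<Integer> mainOutput = new ArrayList<>();
    final Map<String, List<Object>> sideOutputs = new HashMap<>();

    // Drives the function over the inputs, collecting both outputs in memory.
    void run(List<String> inputs) {
        ProcessFunction<String, Integer> fn = new ParseOrDivert();
        Context ctx = new Context() {
            @Override
            public <X> void output(OutputTag<X> tag, X value) {
                sideOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
            }
        };
        for (String input : inputs) {
            fn.processElement(input, ctx, mainOutput::add);
        }
    }

    // Returns everything emitted under the given tag (empty if nothing was).
    List<Object> getSideOutput(OutputTag<?> tag) {
        return sideOutputs.getOrDefault(tag.id, new ArrayList<>());
    }
}
```

The point of the pattern is that the main stream stays clean while the rejected records remain available under the tag, for logging or later inspection.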

Hope this helps,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/side_output.html

(In reply to Niclas Hedhman's message of 2018-02-19 10:29 GMT+01:00.)

Re: Discarding bad data in Stream

Niclas Hedhman

Thanks Fabian,

I have seen Side Outputs and OutputTags but have not fully understood the mechanics yet. In my case, I don't need to keep bad records... And I think I will end up with flatMap() after all; it just becomes an internal documentation issue to provide the relevant information...

Thanks for your response.
Niclas

(In reply to Fabian Hueske's message of Mon, Feb 19, 2018 at 8:46 PM.)