enableObjectReuse and multiple downstream operators


Bart Wyatt

(For reference, I'm on Flink 1.0.3.)


I have a job that looks like this:


DataStream<A> input = ...

input
    .map(MapFunction<A,B>...)
    .addSink(...);

input
    .map(MapFunction<A,C>...)
    .addSink(...);

If I do not call enableObjectReuse(), it works; if I do call enableObjectReuse(), it throws:

java.lang.ClassCastException: B cannot be cast to A

when attempting to process the second MapFunction<A,C>.


It looks like the input operator calls Output<StreamRecord<A>>.collect, which passes the record to the first MapFunction and then to the second MapFunction. However, the first map function calls StreamRecord<>.replace(), which mutates the stored value to the output of the function. When Output<StreamRecord<A>>.collect then passes the same record to the next MapFunction, it is as if the two MapFunctions were chained in series rather than in parallel.
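
To make the suspected mechanism concrete, here is a minimal sketch in plain Java, with no Flink dependency. Everything in it is a hypothetical stand-in: Record mimics a mutable record wrapper with a replace() method, mapOperator mimics a chained map operator that reuses the incoming record for its result, and the loop in run() mimics a collect that hands the same object to each downstream operator in turn. It is not Flink's actual code, only an illustration of how sharing one mutable record between two branches can produce this kind of ClassCastException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class SharedRecordDemo {

    /** Hypothetical stand-in for a mutable record wrapper. */
    static final class Record<T> {
        private T value;

        Record(T value) { this.value = value; }

        T getValue() { return value; }

        /** Overwrites the stored value in place and returns the same object, retyped. */
        @SuppressWarnings("unchecked")
        <X> Record<X> replace(X newValue) {
            this.value = (T) newValue;
            return (Record<X>) this;
        }
    }

    /** Hypothetical map operator: applies fn and reuses the incoming record for the result. */
    static <I, O> Consumer<Record<I>> mapOperator(Function<I, O> fn, List<Object> sink) {
        return record -> sink.add(record.replace(fn.apply(record.getValue())).getValue());
    }

    /** Hands the SAME record object to every downstream operator, one after another. */
    static String run() {
        List<Object> sink = new ArrayList<>();
        List<Consumer<Record<Integer>>> outputs = new ArrayList<>();
        outputs.add(mapOperator((Integer a) -> "B" + a, sink));            // plays the role of A -> B
        outputs.add(mapOperator((Integer a) -> Long.valueOf(a * 2L), sink)); // plays the role of A -> C

        Record<Integer> record = new Record<>(1);
        try {
            for (Consumer<Record<Integer>> out : outputs) {
                out.accept(record); // the first operator has already replaced the value
            }
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the second operator receives a record that already holds the first operator's String output, so applying a function that expects an Integer fails, which mirrors the "B cannot be cast to A" error above.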


I looked into JIRA and didn't see an issue that looked exactly like this.  Is this known?


-Bart







Re: enableObjectReuse and multiple downstream operators

Aljoscha Krettek
Hi Bart,
Yup, this is a bug. AFAIK it is not known yet; would you like to open the Jira issue for it? If not, I can open one.

The problem lies in how chaining in the streaming API interacts with object reuse. As you said, the current implementation calls the two map functions serially on the same record, which it shouldn't, because they mutate the record.
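
One way to picture a remedy is to give each downstream branch its own record wrapper, so that a replace() in one branch cannot be seen by the other. The sketch below is plain Java under the same assumptions as before: Record, shallowCopy, and mapOperator are hypothetical names for illustration, and this is not necessarily how the issue is or will be resolved in Flink. Only the wrapper is copied; the user value itself is still shared between branches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class CopyPerOutputDemo {

    /** Hypothetical stand-in for a mutable record wrapper. */
    static final class Record<T> {
        private T value;

        Record(T value) { this.value = value; }

        T getValue() { return value; }

        /** New wrapper around the same value object; the value itself is not cloned. */
        Record<T> shallowCopy() { return new Record<>(value); }

        /** Overwrites the stored value in place and returns the same object, retyped. */
        @SuppressWarnings("unchecked")
        <X> Record<X> replace(X newValue) {
            this.value = (T) newValue;
            return (Record<X>) this;
        }
    }

    /** Hypothetical map operator: applies fn and reuses the incoming record for the result. */
    static <I, O> Consumer<Record<I>> mapOperator(Function<I, O> fn, List<Object> sink) {
        return record -> sink.add(record.replace(fn.apply(record.getValue())).getValue());
    }

    /** Gives every output except the last its own wrapper, so replace() stays local to a branch. */
    static List<Object> run() {
        List<Object> sink = new ArrayList<>();
        List<Consumer<Record<Integer>>> outputs = new ArrayList<>();
        outputs.add(mapOperator((Integer a) -> "B" + a, sink));
        outputs.add(mapOperator((Integer a) -> Long.valueOf(a * 2L), sink));

        Record<Integer> record = new Record<>(1);
        for (int i = 0; i < outputs.size(); i++) {
            boolean last = (i == outputs.size() - 1);
            // The last branch may safely reuse the original wrapper.
            Record<Integer> forBranch = last ? record : record.shallowCopy();
            outputs.get(i).accept(forBranch);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a wrapper per branch, both map functions in the sketch see the original input value, so the two branches behave as parallel consumers again.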

-Aljoscha

In reply to Bart Wyatt's message of Wed, 25 May 2016 at 17:12.