(For reference, I'm on Flink 1.0.3.)
I have a job that looks like this:
DataStream<A> input = ...;
input
    .map(new MapFunction<A, B>() { ... })
    .addSink(...);
input
    .map(new MapFunction<A, C>() { ... })
    .addSink(...);
If I don't call enableObjectReuse(), the job works; if I do call enableObjectReuse(), it throws:
java.lang.ClassCastException: B cannot be cast to A
when the second MapFunction<A,C> processes the record.
It looks like the input operator calls Output<StreamRecord<A>>.collect, which passes the record to the first MapFunction and then to the second. However, the first MapFunction calls StreamRecord.replace(), which mutates the stored value in place, overwriting it with the function's output. By the time Output<StreamRecord<A>>.collect hands the same record to the second MapFunction, it holds a B instead of an A, so the two MapFunctions effectively run in series rather than in parallel.
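To make the suspected mechanism concrete, here is a minimal plain-Java model of it. It does not use Flink at all: Record, mapOperator and run are hypothetical stand-ins, and Record.replace only imitates the in-place mutation described above for StreamRecord.replace(). Integer plays the role of A, String plays B, and Integer plays C. With a shared envelope the second operator fails the same way; with a fresh envelope per consumer both branches see the original A.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, heavily simplified model; none of these classes are Flink's.
public class ObjectReuseDemo {

    // Mutable envelope around a value, reused across operators
    // (imitates the in-place replace() behaviour described above).
    static final class Record<T> {
        private Object value;

        Record(T value) { this.value = value; }

        @SuppressWarnings("unchecked")
        T getValue() { return (T) value; }

        // Overwrites the stored value in place and returns the SAME envelope,
        // retyped to the new element type.
        @SuppressWarnings("unchecked")
        <X> Record<X> replace(X newValue) {
            this.value = newValue;
            return (Record<X>) this;
        }
    }

    // A "map operator": applies fn, reuses the incoming envelope for the result,
    // and forwards the result to its sink.
    static <I, O> Consumer<Record<I>> mapOperator(Function<I, O> fn, List<Object> sink) {
        return record -> {
            Record<O> out = record.replace(fn.apply(record.getValue()));
            sink.add(out.getValue());
        };
    }

    // Fans one record out to two map operators, like calling input.map(...) twice.
    // If copyPerConsumer is false, both operators receive the same envelope.
    static String run(boolean copyPerConsumer) {
        List<Object> sinkB = new ArrayList<>();
        List<Object> sinkC = new ArrayList<>();
        Consumer<Record<Integer>> toB = mapOperator((Integer a) -> "b" + a, sinkB); // A -> B
        Consumer<Record<Integer>> toC = mapOperator((Integer a) -> a * 10, sinkC);  // A -> C

        Record<Integer> record = new Record<>(7);
        try {
            for (Consumer<Record<Integer>> consumer : List.of(toB, toC)) {
                Record<Integer> delivered =
                        copyPerConsumer ? new Record<Integer>(record.getValue()) : record;
                consumer.accept(delivered);
            }
        } catch (ClassCastException e) {
            // The second operator received a String (B) where it expected an Integer (A).
            return "ClassCastException";
        }
        return sinkB + " " + sinkC;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // shared envelope: prints ClassCastException
        System.out.println(run(true));  // fresh envelope per consumer: prints [b7] [70]
    }
}
```

Giving each downstream operator its own copy is roughly what running without object reuse amounts to in this model; whether Flink's non-reuse path copies records in exactly this way is an assumption here, not something established above.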
I searched JIRA and didn't find an issue that matches this exactly. Is this a known problem?
-Bart