Cannot pass objects with null-valued fields to the next operator

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Cannot pass objects with null-valued fields to the next operator

Jack Huang-2
Hi all,

It seems like flink does not allow passing case class objects with null-valued fields to the next operators. I am getting the following error message: 
Caused by: java.lang.RuntimeException: Could not forward element to next operator
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381)
           at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
           at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
           at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:340)
           at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
           at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
           at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
           at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:67)
           at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:32)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:371)
           ... 9 more
This error goes away when I force all objects to not have fields with null values. However, null is a valid value in my use case. Is there a way to make it work? I am using flink-1.1.1.


Thanks,
Jack
Reply | Threaded
Open this post in threaded view
|

Re: Cannot pass objects with null-valued fields to the next operator

Stephan Ewen
Hi!

Null is indeed not supported for some basic data types (tuples / case classes).

Can you use Option for nullable fields?

Stephan


On Mon, Aug 29, 2016 at 8:04 PM, Jack Huang <[hidden email]> wrote:
Hi all,

It seems like flink does not allow passing case class objects with null-valued fields to the next operators. I am getting the following error message: 
Caused by: java.lang.RuntimeException: Could not forward element to next operator
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381)
           at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
           at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
           at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:340)
           at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
           at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
           at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
           at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:67)
           at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:32)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:371)
           ... 9 more
This error goes away when I force all objects to not have fields with null values. However, null is a valid value in my use case. Is there a way to make it work? I am using flink-1.1.1.


Thanks,
Jack

Reply | Threaded
Open this post in threaded view
|

Re: Cannot pass objects with null-valued fields to the next operator

Jack Huang-2
Hi Stephan,

In the end I decided to specify a default value (e.g. empty string) when a field is null.

On Mon, Aug 29, 2016 at 11:25 AM, Stephan Ewen <[hidden email]> wrote:
Hi!

Null is indeed not supported for some basic data types (tuples / case classes).

Can you use Option for nullable fields?

Stephan


On Mon, Aug 29, 2016 at 8:04 PM, Jack Huang <[hidden email]> wrote:
Hi all,

It seems like flink does not allow passing case class objects with null-valued fields to the next operators. I am getting the following error message: 
Caused by: java.lang.RuntimeException: Could not forward element to next operator
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:399)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:381)
           at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
           at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
           at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:340)
           at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
           at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:239)
           at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
           at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:67)
           at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:32)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:90)
           at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:30)
           at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:371)
           ... 9 more
This error goes away when I force all objects to not have fields with null values. However, null is a valid value in my use case. Is there a way to make it work? I am using flink-1.1.1.


Thanks,
Jack