NullPointerException with Avro Serializer


NullPointerException with Avro Serializer

Kien Truong
Hi,

After upgrading to Flink 1.4, we encountered this exception:

Caused by: java.lang.NullPointerException: in com.viettel.big4g.avro.LteSession in long null of long in field tmsi of com.viettel.big4g.avro.LteSession
at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:132)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:176)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:48)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)


It seems Flink uses the reflection-based writer instead of the specific writer for this schema. This is wrong: our LteSession is a generated Avro SpecificRecord, so the specific writer should be used.
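As an aside, the NPE itself comes from the reflect path having to produce a primitive long for the schema's "long" field from a null value. A minimal sketch of that failure mode in plain Java (the field name is borrowed from the stack trace; this is not Avro's actual code):

```java
public class NpeDemo {
    // A nullable wrapper field, like an unset field on a POJO as seen
    // through reflection (name mirrors the one in the stack trace).
    static Long tmsi = null;

    public static void main(String[] args) {
        try {
            // The schema declares primitive "long", so the value must be
            // unboxed; unboxing null is where the NullPointerException fires.
            long value = tmsi;
            System.out.println(value);
        } catch (NullPointerException e) {
            System.out.println("NPE on null long field, as in the report");
        }
    }
}
```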

Best regards,
Kien

Sent from TypeApp

Re: NullPointerException with Avro Serializer

Kien Truong
Upon further investigation, we found the reason:

* The cluster was started on YARN with the Hadoop classpath, which includes Avro, so Avro's SpecificRecord class was loaded by sun.misc.Launcher$AppClassLoader.

* Our LteSession class was submitted with the application jar and loaded by the child-first classloader.

* Flink checks whether LteSession is assignable to SpecificRecord; because the two classes come from different classloaders, the check fails.

* Flink falls back to the reflection-based Avro writer, which throws an NPE on the null field.

If we change the classloader resolution order to parent-first, everything is OK. The question now is why the default doesn't work for us.
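The classloader interaction above can be reproduced with plain Java. The sketch below uses a toy child-first loader (our own names, not Flink's real implementation) to show that a class re-defined by a second loader is no longer assignable to the original:

```java
import java.io.InputStream;

// Toy child-first classloader: it defines the demo class itself from the
// .class file bytes instead of delegating to its parent (illustrative only;
// Flink's actual child-first loader is more involved).
class ChildFirstLoader extends ClassLoader {
    @Override
    public Class<?> loadClass(String name) throws ClassNotFoundException {
        if (!name.equals("ClassLoaderDemo")) {
            return super.loadClass(name); // delegate JDK classes to the parent
        }
        try (InputStream in = getResourceAsStream(name + ".class")) {
            byte[] bytes = in.readAllBytes();
            return defineClass(name, bytes, 0, bytes.length);
        } catch (Exception e) {
            throw new ClassNotFoundException(name, e);
        }
    }
}

public class ClassLoaderDemo {
    public static void main(String[] args) throws Exception {
        Class<?> reloaded = new ChildFirstLoader().loadClass("ClassLoaderDemo");
        // Same class name, same bytes, but a different defining loader:
        // the isAssignableFrom check fails, just like the SpecificRecord vs
        // LteSession check in the report.
        System.out.println(ClassLoaderDemo.class.isAssignableFrom(reloaded)); // false
    }
}
```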

Best regards,
Kien

On Dec 20, 2017, at 14:09, Kien Truong <[hidden email]> wrote:

Re: NullPointerException with Avro Serializer

Kien Truong
It turns out that our Flink branch was out-of-date. Sorry for all the noise. :)

Regards,
Kien

On Dec 20, 2017, at 16:42, Kien Truong <[hidden email]> wrote:

Re: NullPointerException with Avro Serializer

Aljoscha Krettek
Phew, thanks for letting us know! 😃

And yes, there were some problems with Avro and class loading, but I was hoping we had caught them all before Flink 1.4.0.

Best,
Aljoscha

On 20. Dec 2017, at 10:54, Kien Truong <[hidden email]> wrote:
