Flink and Avro for state serialization

Yashwant Ganti
Hello all,

We are running some Flink jobs. We wrote the job in Beam but are using the Flink Runner, and we are seeing the following error when restarting the job from a savepoint:

Caused by: java.io.InvalidClassException: com.xxx.xxx; local class incompatible: stream classdesc serialVersionUID = -5544933308624767500, local class serialVersionUID = -7822035950742261370

Here is what happened:
  • The type in question is an Avro type, so we have a `PCollection<OurAvroType>` in the job.
  • We updated the Avro schema, and by default the regenerated class gets a new serialVersionUID (the serialVersionUIDs in the error above line up with the ones in the generated Avro classes; see the sketch after this list).
  • We did not use any custom serializers for this type, so I believe it is covered by Flink's POJO serializer (through Beam), and that is breaking because of the serialVersionUID change.
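
To make the mechanism concrete, here is a hand-written sketch (not our real generated class) of why the serialVersionUID changes: avro-compiler derives the value from the schema, so regenerating the class after a schema change produces a new value, and plain Java deserialization of the old savepoint bytes then fails with the InvalidClassException above.

```java
import java.io.Serializable;

// Hand-written illustration -- not real avro-compiler output. Generated
// classes extend SpecificRecordBase, but the serialVersionUID mechanics
// are the same: the value is derived from the schema, so it changes on
// regeneration from a changed schema.
public class OurAvroType implements Serializable {
  // Value recorded in the savepoint bytes (from the error message):
  //   -5544933308624767500L
  // Value in the regenerated class after the schema change:
  private static final long serialVersionUID = -7822035950742261370L;

  private String id; // illustrative field, not from our real schema
}
```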

I am wondering how to work around this without losing my savepoint. We are going to try the following and were wondering if the community had any suggestions:
  • Add flink-avro to the job jar as mentioned in https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro. I am not sure this would work, because the original bytes were written out by the POJO serializer, and that is probably what will be used for deserialization. There must be some record of which serializer wrote out the bytes, and I am not sure how to override that (a quick way to check which serializer Flink picks is sketched after this list).
  • I also wanted to make sure, for future use cases, that including the Avro jar on the classpath will only affect Avro types by default.
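
For the classpath question, we plan to verify which serializer Flink picks with something like the sketch below (assuming `OurAvroType` is the generated SpecificRecord class; with flink-avro on the classpath it should resolve to AvroTypeInfo/AvroSerializer, and other classes should be unaffected):

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public class SerializerCheck {
  public static void main(String[] args) {
    // Let Flink extract the type information the same way it would in a job.
    TypeInformation<OurAvroType> typeInfo = TypeInformation.of(OurAvroType.class);
    TypeSerializer<OurAvroType> serializer =
        typeInfo.createSerializer(new ExecutionConfig());

    // With flink-avro on the classpath this should print AvroTypeInfo /
    // AvroSerializer for generated Avro types; without it, Flink falls
    // back to the POJO serializer or Kryo.
    System.out.println(typeInfo.getClass().getName());
    System.out.println(serializer.getClass().getName());
  }
}
```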
Thanks,
Yash

Re: Flink and Avro for state serialization

Arvid Heise-4
Hi Yashwant,

I don't know Beam well, so you might also want to ask on their user list. But I'll try to answer it from Flink's perspective.

If you want to work with Avro, you should use the AvroSerializer, which supports schema evolution in the best possible way.
The PojoSerializer also allows some schema changes, but the error looks more like you actually used plain Java serialization (Serializable).
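
Roughly, on the Flink side that would look like the sketch below (assuming flink-avro is on the classpath and `OurAvroType` is your generated SpecificRecord class; for plain POJOs there is also ExecutionConfig#enableForceAvro() to serialize them via Avro reflection, as described in the tuning post you linked):

```java
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AvroStateJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Pin the Avro type info explicitly so state for this type is written
    // by AvroSerializer (which supports schema evolution) instead of the
    // POJO serializer.
    DataStream<OurAvroType> stream =
        env.fromElements(new OurAvroType())
            .returns(new AvroTypeInfo<>(OurAvroType.class));

    stream.print();
    env.execute("avro-state-sketch");
  }
}
```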

If you need further help, please provide the full stacktrace.
