Hi
I’ve been trying to get state migration with Avro working on Flink 1.7.2 using Scala case classes, but I’m not getting any closer to solving it. We’re using the most basic streaming WordCount example as a reference to test schema evolution:

val wordCountStream: DataStream[WordWithCount] = text

In this example, WordWithCount is the data object that we’d like to have serialized and deserialized with schema evolution support, since keyBy maintains state.

I understood from the documentation that this only works for classes generated from Avro schemas, so I tried using sbt-avrohugger to generate our case classes. However, with the normal case classes generated from Avro we quickly ran into the problem that we needed a no-arg constructor.

We looked at the flink-avro module and noticed that the classes generated by the avro-maven-plugin implement SpecificRecord and seem to comply with the POJO rules described in the Flink documentation. After switching from normal to specific Avro generation with sbt-avrohugger, we ended up with Scala case classes that should comply with all the rules. An example of such a generated case class is as follows:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */

This, however, also didn’t work out of the box. We then tried to define our own type information using flink-avro’s AvroTypeInfo, but this fails because Avro looks for a SCHEMA$ property in the class (SpecificData:285) and cannot use Java reflection to find the SCHEMA$ in the Scala companion object.

implicit val wordWithCountInfo: AvroTypeInfo[WordWithCount] = new AvroTypeInfo(classOf[WordWithCount])

We then read in the 1.7 documentation that Flink doesn’t natively support schema evolution for POJO types, but only for state defined by descriptors, e.g. the ListStateDescriptor, and only if you allow Flink to infer the type information. This is definitely what we need for our processors that have map and list state.
However, for the simple word count example, we should only need native POJO (de)serialization with state migration. We then noticed GitHub PR #7759, which adds support for POJO state schema evolution/migration. We wanted to give this a try and built Flink from source from the release-1.8 branch. We then included the 1.8-SNAPSHOT jars in our job and got a local 1.8 cluster and job running fine. However, if we do not specify our own type information, and perform the following steps:
We are then faced with the following error:

Caused by: java.lang.IllegalArgumentException: array is not of length 3

thrown from ScalaCaseClassSerializer.scala:50.

However, if we again try to define our own type information using the AvroTypeInfo class, we are faced with the same issue as in 1.7.

What are we missing? The documentation on how to use this is very limited, and we’re getting the idea that it may work with Java types, but maybe not with Scala case classes. I’d love to hear some pointers on how to approach this.

Compared to our solution in 1.4 (https://medium.com/wbaa/evolve-your-data-model-in-flinks-state-using-avro-f26982afa399), we hoped to get rid of all the custom serializers by moving to 1.7.

Thanks in advance!
Marc
Hi Marc!

I know we already talked offline about the issues mentioned in this topic, but I'm relaying the result of the discussions here to make it searchable by others who bump into the same issues.

On Thu, Mar 21, 2019 at 4:27 PM Marc Rooding <[hidden email]> wrote:
This is now tracked by https://issues.apache.org/jira/browse/FLINK-12501. This is really a problem with Avro's SpecificData#getSchema() method not working well with classes generated by a third-party library like avrohugger. Either this is fixed in avrohugger, or we work around it by explicitly handling the case.
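To make the SCHEMA$ problem concrete, here is a minimal sketch of why a Java-style field lookup on the case class misses a value that lives in the Scala companion object. It assumes the generated class keeps SCHEMA$ as a val in its companion, as described in the original post. A plain String stands in for org.apache.avro.Schema so the snippet runs without Avro on the classpath, and SchemaLookupDemo, declaresField and readFromCompanion are hypothetical names, not Flink or Avro API.

```scala
// Hypothetical stand-in for an avrohugger-style generated class.
// A String replaces org.apache.avro.Schema to keep the sketch dependency-free.
case class WordWithCount(var word: String, var count: Long) {
  def this() = this("", 0L) // no-arg constructor
}

object WordWithCount {
  val SCHEMA$: String =
    """{"type":"record","name":"WordWithCount","fields":[{"name":"word","type":"string"},{"name":"count","type":"long"}]}"""
}

object SchemaLookupDemo {
  /** True if `cls` itself declares a field called `name` (the Java-style lookup). */
  def declaresField(cls: Class[_], name: String): Boolean =
    try { cls.getDeclaredField(name); true }
    catch { case _: NoSuchFieldException => false }

  /** Reads a member from the companion object of `cls` through its MODULE$ singleton. */
  def readFromCompanion(cls: Class[_], name: String): AnyRef = {
    // scalac compiles `object WordWithCount` to a class named `WordWithCount$`
    val companionClass = Class.forName(cls.getName + "$", true, cls.getClassLoader)
    val companion = companionClass.getField("MODULE$").get(null)
    // the val is exposed as a public accessor method with the same name
    companionClass.getMethod(name).invoke(companion)
  }

  def main(args: Array[String]): Unit = {
    val cls = classOf[WordWithCount]
    val fieldName = "SCHEMA$"
    val onClass = declaresField(cls, fieldName)
    val viaCompanion = readFromCompanion(cls, fieldName)
    println(s"declared as a field on the class: $onClass")
    println(s"found via the companion object: $viaCompanion")
  }
}
```

Explicitly handling the case could follow the same idea as readFromCompanion, but this is only an illustration and not necessarily how FLINK-12501 is resolved.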
I think there is a misunderstanding here. Flink does not treat Scala case classes as POJOs. Since 1.8, Flink supports schema evolution for POJOs, but not yet for Scala case classes. That would explain the mismatched array length in the error message you got, since you added a new field to the case class.
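The arity mismatch can be reproduced in isolation. The sketch below is a simplified illustration, not Flink's ScalaCaseClassSerializer code: it rebuilds a case class by passing positional field values to its constructor through plain Java reflection, so the exact exception message differs from the "array is not of length 3" above. WordWithCountV2, its lastSeen field, and the ArityMismatchDemo helpers are hypothetical.

```scala
// Hypothetical evolved type: `lastSeen` was added after the state was written.
case class WordWithCountV2(word: String, count: Long, lastSeen: Long)

object ArityMismatchDemo {
  /** Rebuilds an instance by passing positional field values to the constructor. */
  def instantiate[T](cls: Class[T], fields: Array[AnyRef]): T =
    cls.getConstructors.head.newInstance(fields: _*).asInstanceOf[T]

  /** True if the constructor rejects the given field values. */
  def rejects[T](cls: Class[T], fields: Array[AnyRef]): Boolean =
    try { instantiate(cls, fields); false }
    catch { case _: IllegalArgumentException => true }

  def main(args: Array[String]): Unit = {
    // Two values, as restored from state written with the old two-field class
    val oldFields = Array[AnyRef]("flink", java.lang.Long.valueOf(3L))
    // Three values, matching the evolved class
    val newFields = Array[AnyRef]("flink", java.lang.Long.valueOf(3L), java.lang.Long.valueOf(0L))

    println(rejects(classOf[WordWithCountV2], oldFields))     // old layout does not fit
    println(instantiate(classOf[WordWithCountV2], newFields)) // new layout fits
  }
}
```

Because the fields are matched purely by position and count, nothing in this scheme can supply a default for the added field, which is the gap that dedicated schema evolution support would have to fill.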
So, to conclude this:

- With FLINK-12501 fixed, Avro + Scala case classes generated using avrohugger should work. It would be great if you could send me a minimal project to reproduce your error.
- Schema evolution for case classes is not supported, so using case classes directly would not work at the moment. Supporting this would be a much bigger effort compared to the above, so I can't be certain when it will be supported.
- Without any of the above, custom serializers are still the way to go right now if you want case classes + schema evolution. With the new serializer snapshot abstractions in 1.8, it would still be easy to remove them completely and migrate to Flink-shipped serializers eventually, once they support this scenario.
- The documentation should probably clarify things a bit more regarding support in Java vs. Scala, and case classes vs. POJOs.

Cheers,
Gordon