Hello everyone,

I am trying to figure out how to set up Flink with Avro for state management (especially the content of snapshots) to enable state migrations (e.g. adding a nullable field to a class). So far, in version 1.4.2, I have tried to explicitly provide an instance of "new AvroTypeInfo(Accumulator.getClass())", where Accumulator is a very simple Avro-generated SpecificRecordBase with the following schema:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
   {"name": "accumulator", "type": "int"}
 ]
}

This successfully saves the state to the snapshot. When I then try to load the snapshot with an updated schema (adding the nullable field), it fails. The updated schema looks like this:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
   {"name": "accumulator", "type": "int"},
   {"name": "newStuff", "type": ["int", "null"]}
 ]
}

When I try to restart the job from the snapshot, I get the following exception:

2018-04-17 09:35:23,519 WARN org.apache.flink.api.common.
java.io.IOException: Unloadable class for type serializer.
...
Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator; local class incompatible: stream classdesc serialVersionUID = -3555733236161157838, local class serialVersionUID = 5291033088112484292

Which is true: the Avro tools do generate a new serialization ID for the bean. I just didn't expect it to be used; I expected the Avro schema to be used instead.

Did anyone get this working? What am I getting wrong?

Best regards,
Petter
|
Hi Petter,

could you share the source code of the class that Avro generates from this schema?

Thank you.

Regards,
Timo

On 18.04.18 at 11:00, Petter Arvidsson wrote:
|
Hi Timo,

Please find the generated class (for the second schema) attached.

Attachment: Accumulator.java (13K)

On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther <[hidden email]> wrote:
|
Thank you. Maybe we have already identified the issue (see https://issues.apache.org/jira/browse/FLINK-9202). I will use your code to verify it.

Regards,
Timo

On 18.04.18 at 14:07, Petter Arvidsson wrote:
|
Hi Petter,

which state backend are you using in your case? I think there is no quick solution for your problem, because a proper schema evolution story is on the roadmap for Flink 1.6.

Would it work to change the serial version id of the generated Avro class as a temporary workaround?

Regards,
Timo

On 18.04.18 at 14:21, Timo Walther wrote:
|
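For reference, the temporary workaround raised in the previous message (changing the serial version id of the generated class) can be sketched in plain Java. This is only an illustration under assumptions: PinnedAccumulator is a hypothetical stand-in for the Avro-generated io.relayr.flink.Accumulator, the sketch involves neither Flink nor Avro, and whether pinning the id is enough to restore a Flink 1.4.2 snapshot is not confirmed anywhere in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical stand-in for the generated Accumulator class (assumption, not the real code).
class PinnedAccumulator implements Serializable {
    // Pinned by hand to the id reported for the snapshot in the exception above,
    // instead of the new id the code generator would emit for the updated schema.
    private static final long serialVersionUID = -3555733236161157838L;

    int accumulator;
    Integer newStuff; // field added later; stays null unless set

    PinnedAccumulator(int accumulator) {
        this.accumulator = accumulator;
    }
}

class SerialVersionDemo {
    // Returns the id that Java serialization writes into the stream for this class.
    static long uidOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    // Serializes and deserializes the object in memory with plain Java serialization.
    static PinnedAccumulator roundTrip(PinnedAccumulator value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PinnedAccumulator) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("serialVersionUID: " + uidOf(PinnedAccumulator.class));
        System.out.println("accumulator: " + roundTrip(new PinnedAccumulator(42)).accumulator);
    }
}
```

With matching ids, Java serialization treats an added field as a compatible change and leaves it at its default value (null here) when reading data written by the older class version.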
Hi Timo,

Thanks for your response. We are using the filesystem backend backed by S3.

On Fri, Apr 20, 2018 at 11:20 AM, Timo Walther <[hidden email]> wrote:
|