Managing state migrations with Flink and Avro


Petter Arvidsson
Hello everyone,

I am trying to figure out how to set up Flink with Avro for state management (especially the content of snapshots) to enable state migrations (e.g. adding a nullable field to a class). So far, with Flink 1.4.2, I tried to explicitly provide an instance of "new AvroTypeInfo(Accumulator.getClass())", where Accumulator is a very simple Avro-generated SpecificRecordBase with the following schema:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"}
 ]
}
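
Roughly, the state is declared along these lines (a simplified sketch rather than the actual job; the RichMapFunction, the state name and the record types are only illustrative, and the getter/constructor are the ones Avro generates for the schema above):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

import io.relayr.flink.Accumulator;

// Keyed function whose state is described with an explicit AvroTypeInfo so that
// Flink serializes the Accumulator state with Avro in checkpoints and savepoints.
public class AccumulatingMapper extends RichMapFunction<Integer, Integer> {

    private transient ValueState<Accumulator> state;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Accumulator> descriptor = new ValueStateDescriptor<>(
                "accumulator", new AvroTypeInfo<Accumulator>(Accumulator.class));
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public Integer map(Integer value) throws Exception {
        Accumulator acc = state.value();
        int sum = (acc == null ? 0 : acc.getAccumulator()) + value;
        state.update(new Accumulator(sum));
        return sum;
    }
}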

This successfully saves the state to the snapshot. When I then try to load the snapshot with an updated schema (adding the nullable field), it fails. The updated schema looks like this:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"},
     {"name": "newStuff", "type": ["int", "null"]}
 ]
}

When I try to restart the job from the snapshot, I get the following exception:
2018-04-17 09:35:23,519 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
...
Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator; local class incompatible: stream classdesc serialVersionUID = -3555733236161157838, local class serialVersionUID = 5291033088112484292

Which is true: the Avro tools do generate a new serialVersionUID for the bean. I just didn't expect it to be used; I expected the Avro schema to be used for resolution instead (see the sketch below for what I had in mind). Did anyone get this working? What am I getting wrong?
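
For reference, this is the kind of schema resolution I had in mind, i.e. plain Avro with no Flink involved (a standalone sketch; note that the added field is declared here as ["null", "int"] with "default": null, since Avro itself needs a default value to fill in when resolving records written with the old schema):

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroEvolutionSketch {

    public static void main(String[] args) throws Exception {
        Schema v1 = new Schema.Parser().parse(
            "{\"namespace\": \"io.relayr.flink\", \"type\": \"record\", \"name\": \"Accumulator\","
            + " \"fields\": [{\"name\": \"accumulator\", \"type\": \"int\"}]}");
        Schema v2 = new Schema.Parser().parse(
            "{\"namespace\": \"io.relayr.flink\", \"type\": \"record\", \"name\": \"Accumulator\","
            + " \"fields\": [{\"name\": \"accumulator\", \"type\": \"int\"},"
            + " {\"name\": \"newStuff\", \"type\": [\"null\", \"int\"], \"default\": null}]}");

        // Write a record with the old (v1) schema, as the snapshot would have done.
        GenericRecord old = new GenericData.Record(v1);
        old.put("accumulator", 42);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(v1).write(old, encoder);
        encoder.flush();

        // Read it back with v1 as the writer schema and v2 as the reader schema;
        // Avro fills in the missing field from its default.
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(v1, v2);
        GenericRecord migrated = reader.read(null,
            DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(migrated); // {"accumulator": 42, "newStuff": null}
    }
}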

Best regards,
Petter

Re: Managing state migrations with Flink and Avro

Timo Walther
Hi Petter,

Could you share the source code of the class that Avro generates from this schema?

Thank you.

Regards,
Timo


Re: Managing state migrations with Flink and Avro

Petter Arvidsson
Hi Timo,

Please find the generated class (for the second schema) attached.

Regards,
Petter

Attachment: Accumulator.java (13K)

Re: Managing state migrations with Flink and Avro

Timo Walther
Thank you. We may already have identified the issue (see https://issues.apache.org/jira/browse/FLINK-9202). I will use your code to verify it.

Regards,
Timo


Am 18.04.18 um 14:07 schrieb Petter Arvidsson:
Hi Timo,

Please find the generated class (for the second schema) attached.

Regards,
Petter

On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther <[hidden email]> wrote:
Hi Petter,

could you share the source code of the class that Avro generates out of this schema?

Thank you.

Regards,
Timo

Am 18.04.18 um 11:00 schrieb Petter Arvidsson:
Hello everyone,

I am trying to figure out how to set up Flink with Avro for state management (especially the content of snapshots) to enable state migrations (e.g. adding a nullable fields to a class). So far, in version 1.4.2, I tried to explicitly provide an instance of "new AvroTypeInfo(Accumulator.getClass())" where accumulator is a very simple Avro generated SpecificRecordBase of the following schema:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"}
 ]
}

This successfully saves the state to the snapshot. When I then try to load the snapshot with an updated schema (adding the nullable field) it fails. Schema looks like this:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"},
     {"name": "newStuff", "type": ["int", "null"]}
 ]
}

When I try to restart the Job from the snapshot, I get the following exception:
2018-04-17 09:35:23,519 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
...
Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator; local class incompatible: stream classdesc serialVersionUID = -3555733236161157838, local class serialVersionUID = 5291033088112484292

Which is true, Avro tools do generate a new serialization ID for the bean, I just didn't expect it to be used and expected the Avro schema to be used instead? Did anyone get this working? What am I getting wrong?

Best regards,
Petter




Reply | Threaded
Open this post in threaded view
|

Re: Managing state migrations with Flink and Avro

Timo Walther
Hi Petter,

Which state backend are you using? I think there is no quick solution to your problem at the moment, because a proper schema evolution story is on the roadmap for Flink 1.6.

Would it work to change the serialVersionUID of the generated Avro class as a temporary workaround?
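
Roughly along these lines, i.e. hand-editing the re-generated class (the class below is a heavily trimmed stand-in, just to show where the pinned constant would go; the value is the one the old snapshot was written with, taken from the InvalidClassException in your log):

package io.relayr.flink;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;

// Trimmed stand-in for the Avro-generated class; only the pinned serialVersionUID
// differs from what the code generator would produce for the new schema.
public class Accumulator extends SpecificRecordBase {

    // Pinned to the id the old snapshot was written with instead of the freshly
    // generated one, so that Java deserialization of the stored serializer
    // still finds a compatible class.
    private static final long serialVersionUID = -3555733236161157838L;

    public static final Schema SCHEMA$ = new Schema.Parser().parse(
        "{\"namespace\":\"io.relayr.flink\",\"type\":\"record\",\"name\":\"Accumulator\","
        + "\"fields\":[{\"name\":\"accumulator\",\"type\":\"int\"},"
        + "{\"name\":\"newStuff\",\"type\":[\"int\",\"null\"]}]}");

    private Integer accumulator;
    private Integer newStuff;

    @Override
    public Schema getSchema() { return SCHEMA$; }

    @Override
    public Object get(int field) {
        switch (field) {
            case 0: return accumulator;
            case 1: return newStuff;
            default: throw new AvroRuntimeException("Bad index");
        }
    }

    @Override
    public void put(int field, Object value) {
        switch (field) {
            case 0: accumulator = (Integer) value; break;
            case 1: newStuff = (Integer) value; break;
            default: throw new AvroRuntimeException("Bad index");
        }
    }
}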

Regards,
Timo


Re: Managing state migrations with Flink and Avro

Petter Arvidsson
Hi Timo,

Thanks for your response. We are using the filesystem backend backed by S3.
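
The backend is set up roughly like this (a minimal sketch; the bucket path and checkpoint interval are placeholders):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Filesystem state backend, writing checkpoint and savepoint data to S3.
        env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every 60s (placeholder interval)

        env.fromElements(1, 2, 3).print();
        env.execute("accumulator-job");
    }
}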

We were looking for a good long-term solution with Avro, so manually changing the serialVersionUID is probably not the right way for us to proceed. I think we will wait for Flink 1.6 before trying to properly implement state migrations in this case.

Regards,
Petter
