How important is 'registerType'?

Dmitry Golubets
The docs say that registering types with 'registerType' may improve performance.

How true is that when custom serializers are provided?
There is also a 'disableAutoTypeRegistration' method in the config class, which implies that Flink registers types automatically.

So, given that I have a hierarchy:
trait A
class B extends A
class C extends A

and I call addDefaultKryoSerializer(classOf[A], classOf[ASerializer]),

should I bother registering B and C with the 'registerType' method as well?
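A note on the setup above: as far as I understand Kryo's lookup, a default serializer added for a supertype is also chosen for its subclasses, because Kryo matches its default-serializer entries with an isAssignableFrom check. The sketch below is a simplified, hypothetical model of that lookup only (DefaultSerializers is not a Kryo or Flink class, and serializers are represented by plain name strings). It illustrates that B and C would pick up ASerializer, which is a separate question from whether B and C are registered.

```scala
// Simplified, hypothetical model of a default-serializer lookup:
// the first entry whose type is a supertype of the queried class wins.
trait A
class B extends A
class C extends A

final case class DefaultSerializers(entries: Vector[(Class[_], String)] = Vector.empty) {
  // Append a default entry; earlier entries are checked first.
  def add(tpe: Class[_], serializer: String): DefaultSerializers =
    copy(entries = entries :+ (tpe -> serializer))

  // Return the serializer name of the first entry whose type is assignable from cls.
  def lookup(cls: Class[_]): Option[String] =
    entries.collectFirst { case (tpe, name) if tpe.isAssignableFrom(cls) => name }
}
```

With DefaultSerializers().add(classOf[A], "ASerializer"), looking up classOf[B] or classOf[C] returns Some("ASerializer"), while an unrelated class such as String returns None.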

It is worth mentioning that after I registered my message class hierarchies, I got the following when restoring from a savepoint:
java.lang.IllegalStateException: Could not initialize keyed state backend.
java.io.StreamCorruptedException: invalid type code: 00

After some debugging I found that 'registerType' was the cause.
It is possible that my code called registerType in a different order.
Could that be a problem?
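Why the order could matter: as I understand it, a class registered with Kryo without an explicit ID receives the next free integer ID, and serialized data refers to the class by that ID. If a savepoint is written with one registration order and restored with another, the same ID can resolve to a different class, and the bytes are then read with the wrong serializer. The sketch below is a hypothetical model of that mechanism only: OrderedRegistry and its firstId offset are illustrative, not Kryo's class resolver or its real ID numbering.

```scala
// Hypothetical model of order-based class IDs: each class gets the next
// integer in registration order, starting at an illustrative offset.
final class OrderedRegistry(classNames: Seq[String], firstId: Int = 0) {
  private val idOf: Map[String, Int] =
    classNames.zipWithIndex.map { case (name, i) => name -> (firstId + i) }.toMap
  private val nameOf: Map[Int, String] = idOf.map(_.swap)

  // ID written to the stream when serializing an instance of className.
  def idFor(className: String): Int = idOf(className)

  // Class the reader resolves for an ID found in the stream.
  def classFor(id: Int): Option[String] = nameOf.get(id)
}
```

For example, with registration order B, C when the savepoint is taken and C, B when it is restored, the ID written for B resolves to C on restore.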

Best regards,
Dmitry

Re: How important is 'registerType'?

Aljoscha Krettek
Hi,
are you changing anything in your job between taking the savepoint and restoring from it? For example a Flink upgrade, a job upgrade, a different Kryo version, or a different order in which you register Kryo serialisers?

Best,
Aljoscha

On Fri, 10 Feb 2017 at 18:26 Dmitry Golubets <[hidden email]> wrote:

Re: How important is 'registerType'?

Dmitry Golubets
Hi,

I was using ```cs.knownDirectSubclasses``` recursively to find and register subclasses, which may have produced an inconsistent registration order.
Later I changed that to ```cs.knownDirectSubclasses.toList.sortBy(_.fullName)```, which should have fixed the order.
But either it didn't, or there was another problem: I was getting the error anyway.
Interestingly, it happened only on a KeyedStream after a window; without the window it was fine.
I didn't change anything else in the job.
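A variant of the same idea is to collect the whole set of subclasses first and sort it once by fully-qualified name, so the final order does not depend on traversal order at all. The sketch below models the hierarchy as plain data (a hypothetical map from a class name to its direct subclasses, standing in for knownDirectSubclasses) so that it runs without scala-reflect; SubclassOrder is an illustrative name.

```scala
// Collects all transitive subclasses of `root` from a hypothetical
// parent -> direct-subclasses map, then sorts once by name so the result
// does not depend on the order in which subclasses were discovered.
object SubclassOrder {
  def allSubclasses(root: String, children: Map[String, Seq[String]]): List[String] = {
    def loop(name: String): List[String] = {
      val direct = children.getOrElse(name, Seq.empty).toList
      direct ++ direct.flatMap(loop)
    }
    loop(root).distinct.sorted
  }
}
```

Under an order-based ID model, sorting only stabilizes the order for a fixed set of classes: adding or removing a subclass between taking the savepoint and restoring it would still shift the IDs of every class that sorts after it.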

However, I removed the ```registerType``` calls completely, because I didn't notice any performance difference.
Do you know if ```registerType``` has any effect at all if I use it together with ```addDefaultKryoSerializer``` for that type?


Best regards,
Dmitry

On Thu, Feb 16, 2017 at 10:40 AM, Aljoscha Krettek <[hidden email]> wrote:

Re: How important is 'registerType'?

Till Rohrmann

Hi Dmitry,

I'm curious to know when exactly you observed the IllegalStateException. Did it happen after resuming from a savepoint, or did it already happen during the first run of the program? If the latter, this might indicate a bug where we don’t use the correct ExecutionConfig to instantiate the serializers.

Concerning the addDefaultKryoSerializer method: it registers a serializer for a specific type, but it does not register the type itself with Kryo. Thus, it should still be beneficial to call registerType for the type for which you’ve registered a default serializer. Alternatively, you can call registerTypeWithKryoSerializer, which does both for you.
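The benefit Till describes can be illustrated with a rough cost model. As I understand Kryo's default class resolver, an instance of a registered class is tagged with a small variable-length integer ID, whereas an unregistered class is identified by its class name, so registering B and C (with registerType, or with registerTypeWithKryoSerializer(classOf[B], classOf[ASerializer]) to pin the serializer at the same time) mainly saves those bytes per record. The sketch below is a simplified, hypothetical model: ClassTagCost is an illustrative name, the byte counts are not Kryo's exact wire format, and com.example.B is a made-up class name.

```scala
import java.nio.charset.StandardCharsets

// Simplified model of the per-record class tag: a registered class is written
// as a variable-length integer ID, an unregistered one as its class name.
// Byte counts are illustrative, not Kryo's exact wire format.
object ClassTagCost {
  // Bytes needed for an unsigned varint (7 payload bits per byte).
  def varIntSize(value: Int): Int = {
    require(value >= 0, "model only covers non-negative IDs")
    var v = value
    var bytes = 1
    while (v >= 0x80) { v >>>= 7; bytes += 1 }
    bytes
  }

  // Approximate tag size for one record of the given class.
  def tagBytes(className: String, registeredIds: Map[String, Int]): Int =
    registeredIds.get(className) match {
      case Some(id) => varIntSize(id)
      case None     => className.getBytes(StandardCharsets.UTF_8).length
    }
}
```

In this model, a registered com.example.B with ID 12 costs 1 byte per record, while an unregistered com.example.C costs 13 bytes for its name alone.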

Cheers,
Till


On Fri, Feb 17, 2017 at 12:38 PM, Dmitry Golubets <[hidden email]> wrote:

Re: How important is 'registerType'?

Dmitry Golubets
Hi Till,

It happened during deserialization of a savepoint.

Best regards,
Dmitry

On Fri, Feb 17, 2017 at 2:48 PM, Till Rohrmann <[hidden email]> wrote:


Re: How important is 'registerType'?

Aljoscha Krettek
Hi Dmitry,
do you maybe have a more complete stack trace? I have a suspicion but I would like to confirm that.

Best,
Aljoscha

On Fri, 17 Feb 2017 at 20:24 Dmitry Golubets <[hidden email]> wrote: