In-memory state serialization with Kryo fails

Rinat
Hi mates!

I’ve implemented a job that stores its progress using MapState[K, V], where K is java.lang.String and V is a collection of typed objects, java.util.List[SomeClass[_]].
When Flink tries to serialize this state, it uses the Kryo serializer for the value object and fails with a StackOverflowError:

java.lang.StackOverflowError
at java.util.HashMap.hash(HashMap.java:338)
at java.util.HashMap.get(HashMap.java:556)
at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:43)
 
This problem is related to a known bug in Kryo (https://github.com/EsotericSoftware/kryo/issues/341), and it shows up only when SomeClass is parameterized with java.util.BitSet.

I’ve checked my job locally (from the IDE) with the latest Kryo library (4.0.2), and it works fine, but I can’t change the Kryo version in distributed mode, because Kryo is packaged into the fat jar (flink-dist_2.11-1.6.1.jar) that
contains all of Flink’s runtime dependencies.

Could you give me any advice on how to solve this issue, or on how to register a separate serializer for this case?

Thx for your help.


Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

mobile: +7 (925) 416-37-26

CleverDATA
make your data clever
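For the second option Rinat asks about, registering a dedicated serializer, a minimal sketch follows. It assumes the Kryo 2.x `Serializer` API that Flink 1.6 bundles (the `read` signature differs in Kryo 5), and the class name `BitSetKryoSerializer` is hypothetical. It writes a `java.util.BitSet` as its backing 64-bit words. Whether this sidesteps the stack overflow from kryo issue 341 is untested, and the state would still be Kryo-serialized, which the reply that follows advises against.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import java.util.BitSet

// Hypothetical dedicated Kryo serializer for java.util.BitSet (Kryo 2.x API).
// Format: number of 64-bit words (varint), then each word as a fixed-size long.
class BitSetKryoSerializer extends Serializer[BitSet] {
  override def write(kryo: Kryo, output: Output, bits: BitSet): Unit = {
    val words = bits.toLongArray // little-endian words, no trailing zero words
    output.writeInt(words.length, true) // true: optimized for non-negative values
    words.foreach(w => output.writeLong(w))
  }

  override def read(kryo: Kryo, input: Input, tpe: Class[BitSet]): BitSet = {
    val n = input.readInt(true)
    // Rebuild the set from the words in the order they were written.
    BitSet.valueOf(Array.fill(n)(input.readLong()))
  }
}
```

It could then be registered on the job's execution config, for example `env.getConfig.registerTypeWithKryoSerializer(classOf[java.util.BitSet], classOf[BitSetKryoSerializer])`, where `env` is assumed to be the job's `StreamExecutionEnvironment`.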

Re: In-memory state serialization with Kryo fails

Tzu-Li (Gordon) Tai
Hi,

I would suggest avoiding Kryo for state serialization, especially if this job is meant for production use.
It might get in the way later, if you decide to upgrade the schema of your value state.

To do that, provide a specific serializer for your value (java.util.List[SomeClass[_]]) when declaring the descriptor for your MapState.
You should be able to use Flink's ListSerializer for this. Providing a specific serializer bypasses Flink's type extraction for your state, which would otherwise fall back to the KryoSerializer for types it does not recognize.
You can find more information about custom state serialization here [1].

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/custom_serialization.html
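A minimal sketch of Gordon's suggestion, assuming the Flink 1.6 state API called from Scala. `ProgressState`, the state name, and the `elementSerializer` argument are hypothetical: the serializer for a single `SomeClass[_]` element is assumed to be written by hand (for example as a `TypeSerializer` subclass). Because the serializers are passed directly to the `MapStateDescriptor` constructor, type extraction, and with it the Kryo fallback, is not used for this state.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.base.{ListSerializer, StringSerializer}

object ProgressState {
  // Builds a MapState descriptor with explicit key and value serializers.
  // `elementSerializer` is hypothetical: a hand-written serializer for one list element.
  def descriptor[T](
      name: String,
      elementSerializer: TypeSerializer[T]): MapStateDescriptor[String, java.util.List[T]] =
    new MapStateDescriptor[String, java.util.List[T]](
      name,
      StringSerializer.INSTANCE,               // keys are plain strings
      new ListSerializer[T](elementSerializer)) // values are java.util.List[T]
}
```

Inside a rich function's `open`, the descriptor would then be passed to `getRuntimeContext.getMapState(...)` as usual.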

On Wed, Feb 13, 2019 at 2:56 AM Rinat <[hidden email]> wrote:

Re: In-memory state serialization with Kryo fails

Rinat
Hi Gordon, thanks for your time. I will try to find another suitable serializer.
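One building block for such a serializer is a compact encoding of the `java.util.BitSet` payload. The sketch below is a hypothetical helper (`BitSetCodec` is not a Flink or JDK class) written against `java.io.DataOutput` and `java.io.DataInput`; Flink's `DataOutputView` and `DataInputView` extend those interfaces, so a hand-written `TypeSerializer` could call it from its `serialize` and `deserialize` methods. The `roundTrip` method exists only for testing.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}
import java.util.BitSet

// Hypothetical helper: length-prefixed encoding of a BitSet as its 64-bit words.
object BitSetCodec {
  def write(bits: BitSet, out: DataOutput): Unit = {
    val words = bits.toLongArray // little-endian words, no trailing zero words
    out.writeInt(words.length)
    words.foreach(w => out.writeLong(w))
  }

  def read(in: DataInput): BitSet = {
    val n = in.readInt()
    val words = Array.fill(n)(in.readLong()) // read back in the order written
    BitSet.valueOf(words)
  }

  // Writes to an in-memory buffer and reads the result back (testing aid only).
  def roundTrip(bits: BitSet): BitSet = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    write(bits, out)
    out.flush()
    read(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
  }
}
```

Writing an explicit length prefix keeps the format self-describing, so the reader never depends on how many bytes remain in the stream.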

On 13 Feb 2019, at 07:25, Tzu-Li (Gordon) Tai <[hidden email]> wrote:
