Setting a custom Kryo serializer in Flink-Python

Setting a custom Kryo serializer in Flink-Python

Joe Malt
Hi,

I'm trying to write a Flink job (with the Python streaming API) that handles a custom type that needs a custom Kryo serializer.

When we implemented a similar job in Scala we used addDefaultKryoSerializer, similar to the instructions in https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/custom_serializers.html

In Python, the PythonStreamExecutionEnvironment doesn't have this method, but it does have an ordinary StreamExecutionEnvironment as a private field ("env"). I'm using reflection to get this StreamExecutionEnvironment and calling addDefaultKryoSerializer on it, but that doesn't seem to work:

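# Use Java reflection (via Jython) to reach the private "env" field
f = python_execution_env.getClass().getDeclaredField("env")
f.setAccessible(True)
# Unwrap the underlying StreamExecutionEnvironment
java_execution_env = f.get(python_execution_env)
# Register the custom Kryo serializer for the Message type
java_execution_env.addDefaultKryoSerializer(Message, MessageKryoSerializer)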

With or without these lines, the job crashes with a KryoException (full stack trace at https://pastebin.com/zxxzCqH0), so addDefaultKryoSerializer doesn't appear to be doing anything.

Is there an officially supported way to set custom serializers in Python? 

Thanks,

Joe Malt
Engineering Intern, Stream Processing
Yelp


Re: Setting a custom Kryo serializer in Flink-Python

Kostas Kloudas
Hi Joe,

Chesnay (cc'ed) may have a better idea of why this is happening.

Cheers,
Kostas

On Sep 14, 2018, at 7:30 PM, Joe Malt <[hidden email]> wrote:


Re: [External] Re: Setting a custom Kryo serializer in Flink-Python

Joe Malt
Bumping this (I hope that's OK!) - I've been trying to fix this for a week and have gotten nowhere.

On Mon, Sep 17, 2018 at 8:40 AM, Kostas Kloudas <[hidden email]> wrote:
Hi Joe,

Chesnay (cc'ed) may have a better idea of why this is happening.

Cheers,
Kostas





Re: [External] Re: Setting a custom Kryo serializer in Flink-Python

Chesnay Schepler
I can't really help you here.

Digging into the backing Java internals isn't supported, and neither is registering a Kryo serializer (which is why it isn't exposed in the Python environment).
The Jython-related serialization logic bypasses Flink's usual type serialization mechanism, so using Avro will simply not work. It assumes that all data is either created on the Python side or can be mapped automatically to a Python type by Jython.
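
One possible workaround that fits this constraint, offered as a minimal sketch rather than a verified fix: keep records in built-in Python types while they flow through the job, and convert to and from the custom type only inside user functions. The Message class below is a hypothetical stand-in with made-up fields (msg_id, payload), since the real type is not shown in this thread, and the helper names are illustrative. None of this has been tested against Flink's Python streaming API.

```python
# Hypothetical stand-in for the custom type from the question; the real
# Message class and its fields are not shown in the thread.
class Message(object):
    def __init__(self, msg_id, payload):
        self.msg_id = msg_id
        self.payload = payload


def message_to_tuple(message):
    """Flatten a Message into a tuple of built-in Python values."""
    return (message.msg_id, message.payload)


def tuple_to_message(record):
    """Rebuild a Message from a tuple produced by message_to_tuple."""
    msg_id, payload = record
    return Message(msg_id, payload)


# Round trip: under this approach only the tuple would travel between
# operators, so no custom serializer would be needed for Message itself.
original = Message(42, "hello")
record = message_to_tuple(original)
restored = tuple_to_message(record)
```

The design choice is to treat the tuple as the wire format and the custom class as a convenience inside each function. Whether this is enough depends on how the custom objects enter the job in the first place, which the thread does not say.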

On 18.09.2018 20:05, Joe Malt wrote:
Bumping this (I hope that's OK!) - I've been trying to fix this for a week and have gotten nowhere.
