Hi,

I'm trying to write a Flink job (with the Python streaming API) that handles a custom type that needs a custom Kryo serializer. When we implemented a similar job in Scala we used addDefaultKryoSerializer, following the instructions at https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/custom_serializers.html

In Python, the PythonStreamExecutionEnvironment doesn't have this method, but it does have an ordinary StreamExecutionEnvironment as a private field ("env"). I'm using reflection to get this StreamExecutionEnvironment and calling addDefaultKryoSerializer on it, but that doesn't seem to work:

    f = python_execution_env.getClass().getDeclaredField("env")
    f.setAccessible(True)
    java_execution_env = f.get(python_execution_env)
    java_execution_env.addDefaultKryoSerializer(Message, MessageKryoSerializer)

With or without these lines, the job crashes with a KryoException (full stack trace at https://pastebin.com/zxxzCqH0); it doesn't appear that addDefaultKryoSerializer is doing anything.

Is there an officially supported way to set custom serializers in Python?

Thanks,

Joe Malt
Engineering Intern, Stream Processing
Yelp
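For reference, here is the snippet above as a self-contained Jython sketch. The import path is a hypothetical stand-in for wherever the job's own Java classes live, and the final call mirrors the addDefaultKryoSerializer(Class, Class) method that StreamExecutionEnvironment exposes in the Java API.

    # Hypothetical import: Message and MessageKryoSerializer are the job's own
    # Java classes, assumed to be available on the classpath.
    from com.example.messages import Message, MessageKryoSerializer

    # Reach the private Java StreamExecutionEnvironment ("env") wrapped by the
    # PythonStreamExecutionEnvironment, using Java reflection.
    f = python_execution_env.getClass().getDeclaredField("env")
    f.setAccessible(True)
    java_execution_env = f.get(python_execution_env)

    # Register the serializer on the underlying Java environment; Jython passes
    # the underlying java.lang.Class objects for the two Class parameters.
    java_execution_env.addDefaultKryoSerializer(Message, MessageKryoSerializer)

|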
Hi Joe,
Chesnay (cc'ed) may have a better idea of why this is happening.

Cheers,
Kostas
|
Bumping this (I hope that's OK!). I've been trying to fix this for a week and got nowhere.

On Mon, Sep 17, 2018 at 8:40 AM, Kostas Kloudas <[hidden email]> wrote:
|
I can't really help you here.
Digging into the backing Java internals isn't supported, and neither is registering a Kryo serializer (which is why it isn't exposed in the Python environment).

The Jython-related serialization logic doesn't care about Flink's usual type serialization mechanism, so using Avro will simply not work. It entirely assumes that all data is either created on the Python side or can be mapped automatically to a Python type by Jython.

On 18.09.2018 20:05, Joe Malt wrote:
|
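To illustrate the constraint Chesnay describes: since the Jython bridge only handles data that is created on the Python side or automatically mappable to Python types, a custom Java type would have to be unpacked into plain values at the boundary rather than registered with Kryo. A minimal sketch, assuming the Jython-based streaming API of Flink 1.6 and a hypothetical Message type with getId()/getPayload() accessors:

    from org.apache.flink.api.common.functions import MapFunction

    # Instead of asking the Python API to serialize the (hypothetical) Java
    # Message type itself, unpack it into a plain Python tuple; data created
    # on the Python side is what the Jython serialization logic is built for.
    class MessageToTuple(MapFunction):
        def map(self, msg):
            return (msg.getId(), msg.getPayload())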