Using a custom DeserializationSchema with Kafka and Python

Joe Malt
Hi,

I'm trying to write a pipeline using the new Python streaming API, which reads from Kafka using FlinkKafkaConsumer010.

This works fine when using an existing deserializer like the SimpleStringSchema but I need to define my own deserializer to process a custom format. I've written a class which extends SimpleStringSchema, but I get an ImportError when trying to use it.

The class is as follows:
from org.apache.flink.api.common.serialization import SimpleStringSchema

class MyCustomKafkaDeserializer(SimpleStringSchema):

    def __init__(self):
        SimpleStringSchema.__init__(self)
        print "created MyKafkaDeserializer"

    def deserialize(self, *args):
        # snip

I instantiate the Kafka consumer like this:

consumer = FlinkKafkaConsumer010([configs['kafkaTopic']], MyCustomKafkaDeserializer(), props)

When I start the pipeline I see the message printed in the constructor (so the deserializer is being created) but once env.execute() is called I get this error:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: ImportError: No module named MyCustomKafkaDeserializer
	at org.python.core.Py.ImportError(Py.java:328)

The issue is the same whether MyCustomKafkaDeserializer is defined in the same file as the pipeline, or in another file and imported. It seems that the internals of Flink can't find the class for some reason.

The command I'm using to run the pipeline: 
./pyflink-stream.sh /Users/jmalt/flink-python/KafkaRead.py /Users/jmalt/flink-python/MyCustomKafkaDeserializer.py - --local

How can I make Flink see the custom deserializer?

Thanks,

Joe Malt

Software Engineering Intern, Stream Processing
Yelp

Re: Using a custom DeserializationSchema with Kafka and Python

Chesnay Schepler
This doesn't work because FlinkKafkaConsumer010 isn't aware that the given deserializer is a Jython class.
Jython classes have to be serialized in a specific way (as done in AbstractPythonUDF).
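
To see why the class "disappears" at runtime, it can help to look at the same failure mode in plain CPython. The sketch below is only an analogy using the standard pickle module, not Flink's or Jython's actual serialization code; the module name my_custom_deserializer and the class MyDeserializer are hypothetical. pickle records where a class lives (module name plus class name) rather than the class's code, so the receiving process must be able to import that module; presumably something similar happens to the Jython object when the task is deployed.

```python
import pickle
import sys
import types

MODULE_NAME = "my_custom_deserializer"  # hypothetical module name

# Create a module at runtime so it can be made "unavailable" later,
# standing in for a worker process that never loaded the user's script.
user_module = types.ModuleType(MODULE_NAME)
exec(
    "class MyDeserializer:\n"
    "    def deserialize(self, data):\n"
    "        return data.decode('utf-8')\n",
    user_module.__dict__,
)
sys.modules[MODULE_NAME] = user_module

# "Client" side: pickle stores a reference (module name + class name),
# not the code of the class.
payload = pickle.dumps(user_module.MyDeserializer())

# "Worker" side: the module cannot be imported in this process.
del sys.modules[MODULE_NAME]

try:
    pickle.loads(payload)
    message = None
except ImportError as exc:  # ModuleNotFoundError is a subclass
    message = str(exc)

print(message)  # prints: No module named 'my_custom_deserializer'
```

The object itself serializes without complaint; the error only surfaces on the side that tries to rebuild it, which matches the constructor message printing fine while the task fails after env.execute().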

For this to work you'll need to create a (Java!) wrapper around the FlinkKafkaConsumer010 that serializes the schema appropriately.
This applies to every connector that accepts user-defined classes.
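
A CPython analogy can illustrate the wrapper idea. Below is a minimal sketch, assuming the receiving side can be handed the user module's source: a hypothetical PortableSchema wrapper carries that source together with the pickled instance and rebuilds the module before unpickling, so the stored class reference resolves again. This is not how AbstractPythonUDF or any Flink connector is implemented; PortableSchema, load_module, my_custom_deserializer and MyDeserializer are all made-up names for illustration.

```python
import pickle
import sys
import types

MODULE_NAME = "my_custom_deserializer"  # hypothetical module name
MODULE_SOURCE = (
    "class MyDeserializer:\n"
    "    def deserialize(self, data):\n"
    "        return data.decode('utf-8')\n"
)


def load_module(name, source):
    """Execute `source` as a fresh module and register it under `name`."""
    module = types.ModuleType(name)
    exec(source, module.__dict__)
    sys.modules[name] = module
    return module


class PortableSchema:
    """Hypothetical wrapper: ships the module source alongside the
    pickled instance so the receiver can recreate the module first."""

    def __init__(self, module_name, module_source, instance):
        self.module_name = module_name
        self.module_source = module_source
        self.payload = pickle.dumps(instance)

    def restore(self):
        # Recreate the user's module only if this process lacks it.
        if self.module_name not in sys.modules:
            load_module(self.module_name, self.module_source)
        # The module/class reference stored in the payload now resolves.
        return pickle.loads(self.payload)


# "Client" side: define the module and wrap an instance of its class.
user_module = load_module(MODULE_NAME, MODULE_SOURCE)
wrapper = PortableSchema(MODULE_NAME, MODULE_SOURCE, user_module.MyDeserializer())

# "Worker" side: simulate a process that never imported the module.
del sys.modules[MODULE_NAME]
restored = wrapper.restore()
print(restored.deserialize(b"hello"))  # prints: hello
```

Note that the wrapper itself must still be resolvable wherever it is restored; in this sketch that holds trivially because it lives in the running script. That is presumably why the wrapper has to be written in Java here: a Java class shipped with the job is on the worker's classpath, whereas the Jython class is not.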

Note that we haven't really looked at how the Streaming Python API interacts with existing connectors.

(In reply to Joe Malt's message of 07.08.2018 02:05 above.)