Hi,
I'm trying to write a pipeline using the new Python streaming API, which reads from Kafka using FlinkKafkaConsumer010. This works fine when using an existing deserializer like the SimpleStringSchema, but I need to define my own deserializer to process a custom format. I've written a class which extends SimpleStringSchema, but I get an ImportError when trying to use it.

The class is as follows:

from org.apache.flink.api.common.serialization import SimpleStringSchema

I instantiate the Kafka consumer like this:

consumer = FlinkKafkaConsumer010([configs['kafkaTopic']], MyCustomKafkaDeserializer(), props)

When I start the pipeline I see the message printed in the constructor (so the deserializer is being created), but once env.execute() is called I get this error:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: ImportError: No module named MyCustomKafkaDeserializer
    at org.python.core.Py.ImportError(Py.java:328)

The issue is the same whether MyCustomKafkaDeserializer is defined in the same file as the pipeline, or in another file and imported. It seems that the internals of Flink can't find the class for some reason.

The command I'm using to run the pipeline:

./pyflink-stream.sh /Users/jmalt/flink-python/KafkaRead.py /Users/jmalt/flink-python/MyCustomKafkaDeserializer.py - --local

How can I make Flink see the custom deserializer?

Thanks,

Joe Malt
Software Engineering Intern, Stream Processing
Yelp
This doesn't work since the FlinkKafkaConsumer010 isn't aware that the given deserializer is a Jython class. Jython classes have to be serialized in a specific way (as seen in AbstractPythonUDF). For this to work you'll need to create a (Java!) wrapper around the FlinkKafkaConsumer010 that serializes the schema appropriately. This applies to every connector that accepts user-defined classes.

Note that we haven't really looked at how the Streaming Python API interacts with existing connectors.
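For illustration, such a wrapper might look roughly like the sketch below. This is a minimal sketch, not the AbstractPythonUDF mechanism itself: all class and method names are hypothetical, and rather than wrapping the consumer, it wraps the deserialization schema (which is where the serialization problem lives). Instead of serializing the Jython instance, it re-creates the schema object on the worker, which assumes the user's .py module is importable on the Jython path of every task manager.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.python.core.Py;
import org.python.core.PyObject;
import org.python.util.PythonInterpreter;

// Hypothetical wrapper: re-creates the Jython schema on each worker instead of
// serializing the instance, so nothing Jython-specific has to survive Java
// serialization (avoiding the "No module named ..." ImportError).
public class JythonDeserializationSchemaWrapper implements DeserializationSchema<PyObject> {

    private final String module;    // e.g. "MyCustomKafkaDeserializer" (hypothetical)
    private final String className; // e.g. "MyCustomKafkaDeserializer" (hypothetical)
    private transient PyObject schema;

    public JythonDeserializationSchemaWrapper(String module, String className) {
        this.module = module;
        this.className = className;
    }

    private PyObject schema() {
        if (schema == null) {
            // Import and instantiate the Python class on the worker; this only
            // works if the .py file is on the Jython path of the task manager.
            PythonInterpreter interpreter = new PythonInterpreter();
            interpreter.exec("from " + module + " import " + className);
            schema = interpreter.get(className).__call__();
        }
        return schema;
    }

    @Override
    public PyObject deserialize(byte[] message) {
        // Delegate to the Python object's deserialize(message) method.
        return schema().invoke("deserialize", Py.java2py(message));
    }

    @Override
    public boolean isEndOfStream(PyObject nextElement) {
        return false;
    }

    @Override
    public TypeInformation<PyObject> getProducedType() {
        return TypeInformation.of(PyObject.class);
    }
}

The Python pipeline could then hand this wrapper to the consumer instead of the raw Jython object, e.g. consumer = FlinkKafkaConsumer010([configs['kafkaTopic']], JythonDeserializationSchemaWrapper('MyCustomKafkaDeserializer', 'MyCustomKafkaDeserializer'), props), provided the wrapper's jar is on the job's classpath.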
On 07.08.2018 02:05, Joe Malt wrote: