Hi,
I'm trying to write to a Kafka stream in a Flink job using the new Python streaming API. My program looks like this: def main(factory):I'm getting a ClassCastException when trying to output to the FlinkKafkaProducer: java.lang.ClassCastException: org.python.core.PyUnicode cannot be cast to java.lang.String at org.apache.flink.api.common.serialization.SimpleStringSchema.serialize(SimpleStringSchema.java:36) at org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue(KeyedSerializationSchemaWrapper.java:46) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:355) at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:48) at org.apache.flink.streaming.python.api.functions.PythonSinkFunction.invoke(PythonSinkFunction.java:37) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) It seems that the Python string isn't getting converted to a java.lang.String, which should happen automatically in Jython. I've tried adding a MapFunction that maps each input to String(input)where String is the constructor for java.lang.String. This made no difference; I get the same error. Any ideas? Thanks, Joe Malt Software Engineering Intern Yelp |
Hi Joe, Thanks, vino. Joe Malt <[hidden email]> 于2018年8月15日周三 上午7:16写道:
|
As seen in the stacktrace every sink
added via StreamExEnv#add_source is wrapped in a
PythonSinkFunction which internally converts things to PyObjects,
that's why the mapper had no effect.
Currently we don't differentiate between java/python sinks, contrary to sources where we have an explicit StreamExEnv#add_java_source method. There are 2 ways to approach this issue: * As alluded in a previous mail, create a python wrapper around the kafka consumer class. * extend PythonDataStream class with a separate method for kafka. Unfortunately I don't think we can solve this in a generic matter (i.e. add_java_source) since the java types wouldn't fit at compile time. On 15.08.2018 04:15, vino yang wrote:
|
Free forum by Nabble | Edit this page |