Hi all,
I have a PyFlink job connected to a Kafka consumer and producer. I want to work directly with the bytes from and to Kafka, because I want to serialize/deserialize in my Python code rather than in the JVM environment. Therefore, I can't use the SimpleStringSchema for (de)serialization (the messages aren't strings anyway). I've tried to create a TypeInformationSerializationSchema with Types.BYTE(), see the code snippet below:

```
class ByteSerializer(SerializationSchema, DeserializationSchema):
    def __init__(self, execution_environment):
        gate_way = get_gateway()
        j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
            Types.BYTE().get_java_type_info(),
            get_j_env_configuration(execution_environment),
        )
        SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
        DeserializationSchema.__init__(
            self, j_deserialization_schema=j_byte_string_schema
        )
```

The ByteSerializer is used like this:

```
return FlinkKafkaConsumer(
    ["client_request", "internal"],
    ByteSerializer(self.env._j_stream_execution_environment),
    {
        "bootstrap.servers": "localhost:9092",
        "auto.offset.reset": "latest",
        "group.id": str(uuid.uuid4()),
    },
)
```

However, this does not seem to work. I think the error is thrown in the JVM environment, which makes it a bit hard to parse in my Python stack trace, but I think it boils down to this part:

```
org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...
    at ...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)
```
Hi Wouter,
As the exception indicates, the constructor doesn't exist. Could you try the following:

```
j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
j_type_serializer = j_type_info.createSerializer(gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(j_type_info, j_type_serializer)
```

Regards,
Dian
Hi Dian, all,

Thanks for your suggestion. Unfortunately, it does not seem to work. I get the following exception:

```
Caused by: java.lang.NegativeArraySizeException: -2147183315
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
    at org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
```

To be more precise, the messages in my Kafka topic are pickled Python objects. Maybe that is the reason for the exception. I also tried using Types.PICKLED_BYTE_ARRAY().get_java_type_info(), but I think that has the same serializer, because I get the same exception. Any suggestions?

Thanks for your help!

Regards,
Wouter

On Fri, 4 Jun 2021 at 08:24, Dian Fu <[hidden email]> wrote:
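One possible reading of this exception (an assumption on my part, not confirmed in the thread): Flink's BytePrimitiveArraySerializer reads a 4-byte big-endian length before the array contents, while a raw pickled payload carries no such prefix, so the leading pickle bytes get misread as a (negative) array length. A stdlib-only sketch of that mismatch:

```python
import pickle
import struct

# A pickled Python object, as it might sit in the Kafka topic.
payload = pickle.dumps({"user": "wouter", "id": 42})

# Pickle protocol 2+ streams start with the PROTO opcode 0x80, so the
# first byte already has its sign bit set when the leading 4 bytes are
# read as a signed big-endian int ("length" from Flink's point of view).
(misread_length,) = struct.unpack(">i", payload[:4])
print(misread_length)  # negative, hence NegativeArraySizeException
```

The exact negative value differs from the one in the stack trace, of course; the point is only that arbitrary pickle bytes are not a valid length prefix.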
Hi Dian, all,

The way I resolved it for now is to write my own custom serializer which only maps bytes to bytes. See the code below:

```
public class KafkaBytesSerializer implements SerializationSchema<byte[]>, DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] bytes) throws IOException {
        return bytes;
    }

    @Override
    public boolean isEndOfStream(byte[] bytes) {
        return false;
    }

    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}
```

This code is packaged in a jar and uploaded through env.add_jars. That works like a charm!

Thanks for the help!
Wouter

On Fri, 4 Jun 2021 at 14:40, Wouter Zorgdrager <[hidden email]> wrote:
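With a pass-through schema like the one above, the actual (de)serialization can live entirely on the Python side of the job. A minimal sketch of that Python side (the function names here are illustrative, not taken from the original job):

```python
import pickle

def to_kafka_bytes(obj) -> bytes:
    # Would run in a Python map just before the Kafka producer sink.
    return pickle.dumps(obj)

def from_kafka_bytes(raw: bytes):
    # Would run in a Python map just after the Kafka consumer source.
    return pickle.loads(raw)

# Round trip: the JVM side only ever sees opaque byte arrays.
event = {"client": "abc", "payload": [1, 2, 3]}
assert from_kafka_bytes(to_kafka_bytes(event)) == event
```

The design point is that the JVM schema stays a no-op identity on byte[], so Python keeps full control of the wire format.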
Hi Wouter,
Great to hear, and thanks for sharing!

Regards,
Dian