Hi,

I have the below use case:

1. Read streaming data from a Kafka topic using the Flink Python API
2. Apply transformations on the data stream
3. Write back to different Kafka topics based on the incoming data

The input data comes from a Confluent Avro producer. Using the existing pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to deserialize the data.

Please help to process the data, as ConfluentRegistryAvroDeserializationSchema is not available in the Python API.

Regards,
Zerah
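For orientation, a minimal sketch of such a read-transform-write pipeline in PyFlink (1.13-era API); the topic names, addresses, and jar path are placeholders, and a plain string schema stands in for the Avro deserialization discussed in the rest of the thread:

```
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
# The Kafka connector jar must be on the classpath (placeholder path).
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

# 1. Read streaming data from a Kafka topic
source = FlinkKafkaConsumer(
    topics='input_topic',
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
ds = env.add_source(source)

# 2. Apply transformations on the data stream
transformed = ds.map(lambda value: value.upper(), output_type=Types.STRING())

# 3. Write back to a different Kafka topic
sink = FlinkKafkaProducer(
    topic='output_topic',
    serialization_schema=SimpleStringSchema(),
    producer_config={'bootstrap.servers': 'host:port'})
transformed.add_sink(sink)

env.execute('kafka_pipeline')
```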
Hi Zerah,
I guess you could provide a Python implementation for ConfluentRegistryAvroDeserializationSchema if needed. It's just a wrapper for the Java implementation, so it should be fairly easy to implement. You could take a look at AvroRowDeserializationSchema [1] as an example.

Regards,
Dian

[1] https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
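To illustrate the wrapping pattern Dian is pointing at: PyFlink schema classes are thin Python handles around the corresponding Java objects, created via the py4j gateway. A sketch with a Java class that is certainly available, org.apache.flink.api.common.serialization.SimpleStringSchema:

```
from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class MyStringSchema(DeserializationSchema):
    """Python handle around the Java SimpleStringSchema."""

    def __init__(self):
        # Instantiate the Java object and hand it to the base class.
        j_schema = get_gateway().jvm \
            .org.apache.flink.api.common.serialization.SimpleStringSchema()
        super(MyStringSchema, self).__init__(j_deserialization_schema=j_schema)
```

The same pattern applies to ConfluentRegistryAvroDeserializationSchema, as the snippets below attempt.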
Hi Dian,

Thanks for your suggestion. I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it's not working. Kindly check the code snippet below:

```
class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, record_class: str = None, avro_schema_string: schema = None, url: str = None):
        JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
            .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
```

FlinkKafkaConsumer is then invoked as below, using MyAvroRowDeserializationSchema:

```
value_schema = avro.schema.parse(<reader schema goes here>)
schema_url = "http://host:port"
deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
kafka_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
```

I'm getting the below error:

```
Traceback (most recent call last):
  File "flinkKafkaAvro.py", line 70, in datastream_api_demo
    deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
  File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
    j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
    args_command, temp_args = self._build_args(*args)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
```

Please suggest how this method should be called. The schema used here is an Avro schema.

Regards,
Zerah

On Mon, May 17, 2021 at 3:17 PM Dian Fu <[hidden email]> wrote:
Hi Zerah,
What's the type of value_schema? It should be a Java object of type org.apache.avro.Schema. From the exception, it seems that it's a Python object rather than a Java object. Is this true?

Regards,
Dian
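A quick way to check what py4j is being given (the schema string below is a stand-in; only the printed types matter): objects created by the Python avro package are not py4j JavaObject handles, so py4j cannot pass them to a Java method such as forGeneric.

```
import avro.schema
from py4j.java_gateway import JavaObject

value_schema_str = '{"type": "record", "name": "employee", "fields": [{"name": "emp_name", "type": "string"}]}'
value_schema = avro.schema.parse(value_schema_str)  # parsed on the Python side

print(type(value_schema))                    # <class 'avro.schema.RecordSchema'>
print(isinstance(value_schema, JavaObject))  # False -> py4j cannot marshal it
```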
Hi Dian,

The type of value_schema is <class 'avro.schema.RecordSchema'>. I only have a JSON schema string and the schema registry URL. Please find the snippet below:

```
import avro.schema

value_schema_str = """
{
    "namespace": "com.nextgen.customer",
    "type": "record",
    "name": "employee",
    "doc": "Customer Details Value Schema.",
    "fields": [
        {
            "doc": "String Value",
            "name": "emp_name",
            "type": "string"
        },
        {
            "doc": "String Value",
            "name": "emp_id",
            "type": "string"
        }
    ]
}
"""

value_schema = avro.schema.parse(value_schema_str)
schema_url = "http://host:port"
```

How can I create a Java Schema object from this schema string and pass it from the Python method?

Regards,
Zerah

On Wed, May 19, 2021 at 1:57 PM Dian Fu <[hidden email]> wrote:
Hi Zerah,
You could try to replace

```
value_schema = avro.schema.parse(<reader schema goes here>)
```

with the following code:

```
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)
```

The reason is that `value_schema = avro.schema.parse(<reader schema goes here>)` creates a Python object instead of a Java object.

Regards,
Dian
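Putting the two pieces together, the corrected wrapper parses the reader schema on the Java side and passes the resulting handle to forGeneric. A consolidated sketch (untested; value_schema_str is the JSON schema string from the previous message, and host/port values are placeholders):

```
from pyflink.common.serialization import DeserializationSchema
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.java_gateway import get_gateway


class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, avro_schema_string: str, url: str):
        gateway = get_gateway()
        # Java-side parse: yields a py4j handle to org.apache.avro.Schema
        j_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
        JConfluentSchema = gateway.jvm \
            .org.apache.flink.formats.avro.registry.confluent \
            .ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = JConfluentSchema.forGeneric(j_schema, url)
        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)


deserialization_schema = MyAvroRowDeserializationSchema(
    avro_schema_string=value_schema_str,  # the JSON schema string, not a parsed Python object
    url="http://host:port")

kafka_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
```

Note that forGeneric still produces Avro GenericRecord values, which becomes the next problem in the thread.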
Thanks Dian. It worked for me.

Regards,
Zerah

On Wed, May 19, 2021, 5:14 PM Dian Fu <[hidden email]> wrote:
Hi Dian,

Thanks for your support. I could deserialize the Confluent Avro data using ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord it returns is not supported in PyFlink currently, I am unable to proceed. I can print the datastream using ds.print. Below is the result:

```
3> {"name": "abc", "id": "123"}
3> {"name": "cde", "id": "456"}
```

Apart from this, none of the transformations are working:

```
Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.
```

Is there a way to convert the GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink rows, like the existing AvroRowDeserializationSchema returns? Or please suggest any other way by which I can perform transformations and write the data to a Kafka topic.

Regards,
Zerah

On Wed, May 19, 2021 at 7:13 PM Zerah J <[hidden email]> wrote:
Hi Dian,

By providing a Python implementation for ConfluentRegistryAvroDeserializationSchema, I could deserialize and print the Confluent Avro data using PyFlink. But since the GenericRecord returned by ConfluentRegistryAvroDeserializationSchema is not supported in PyFlink currently, I cannot perform any transformations on top of the data:

```
Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.
```

Is there a way to convert the GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink rows, like the existing AvroRowDeserializationSchema returns? Could you please suggest how to do this, or any other solution to ingest Confluent Avro data from a Kafka topic?

Regards,
Zerah

On Thu, May 20, 2021 at 7:07 PM Zerah J <[hidden email]> wrote:
Hi Zerah,
Sorry for the late response. I agree with your analysis. Currently, to be used in the Python DataStream API, we have to provide a Java implementation which produces Row instead of GenericRecord. As far as I know, there is still no built-in DeserializationSchema which produces Row with the Confluent schema registry. So I'm afraid you have to implement it in Java yourself for now. I guess you could refer to DebeziumAvroDeserializationSchema [1] as an example, which produces RowData on top of ConfluentRegistryAvroDeserializationSchema.

Regards,
Dian
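Once such a Java class exists, the Python side is again a thin wrapper, mirroring the pattern used earlier in the thread. The class name com.example.ConfluentRegistryAvroRowDeserializationSchema below is hypothetical: it stands for a user-written Java DeserializationSchema<Row> that deserializes via ConfluentRegistryAvroDeserializationSchema and converts each GenericRecord to a Row, analogous to what DebeziumAvroDeserializationSchema does with RowData.

```
from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class ConfluentAvroRowDeserializationSchema(DeserializationSchema):
    """Python handle for a hypothetical user-provided Java class that
    produces Row instead of GenericRecord."""

    def __init__(self, avro_schema_string: str, url: str):
        # Hypothetical Java class; its jar must be on Flink's classpath.
        j_schema = get_gateway().jvm.com.example \
            .ConfluentRegistryAvroRowDeserializationSchema(avro_schema_string, url)
        super(ConfluentAvroRowDeserializationSchema, self).__init__(j_schema)
```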