Re: Unable to deserialize Avro data using Pyflink
Posted by
Dian Fu on
May 21, 2021; 12:39pm
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Unable-to-deserialize-Avro-data-using-Pyflink-tp43752p43886.html
Hi Zerah,
Sorry for the late response. I agree with your analysis. Currently, to be usable in the Python DataStream API, we have to provide a Java implementation which produces Row instead of GenericRecord. As far as I know, there is still no built-in DeserializationSchema that produces Row while working with the Confluent schema registry, so I'm afraid you have to write a Java implementation yourself for now. You could refer to DebeziumAvroDeserializationSchema [1] as an example, which produces RowData on top of ConfluentRegistryAvroDeserializationSchema.
Regards,
Dian
Hi Dian,
After providing a Python implementation for ConfluentRegistryAvroDeserializationSchema, I could deserialize and print the Confluent Avro data using PyFlink. But since the GenericRecord returned by ConfluentRegistryAvroDeserializationSchema is not supported in PyFlink currently, I cannot perform any transformations on top of the data.
Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.
Is there a way to convert these GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink Rows, similar to what the existing AvroRowDeserializationSchema returns?
Could you please suggest how to do this, or any other solution to ingest Confluent Avro data from a Kafka topic?
Hi Dian,
Thanks for your support.
I could deserialize the Confluent Avro data using ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord it returns is not supported in PyFlink currently, I am unable to proceed.
I can print the datastream using ds.print. Below is the result:
3> {"name": "abc", "id": "123"}
3> {"name": "cde", "id": "456"}
Apart from this, none of the transformations are working.
Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.
Is there a way to convert these GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink Rows, similar to what the existing AvroRowDeserializationSchema returns?
Or please suggest any other way by which I can perform transformations and write the data to a Kafka topic.
Regards,
Zerah
Thanks Dian. It worked for me.
Regards,
Zerah
Hi Zerah,
You could try to replace
```
value_schema = avro.schema.parse(<reader schema goes here>)
```
with the following code:
```
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)
```
The reason is that ```value_schema = avro.schema.parse(<reader schema goes here>)``` creates a Python object instead of a Java object.
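As a rough usage sketch (assuming Zerah's MyAvroRowDeserializationSchema wrapper and the value_schema_str JSON string shown further down in this thread, with placeholder host/port values), the pieces fit together like this:
```
# Minimal usage sketch; MyAvroRowDeserializationSchema and value_schema_str
# come from the snippets further down in this thread.
from pyflink.java_gateway import get_gateway
from pyflink.datastream.connectors import FlinkKafkaConsumer

# Parse the reader schema on the JVM side so that forGeneric receives a Java
# org.apache.avro.Schema instead of a Python avro.schema.RecordSchema.
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)

deserialization_schema = MyAvroRowDeserializationSchema(
    avro_schema_string=value_schema, url="http://host:port")

kafka_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
```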
Regards,
Dian
Hi Dian,
The type of value_schema is <class 'avro.schema.RecordSchema'>.
I only have a JSON schema string and the schema registry URL. Please find the snippet below:
```
import avro.schema

value_schema_str = """
{
    "namespace": "com.nextgen.customer",
    "type": "record",
    "name": "employee",
    "doc": "Customer Details Value Schema.",
    "fields": [
        {
            "doc": "String Value",
            "name": "emp_name",
            "type": "string"
        },
        {
            "doc": "String Value",
            "name": "emp_id",
            "type": "string"
        }
    ]
}
"""

value_schema = avro.schema.parse(value_schema_str)
schema_url = "http://host:port"
```
How can I create a Java Schema object from this schema string and pass it from the Python method?
Regards,
Zerah
Hi Zerah,
What's the type of value_schema? It should be a Java object of type Schema. From the exception, it seems that it's a class instead of an object. Is this true?
Regards,
Dian
Hi Dian,
Thanks for your suggestion.
I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it's not working. Kindly check the code snippet below:
```
class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, record_class: str = None, avro_schema_string: schema = None, url: str = None):
        JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
            .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
```
FlinkKafkaConsumer is now invoked as below using MyAvroRowDeserializationSchema:
```
value_schema = avro.schema.parse(<reader schema goes here>)
schema_url = "http://host:port"
deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)

kafka_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
```
I'm getting the below error:
```
Traceback (most recent call last):
  File "flinkKafkaAvro.py", line 70, in datastream_api_demo
    deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
  File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
    j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
    args_command, temp_args = self._build_args(*args)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
```
Please suggest how this method should be called. The schema used here is an Avro schema.
Regards,
Zerah
Hi Zerah,
I guess you could provide a Python implementation for ConfluentRegistryAvroDeserializationSchema if needed. It's just a wrapper for the Java implementation, so it will be very easy to implement. You could take a look at AvroRowDeserializationSchema [1] as an example.
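A minimal sketch of such a wrapper, loosely following the AvroRowDeserializationSchema pattern in [1]; the class name and constructor parameters here are illustrative rather than part of PyFlink:
```
from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class ConfluentAvroDeserializationSchema(DeserializationSchema):
    """Thin py4j wrapper around ConfluentRegistryAvroDeserializationSchema.forGeneric."""

    def __init__(self, avro_schema_string: str, url: str):
        gateway = get_gateway()
        # Parse the reader schema with the Java Avro parser so that forGeneric
        # receives an org.apache.avro.Schema object rather than a Python one.
        j_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
        JConfluentSchema = gateway.jvm.org.apache.flink.formats.avro.registry.confluent \
            .ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = JConfluentSchema.forGeneric(j_schema, url)
        super(ConfluentAvroDeserializationSchema, self).__init__(j_deserialization_schema)
```
Note that forGeneric still produces Avro GenericRecord values, which is what leads to the type-support limitation discussed above in this thread.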
Regards,
Dian
[1] https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
> On May 17, 2021, at 5:35 PM, Zerah J <[hidden email]> wrote:
>
> Hi,
>
> I have below use case
>
> 1. Read streaming data from Kafka topic using Flink Python API
> 2. Apply transformations on the data stream
> 3. Write back to different Kafka topics based on the incoming data
>
> Input data comes from a Confluent Avro producer. With the existing pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to deserialize the data.
>
> Please suggest how to process this data, since ConfluentRegistryAvroDeserializationSchema is not available in the Python API.
>
> Regards,
> Zerah