Unable to deserialize Avro data using Pyflink

Unable to deserialize Avro data using Pyflink

Zerah J
Hi,

I have the following use case:

1. Read streaming data from a Kafka topic using the Flink Python API
2. Apply transformations on the data stream
3. Write back to different Kafka topics based on the incoming data

Input data is coming from a Confluent Avro producer. Using the existing pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to deserialize the data.

Please suggest how to process the data, as ConfluentRegistryAvroDeserializationSchema is not available in the Python API.

Regards,
Zerah

Re: Unable to deserialize Avro data using Pyflink

Dian Fu
Hi Zerah,

I guess you could provide a Python implementation for ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper for the Java implementation, so it will be very easy to implement. You could take a look at AvroRowDeserializationSchema [1] as an example.

Regards,
Dian

[1] https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
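
To make the suggested pattern concrete: a PyFlink DeserializationSchema is essentially a thin Python holder for a py4j reference to the Java object. A rough skeleton of such a wrapper (a sketch only, assuming PyFlink 1.13, not the actual PyFlink source) could look like:

```python
# Sketch of the wrapper pattern from [1]: the Python class only stores the
# py4j reference; connectors hand it to the Java side unchanged.
from pyflink.common.serialization import DeserializationSchema


class MyWrappedDeserializationSchema(DeserializationSchema):

    def __init__(self, j_deserialization_schema):
        # j_deserialization_schema is a py4j reference to a Java
        # org.apache.flink.api.common.serialization.DeserializationSchema.
        super(MyWrappedDeserializationSchema, self).__init__(j_deserialization_schema)
```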

Re: Unable to deserialize Avro data using Pyflink

Zerah J
Hi Dian,

Thanks for your suggestion.

I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it's not working. Kindly check the code snippet below:

from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, record_class: str = None, avro_schema_string=None, url: str = None):
        JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
            .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)


FlinkKafkaConsumer is now invoked as below using MyAvroRowDeserializationSchema:

value_schema = avro.schema.parse(<reader schema goes here>)
schema_url = "http://host:port"
deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
kafka_source = FlinkKafkaConsumer(
        topics='my_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})

I'm getting the below error:

Traceback (most recent call last):
  File "flinkKafkaAvro.py", line 70, in datastream_api_demo
    deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
  File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
    j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
    args_command, temp_args = self._build_args(*args)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'RecordSchema' object has no attribute '_get_object_id'




Please suggest how this method should be called. The schema used here is an Avro schema.

Regards,
Zerah

Re: Unable to deserialize Avro data using Pyflink

Dian Fu
Hi Zerah,

What’s the type of value_schema? It should be a Java object of type Schema. From the exception, it seems that it’s a Python object instead of a Java object. Is this true?

Regards,
Dian

Re: Unable to deserialize Avro data using Pyflink

Zerah J
Hi Dian,

The type of value_schema is <class 'avro.schema.RecordSchema'>.

I have only a JSON schema string and the Schema Registry URL. Please find the snippet below:

import avro.schema

value_schema_str = """
    {
      "namespace": "com.nextgen.customer",
      "type": "record",
      "name": "employee",
      "doc": "Customer Details Value Schema.",
      "fields": [
        {
          "doc": "String Value",
          "name": "emp_name",
          "type": "string"
        },
        {
          "doc": "String Value",
          "name": "emp_id",
          "type": "string"
        }
      ]
    }
"""
value_schema = avro.schema.parse(value_schema_str)
schema_url = "http://host:port"



How can I create a Java Schema object from this schema string and pass it from the Python method?


Regards,
Zerah


Re: Unable to deserialize Avro data using Pyflink

Dian Fu
Hi Zerah,

You could try to replace
```
value_schema = avro.schema.parse(<reader schema goes here>) 
```

with the following code:
```
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)
```

The reason is that ```value_schema = avro.schema.parse(<reader schema goes here>)``` creates a Python object instead of a Java object, which py4j cannot pass to the Java forGeneric method.

Regards,
Dian
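
Putting the pieces together, a minimal end-to-end sketch of the corrected flow (assuming PyFlink 1.13 with the flink-avro and flink-avro-confluent-registry jars on the classpath; the topic, hosts, and schema are the placeholders used earlier in this thread, not a tested configuration):

```python
from pyflink.common.serialization import DeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.java_gateway import get_gateway


class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, avro_schema_string: str = None, url: str = None):
        # Parse the schema on the Java side so that forGeneric receives a Java
        # org.apache.avro.Schema, not a Python avro.schema.RecordSchema.
        j_schema = get_gateway().jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
        JConfluentAvroDeserializationSchema = get_gateway().jvm \
            .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = JConfluentAvroDeserializationSchema.forGeneric(j_schema, url)
        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)


# The same employee schema as in the earlier message, as a plain JSON string.
value_schema_str = (
    '{"namespace": "com.nextgen.customer", "type": "record", "name": "employee",'
    ' "fields": [{"name": "emp_name", "type": "string"},'
    ' {"name": "emp_id", "type": "string"}]}')

env = StreamExecutionEnvironment.get_execution_environment()
deserialization_schema = MyAvroRowDeserializationSchema(
    avro_schema_string=value_schema_str, url="http://host:port")
kafka_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
ds = env.add_source(kafka_source)
ds.print()
```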

Re: Unable to deserialize Avro data using Pyflink

Zerah J
Thanks Dian. It worked for me

Regards,
Zerah

Re: Unable to deserialize Avro data using Pyflink

Zerah J
Hi Dian,

Thanks for your support.

I could deserialize the Confluent Avro data using ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord it returns is not supported in PyFlink currently, I am unable to proceed.

I can print the datastream using ds.print(). Below is the result:
3> {"name": "abc", "id": "123"}
3> {"name": "cde", "id": "456"}



Apart from this, none of the transformations are working.

Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.


Is there a way to convert the GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink rows, the way the existing AvroRowDeserializationSchema does?
Or please suggest any other way by which I can perform transformations and write the data to a Kafka topic.

Regards,
Zerah

Re: Unable to deserialize Avro data using Pyflink

Zerah J
Hi Dian,

By providing a Python implementation for ConfluentRegistryAvroDeserializationSchema, I could deserialize and print the Confluent Avro data using PyFlink. But since the GenericRecord it returns is not supported in PyFlink currently, I cannot perform any transformations on top of the data.

Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.

Is there a way to convert the GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink rows, the way the existing AvroRowDeserializationSchema does?

Could you please suggest how to do this, or any other solution to ingest Confluent Avro data from a Kafka topic?



Regards,
Zerah


Re: Unable to deserialize Avro data using Pyflink

Dian Fu
Hi Zerah,

Sorry for the late response. I agree with your analysis. Currently, to be used in the Python DataStream API, we have to provide a Java implementation which produces Row instead of GenericRecord. As far as I know, there is still no built-in DeserializationSchema which produces Row when working with the Confluent Schema Registry. So I’m afraid that you have to write a Java implementation yourself for now. I guess you could refer to DebeziumAvroDeserializationSchema [1] as an example, which produces RowData on top of ConfluentRegistryAvroDeserializationSchema.

Regards,
Dian
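
Once such a Java class exists and is packaged on the Flink classpath, wrapping it from PyFlink follows the same pattern as earlier in this thread. A hypothetical sketch (the class name com.example.ConfluentRegistryAvroRowDeserializationSchema and its constructor signature are assumptions for illustration, not an existing API):

```python
from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class ConfluentAvroRowDeserializationSchema(DeserializationSchema):
    """Wraps a hypothetical custom Java DeserializationSchema<Row> that reads
    Confluent-registry Avro and emits Flink Rows, e.g. modeled on
    DebeziumAvroDeserializationSchema as suggested above."""

    def __init__(self, avro_schema_string: str, url: str):
        # Assumed constructor of the custom Java class (hypothetical); the
        # class must be shipped in a jar on the Flink classpath.
        j_schema = get_gateway().jvm \
            .com.example.ConfluentRegistryAvroRowDeserializationSchema(avro_schema_string, url)
        super(ConfluentAvroRowDeserializationSchema, self).__init__(j_schema)
```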

