Hi Dian,
Thanks for your suggestion.
I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it is not working. Please see the code snippet below:
class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, record_class: str = None, avro_schema_string: schema = None, url: str = None):
        JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
            .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
FlinkKafkaConsumer is now invoked as below, using MyAvroRowDeserializationSchema:
value_schema = avro.schema.parse(<reader schema goes here>)
schema_url = "http://host:port"
deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
kafka_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
I'm getting the following error:
Traceback (most recent call last):
  File "flinkKafkaAvro.py", line 70, in datastream_api_demo
    deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
  File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
    j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
    args_command, temp_args = self._build_args(*args)
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
    [get_command_part(arg, self.pool) for arg in new_args])
  File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
Please suggest how this method should be called. The schema I am passing is an Avro schema parsed in Python with avro.schema.parse, as shown above.
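From the traceback, py4j appears to accept only Java object references (or primitives and strings) as arguments, so my guess is that the schema has to be parsed on the JVM side rather than in Python. A minimal, untested sketch of that idea follows. The helper name to_java_avro_schema is my own (hypothetical), and I am assuming org.apache.avro.Schema.Parser is reachable through the gateway because the Avro jar is on the classpath:

```python
import json


def to_java_avro_schema(jvm, schema_json: str):
    """Parse an Avro schema JSON string into an org.apache.avro.Schema on the JVM.

    `jvm` is assumed to be a py4j JVM view, e.g. pyflink's get_gateway().jvm.
    `to_java_avro_schema` is a hypothetical helper, not part of PyFlink.
    """
    # Fail early in Python if the string is not valid JSON at all.
    json.loads(schema_json)
    # Schema.Parser is Avro's Java-side parser. The result is a py4j
    # JavaObject, i.e. something that does have _get_object_id.
    return jvm.org.apache.avro.Schema.Parser().parse(schema_json)
```

I would then call it as to_java_avro_schema(get_gateway().jvm, str(value_schema)), assuming str() on the Python schema object returns its JSON text, and pass the result to forGeneric together with the URL. Is that the intended approach?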
Regards,
Zerah