Hey Arvid,

I found that it's a constant in Flink:
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L307

I believe it would be good to change 'record' to 'Record'. What do you think?

Thanks,
Youngwoo

On Mon, May 17, 2021 at 8:10 PM Arvid Heise <[hidden email]> wrote:

Hi Youngwoo,

You can try to use aliases for it [1].

Even better would be to use a different name for the record. In general, since Avro originally comes from the Java world, it's more common to use camel case for record names.

On Mon, May 17, 2021 at 11:29 AM Youngwoo Kim (김영우) <[hidden email]> wrote:

Hi,

I have a table backed by the Confluent Avro format, and the schema Flink generates for it looks like the following:

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "dt",
      "type": ["null", {"type": "int", "logicalType": "date"}],
      "default": null
    },
    (snip)
}

At the moment, I have another application that reads the Avro schema from the schema registry. Unfortunately, I got a traceback from the application:

Traceback (most recent call last):
  File "/usr/local/bin/datahub", line 8, in <module>
    sys.exit(datahub())
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest
    pipeline.run()
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run
    for wu in self.source.get_workunits():
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits
    mce = self._extract_record(t)
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record
    fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields
    parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse
    return SchemaFromJSONData(json_data, names)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData
    return parser(json_data, names=names)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject
    return RecordSchema(
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__
    super(RecordSchema, self).__init__(
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__
    names.Register(self)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register
    raise SchemaParseException(
avro.schema.SchemaParseException: record is a reserved type name.
The full name of the schema's record is `record` (from `"name": "record"`), but `record` is one of Avro's complex type names, so the Python avro library rejects it. See https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L399
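To make the failure concrete, here is a minimal stdlib-only sketch of the kind of check that trips. It is not the avro library's code: `RESERVED_TYPE_NAMES`, `record_full_name` and `has_reserved_name` are hypothetical names, and the reserved set is taken from the primitive and complex type names in the Avro specification (the Python library keeps a similar list). The sketch compares the record's full name, so it treats a namespaced name such as `com.example.record` as acceptable; whether a given avro release behaves the same way is an assumption worth checking.

```python
import json

# Primitive and complex type names from the Avro specification. The Python
# avro library keeps a similar list and, per the traceback above, refuses to
# register a named schema whose full name is one of them.
RESERVED_TYPE_NAMES = frozenset({
    "null", "boolean", "int", "long", "float", "double", "bytes", "string",
    "record", "enum", "array", "map", "union", "fixed",
})


def record_full_name(schema: dict) -> str:
    """Return a named schema's full name: 'namespace.name', or just 'name'."""
    name = schema["name"]
    namespace = schema.get("namespace")
    # A dotted name is already a full name; otherwise prefix the namespace, if any.
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def has_reserved_name(schema_str: str) -> bool:
    """True if the top-level record's full name collides with a reserved type name."""
    schema = json.loads(schema_str)
    return record_full_name(schema) in RESERVED_TYPE_NAMES


print(has_reserved_name('{"type": "record", "name": "record", "fields": []}'))  # True
print(has_reserved_name('{"type": "record", "name": "Record", "fields": []}'))  # False
```

The comparison is case-sensitive, which is why the 'Record' spelling suggested at the top of the thread would not collide.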
So I wonder whether I can set or change the name of the Avro record to avoid this parse exception.
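Until the name can be changed on the Flink side, one possible consumer-side stopgap is to rename the top-level record in the schema string before handing it to the parser. The sketch below assumes the consumer only inspects the schema (for example, to list field names) and does not decode data with it: when a renamed schema is used as a reader schema, Avro schema resolution expects record names to match, which is where aliases come in. `rename_top_level_record` is a hypothetical helper, and nested record names are left untouched.

```python
import json


def rename_top_level_record(schema_str: str, new_name: str = "Record") -> str:
    """Return a copy of an Avro schema JSON string with the top-level record renamed.

    Hypothetical consumer-side helper: nested record names are left alone.
    """
    schema = json.loads(schema_str)
    if not isinstance(schema, dict) or schema.get("type") != "record":
        raise ValueError("expected a top-level Avro record schema")
    schema["name"] = new_name
    return json.dumps(schema)


# The schema Flink generated in this thread, with the snipped fields omitted.
flink_schema = json.dumps({
    "type": "record",
    "name": "record",
    "fields": [
        {"name": "dt", "type": ["null", {"type": "int", "logicalType": "date"}], "default": None},
    ],
})
renamed = json.loads(rename_top_level_record(flink_schema))
print(renamed["name"])                         # Record
print([f["name"] for f in renamed["fields"]])  # ['dt']
```

The rewritten string can then be passed to whatever parse call the consumer already uses; fixing the name where the schema is generated remains the cleaner option.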
Thanks,
Youngwoo