Hi,

I have a table backed by the Confluent Avro format, and the schema generated by Flink looks like the following:

    {
      "type": "record",
      "name": "record",
      "fields": [
        {
          "name": "dt",
          "type": [
            "null",
            {
              "type": "int",
              "logicalType": "date"
            }
          ],
          "default": null
        },
        (snip)
    }

I have another application that reads the Avro schema from the schema registry. Unfortunately, it fails with the following traceback:

    Traceback (most recent call last):
      File "/usr/local/bin/datahub", line 8, in <module>
        sys.exit(datahub())
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest
        pipeline.run()
      File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run
        for wu in self.source.get_workunits():
      File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits
        mce = self._extract_record(t)
      File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record
        fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)
      File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields
        parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)
      File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse
        return SchemaFromJSONData(json_data, names)
      File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData
        return parser(json_data, names=names)
      File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject
        return RecordSchema(
      File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__
        super(RecordSchema, self).__init__(
      File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__
        names.Register(self)
      File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register
        raise SchemaParseException(
    avro.schema.SchemaParseException: record is a reserved type name.

The full name of the schema's record is `"name": "record"`, but `record` is one of the Avro complex type names. See https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L399

So, I wonder if I can set or change the name of the Avro record to avoid this parse exception.

Thanks,
Youngwoo
Hi Youngwoo,

You can try to use aliases for it [1]. Even better would be to use a different name for the record. In general, since Avro originally comes from the Java world, it is more common to use camel case for record names.

On Mon, May 17, 2021 at 11:29 AM Youngwoo Kim (김영우) <[hidden email]> wrote:
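[Editor's note: until the producer-side name changes, the consuming application can rename the record itself. The helper below is a hypothetical consumer-side workaround sketch, not part of Flink or DataHub: it rewrites the top-level record name before the schema string reaches the Python avro parser, which is what rejects the reserved name "record".]

```python
import json

def rename_top_level_record(schema_str: str, new_name: str = "Record") -> str:
    """Rename a top-level record called "record" so the avro parser accepts it.

    Only the top-level name is touched; fields and nested types pass through
    unchanged. "record" collides with the Avro complex-type name, which is
    why avro.schema.parse raises SchemaParseException on it.
    """
    schema = json.loads(schema_str)
    if schema.get("type") == "record" and schema.get("name") == "record":
        schema["name"] = new_name
    return json.dumps(schema)

# A schema shaped like the one Flink's AvroSchemaConverter generates:
flink_schema = json.dumps({
    "type": "record",
    "name": "record",
    "fields": [
        {
            "name": "dt",
            "type": ["null", {"type": "int", "logicalType": "date"}],
            "default": None,
        },
    ],
})

print(json.loads(rename_top_level_record(flink_schema))["name"])  # Record
```

Note that renaming changes the record's full name, so consumers that resolve schemas by name (rather than by registry ID) would need the same rename applied consistently.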
Hey Arvid,

I found that it's a constant in Flink:
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L307

I believe it would be good to substitute 'record' with 'Record'. What do you think?

Thanks,
Youngwoo

On Mon, May 17, 2021 at 8:10 PM Arvid Heise <[hidden email]> wrote:
Arvid,

I found a JIRA issue related to mine: https://issues.apache.org/jira/browse/FLINK-18096

I added a comment there, and I think Seth's idea is much better than just renaming the record in the generated Avro schema.

Thanks,
Youngwoo

On Mon, May 17, 2021 at 8:37 PM Youngwoo Kim (김영우) <[hidden email]> wrote: