How to change the record name of avro schema

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

How to change the record name of avro schema

Youngwoo Kim (김영우)
Hi,

I have a table backed by the Confluent Avro format, and the schema generated by Flink looks like the following:

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "dt",
      "type": [
        "null",
        {
          "type": "int",
          "logicalType": "date"
        }
      ],
      "default": null
    },

(snip)

}

At the moment, I have another application that reads the Avro schema from the schema registry. Unfortunately, I got a traceback from that application:

Traceback (most recent call last):

  File "/usr/local/bin/datahub", line 8, in <module>

    sys.exit(datahub())

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__

    return self.main(*args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main

    rv = self.invoke(ctx)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke

    return _process_result(sub_ctx.command.invoke(sub_ctx))

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke

    return ctx.invoke(self.callback, **ctx.params)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke

    return callback(*args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest

    pipeline.run()

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run

    for wu in self.source.get_workunits():

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits

    mce = self._extract_record(t)

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record

    fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields

    parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse

    return SchemaFromJSONData(json_data, names)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData

    return parser(json_data, names=names)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject

    return RecordSchema(

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__

    super(RecordSchema, self).__init__(

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__

    names.Register(self)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register

    raise SchemaParseException(

avro.schema.SchemaParseException: record is a reserved type name.



The full name of the schema's record is `record` (`"name": "record",`), but `record` is the name of one of Avro's complex types, so the parser rejects it as a reserved type name. See https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L399


So, I wonder whether I can set or change the name of the Avro record to avoid this parse exception.


Thanks,

Youngwoo

Reply | Threaded
Open this post in threaded view
|

Re: How to change the record name of avro schema

Arvid Heise-4
Hi Youngwoo,

You can try to use aliases for it [1].

Even better would be to use a different name for the record. In general, since Avro originally comes from the Java World, it's more common to use camel case for record names.


On Mon, May 17, 2021 at 11:29 AM Youngwoo Kim (김영우) <[hidden email]> wrote:
Hi,

I have a table backed by confluent avro format and the generated schema from flink looks like following:

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "dt",
      "type": [
        "null",
        {
          "type": "int",
          "logicalType": "date"
        }
      ],
      "default": null
    },

(snip)

}

At this moment, I have another application that reads avro schema from schema registry. Unfortunately, Got a traceback from the application:

Traceback (most recent call last):

  File "/usr/local/bin/datahub", line 8, in <module>

    sys.exit(datahub())

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__

    return self.main(*args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main

    rv = self.invoke(ctx)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke

    return _process_result(sub_ctx.command.invoke(sub_ctx))

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke

    return ctx.invoke(self.callback, **ctx.params)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke

    return callback(*args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest

    pipeline.run()

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run

    for wu in self.source.get_workunits():

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits

    mce = self._extract_record(t)

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record

    fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields

    parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse

    return SchemaFromJSONData(json_data, names)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData

    return parser(json_data, names=names)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject

    return RecordSchema(

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__

    super(RecordSchema, self).__init__(

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__

    names.Register(self)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register

    raise SchemaParseException(

avro.schema.SchemaParseException: record is a reserved type name.



Full name of schema's record is `"name": "record",` but the `record` is one of avro complex type. See https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L399


So, I wonder if I can set or change the name of avro record to avoid this parse exception.


Thanks,

Youngwoo

Reply | Threaded
Open this post in threaded view
|

Re: How to change the record name of avro schema

Youngwoo Kim (김영우)-2
Hey Arvid,

I believe it would be good to replace 'record' with 'Record'.

What do you think?

Thanks,
Youngwoo


On Mon, May 17, 2021 at 8:10 PM Arvid Heise <[hidden email]> wrote:
Hi Youngwoo,

You can try to use aliases for it [1].

Even better would be to use a different name for the record. In general, since Avro originally comes from the Java World, it's more common to use camel case for record names.


On Mon, May 17, 2021 at 11:29 AM Youngwoo Kim (김영우) <[hidden email]> wrote:
Hi,

I have a table backed by confluent avro format and the generated schema from flink looks like following:

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "dt",
      "type": [
        "null",
        {
          "type": "int",
          "logicalType": "date"
        }
      ],
      "default": null
    },

(snip)

}

At this moment, I have another application that reads avro schema from schema registry. Unfortunately, Got a traceback from the application:

Traceback (most recent call last):

  File "/usr/local/bin/datahub", line 8, in <module>

    sys.exit(datahub())

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__

    return self.main(*args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main

    rv = self.invoke(ctx)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke

    return _process_result(sub_ctx.command.invoke(sub_ctx))

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke

    return ctx.invoke(self.callback, **ctx.params)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke

    return callback(*args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest

    pipeline.run()

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run

    for wu in self.source.get_workunits():

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits

    mce = self._extract_record(t)

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record

    fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields

    parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse

    return SchemaFromJSONData(json_data, names)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData

    return parser(json_data, names=names)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject

    return RecordSchema(

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__

    super(RecordSchema, self).__init__(

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__

    names.Register(self)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register

    raise SchemaParseException(

avro.schema.SchemaParseException: record is a reserved type name.



Full name of schema's record is `"name": "record",` but the `record` is one of avro complex type. See https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L399


So, I wonder if I can set or change the name of avro record to avoid this parse exception.


Thanks,

Youngwoo

Reply | Threaded
Open this post in threaded view
|

Re: How to change the record name of avro schema

Youngwoo Kim (김영우)
Arvid,

I found a Jira issue related to my problem: https://issues.apache.org/jira/browse/FLINK-18096
I added a comment there, and I think Seth's idea is much better than just renaming the record in the Avro schema.

Thanks,
Youngwoo



On Mon, May 17, 2021 at 8:37 PM Youngwoo Kim (김영우) <[hidden email]> wrote:
Hey Arvid,

I believe, it would be good to substitute 'record' to 'Record'

What do you think?

Thanks,
Youngwoo


On Mon, May 17, 2021 at 8:10 PM Arvid Heise <[hidden email]> wrote:
Hi Youngwoo,

You can try to use aliases for it [1].

Even better would be to use a different name for the record. In general, since Avro originally comes from the Java World, it's more common to use camel case for record names.


On Mon, May 17, 2021 at 11:29 AM Youngwoo Kim (김영우) <[hidden email]> wrote:
Hi,

I have a table backed by confluent avro format and the generated schema from flink looks like following:

{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "dt",
      "type": [
        "null",
        {
          "type": "int",
          "logicalType": "date"
        }
      ],
      "default": null
    },

(snip)

}

At this moment, I have another application that reads avro schema from schema registry. Unfortunately, Got a traceback from the application:

Traceback (most recent call last):

  File "/usr/local/bin/datahub", line 8, in <module>

    sys.exit(datahub())

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__

    return self.main(*args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main

    rv = self.invoke(ctx)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke

    return _process_result(sub_ctx.command.invoke(sub_ctx))

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke

    return ctx.invoke(self.callback, **ctx.params)

  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke

    return callback(*args, **kwargs)

  File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest

    pipeline.run()

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run

    for wu in self.source.get_workunits():

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits

    mce = self._extract_record(t)

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record

    fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)

  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields

    parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse

    return SchemaFromJSONData(json_data, names)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData

    return parser(json_data, names=names)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject

    return RecordSchema(

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__

    super(RecordSchema, self).__init__(

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__

    names.Register(self)

  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register

    raise SchemaParseException(

avro.schema.SchemaParseException: record is a reserved type name.



Full name of schema's record is `"name": "record",` but the `record` is one of avro complex type. See https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L399


So, I wonder if I can set or change the name of avro record to avoid this parse exception.


Thanks,

Youngwoo