TypeInfo issue with Avro SpecificRecord


TypeInfo issue with Avro SpecificRecord

Patrick Lucas-3
Hi,

I have read [1] on using Avro for serialization, but I'm stuck on a mysterious exception when Flink is doing type resolution. (Flink 1.13.1)

Basically, I'm able to use a SpecificRecord type in my source, but I am unable to use different SpecificRecord types later in the pipeline, getting an exception "Expecting type to be a PojoTypeInfo" from AvroTypeInfo[2].

Let's say I have a schema "Foo" with one field "foo" of type "Bar", and schema "Bar" with one field "message" of type "string". My input data is a single Foo record of the form {"foo": {"message": "hi"}}.
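For concreteness, the two schemas described above could be written roughly like this (a sketch; namespace and other details omitted):

```json
{
  "type": "record",
  "name": "Foo",
  "fields": [
    {
      "name": "foo",
      "type": {
        "type": "record",
        "name": "Bar",
        "fields": [
          {"name": "message", "type": "string"}
        ]
      }
    }
  ]
}
```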

This works:

    env.fromElements(myInput).print();

But this does not:

    env.fromElements(myInput).map(foo -> (Bar) foo.getFoo()).print();

(nor does it work if I use a full MapFunction<Foo, Bar>)

Does anyone know what I might be running into here? If necessary, I can put together a full reproducer.


Thanks,
Patrick

Re: TypeInfo issue with Avro SpecificRecord

Patrick Lucas-3
Alright, I figured it out: it's very similar to FLINK-13703, but instead of being about immutable fields, it's due to the use of the Avro Gradle plugin option `gettersReturnOptional`.

With this option enabled, the generated getters return Optional; combined with the option `optionalGettersForNullableFieldsOnly`, only the getters for nullable fields do. The presence of Optional-returning getters causes Flink's POJO analyzer to return null, which is what triggers the "Expecting type to be a PojoTypeInfo" exception from AvroTypeInfo.
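For illustration, here is a hand-written sketch (not the actual generated code) of the difference: the Optional-returning getter's return type no longer matches the field type, so the class no longer looks like a valid POJO to a JavaBean-style analyzer.

```java
import java.util.Optional;

public class GettersSketch {
    // Without gettersReturnOptional: a plain JavaBean getter whose
    // return type matches the field type.
    public static class BarPlain {
        private String message;
        public String getMessage() { return message; }
        public void setMessage(String message) { this.message = message; }
    }

    // With gettersReturnOptional: the getter's return type is
    // Optional<String>, not String, so the field can no longer be
    // paired with a conventional getter.
    public static class BarOptional {
        private String message;
        public Optional<String> getMessage() { return Optional.ofNullable(message); }
        public void setMessage(String message) { this.message = message; }
    }

    public static void main(String[] args) throws Exception {
        // Inspect the getter return types via reflection, the same
        // kind of check a POJO analyzer performs.
        Class<?> plain = BarPlain.class.getMethod("getMessage").getReturnType();
        Class<?> optional = BarOptional.class.getMethod("getMessage").getReturnType();
        System.out.println(plain.getSimpleName());    // String
        System.out.println(optional.getSimpleName()); // Optional
    }
}
```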

I didn't run into this previously because, although I used both options, I never had nullable fields in my schemas!

I don't suppose this would be considered a bug, but I'll leave a comment on the above issue.
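For reference, assuming the commonly used gradle-avro-plugin, the option combination in question would be configured along these lines (a sketch, not taken from the original build):

```groovy
avro {
    // Generate Optional-returning getters; this is what trips up
    // Flink's POJO analyzer.
    gettersReturnOptional = true
    // Restrict the Optional-returning getters to nullable fields only.
    optionalGettersForNullableFieldsOnly = true
}
```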

--
Patrick Lucas



Re: TypeInfo issue with Avro SpecificRecord

rmetzger0
Thanks a lot for sharing the solution on the mailing list and in the ticket. 
