Flink + Avro GenericRecord - first field value overwrites all other fields

Tarandeep Singh
Hi,

I am using the DataSet API and reading Avro files as DataSet&lt;GenericRecord&gt;. I am seeing weird behavior: a record is read correctly from the file (verified by printing all values), but when the record is passed along the Flink chain/DAG (e.g. into a KeySelector), every field in the record has the same value as the first field. Even weirder, the fields have different types. For example, I have a record Query with two fields, key (int) and query (String). When the record was read from the file, the values were correct (e.g. 100, "apache flink"), but when I print the values in the KeySelector, I get (100, 100).
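For reference, here is a minimal sketch of the kind of setup where I see this (the file path, class name, and exact job wiring are placeholders, not my real code; the imports assume the flink-avro AvroInputFormat):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.core.fs.Path;

    public class GenericRecordRepro {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read Avro files as GenericRecord
            AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(
                    new Path("file:///data/queries.avro"), GenericRecord.class);
            DataSet<GenericRecord> records = env.createInput(format);

            // Printing right after reading shows correct values (100, "apache flink"),
            // but inside the KeySelector every field carries the first field's value
            records
                .groupBy(new KeySelector<GenericRecord, String>() {
                    @Override
                    public String getKey(GenericRecord record) {
                        // expected "apache flink", observed "100"
                        return String.valueOf(record.get("query"));
                    }
                })
                .first(1)
                .print();
        }
    }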

I saw a similar post on Stack Overflow:
http://stackoverflow.com/questions/37115618/apache-flink-union-operator-giving-wrong-response

Any idea what might be happening?
Any workaround would be greatly appreciated.

Thank you,
Tarandeep

Re: Flink + Avro GenericRecord - first field value overwrites all other fields

Tarandeep Singh
I think I found a workaround: instead of reading the Avro files as GenericRecords, I read them as specific records and then use a map to convert (typecast) them to GenericRecord. With that, the problem goes away.
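Here is a rough sketch of what I mean. Query is the Avro-generated specific class from my earlier example, and the plain upcast in the map assumes Avro 1.8+, where generated classes implement GenericRecord; on older Avro versions you would copy the fields into a GenericData.Record instead:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.core.fs.Path;

    public class SpecificToGenericWorkaround {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read as the generated specific class instead of GenericRecord
            AvroInputFormat<Query> format = new AvroInputFormat<>(
                    new Path("file:///data/queries.avro"), Query.class);
            DataSet<Query> specific = env.createInput(format);

            // Convert (typecast) each specific record to GenericRecord;
            // downstream operators then see the correct field values
            DataSet<GenericRecord> generic = specific.map(
                    new MapFunction<Query, GenericRecord>() {
                        @Override
                        public GenericRecord map(Query query) {
                            return query; // upcast, no copying (Avro 1.8+)
                        }
                    });

            generic.print();
        }
    }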

I ran some tests and so far this workaround seems to be working in my local setup.

-Tarandeep

Re: Flink + Avro GenericRecord - first field value overwrites all other fields

Fabian Hueske
Hi Tarandeep,

The AvroInputFormat was recently extended to support GenericRecords [1]. You could try running the latest SNAPSHOT version and see if it works for you.
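With that change, reading GenericRecords directly should look roughly like the snippet below. This is an untested sketch against the current SNAPSHOT API; the path is a placeholder:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.core.fs.Path;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // With FLINK-3691, AvroInputFormat can emit GenericRecords directly
    AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(
            new Path("file:///data/queries.avro"), GenericRecord.class);
    DataSet<GenericRecord> records = env.createInput(format);
    records.print();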

Cheers, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-3691
