How does Flink SQL read Avro union?

Vincent Dong
Hi all,
How does Flink SQL read a Kafka Avro message that has a union field?
My Avro schema is defined as follows:
```
// The protocol wrapper (hypothetical name) makes this valid Avro IDL.
protocol RowEvents {
  record ItemRow {
    string num_iid;
    string has_showcase;
    string jdp_created;
  }

  record RefundRow {
    string refund_id;
    string status;
    string jdp_created;
  }

  record RowEvent {
    union { ItemRow, RefundRow } item_row;
  }
}
```
I know that in one specific Kafka topic, `item_row` is a RefundRow in every message, but I don't know how to define the source table or how to query it.
Can I define the table so that Flink SQL converts every message to RefundRow? Then I could run `select status, refund_id from the_converted_table`.


Thanks
Vincent Dong


 

Re: How does Flink SQL read Avro union?

Arvid Heise
Hi Vincent,

I'm not well versed in Flink SQL, so I'm pulling in Jark.

I have stopped using unions of records the way you do and now only use nullable fields (technically also unions, but much easier to handle in every language).

If you have a way to change the schema, you could try this:
```
  record RowEvent {
    union { null, ItemRow } item_row = null;
    union { null, RefundRow } refund_row = null;
  }
```
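To illustrate how this layout relates to the original one, here is a minimal sketch that rewrites a RowEvent from the single-union layout into the two-nullable-fields layout. It assumes both sides use Avro's JSON encoding (a non-null union value is an object whose only key names the chosen branch; null is plain `null`). `to_nullable_layout` is a hypothetical helper, not part of any library, and the sample values are made up.

```python
def to_nullable_layout(event: dict) -> dict:
    """Rewrite a RowEvent from the single-union layout to the nullable-fields layout.

    Both input and output use Avro's JSON encoding: a non-null union value is
    an object whose only key is the name of the chosen branch.
    """
    # Exactly one key is expected; unpacking raises ValueError otherwise.
    [(branch, value)] = event["item_row"].items()
    short_name = branch.rsplit(".", 1)[-1]  # drop the namespace, if any
    if short_name == "ItemRow":
        return {"item_row": {branch: value}, "refund_row": None}
    if short_name == "RefundRow":
        return {"item_row": None, "refund_row": {branch: value}}
    raise ValueError(f"unexpected union branch: {branch!r}")


if __name__ == "__main__":
    refund = {"refund_id": "r1", "status": "SUCCESS", "jdp_created": "2021-03-18"}
    print(to_nullable_layout({"item_row": {"RefundRow": refund}}))
```

With this layout a consumer only has to check which of the two fields is non-null, instead of inspecting which union branch was written.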



On Thu, Mar 18, 2021 at 7:35 AM Vincent Dong <[hidden email]> wrote:

Re:Re: How does Flink SQL read Avro union?

Vincent Dong
Hi Arvid,

I can't change the schema of the Kafka source topic because others also consume it.
Instead, I use the Flink DataStream API to consume the topic and transform the records into a schema without the union field, which works around the Flink SQL issue.
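The workaround above comes down to a per-record mapping from the union to a plain RefundRow. Below is a minimal sketch of that mapping in plain Python, assuming each RowEvent has already been decoded into Avro's JSON encoding, where a non-null union value is an object whose only key names the chosen branch. In a real Flink job the records would more likely arrive as Avro `GenericRecord` objects and this logic would sit inside a DataStream `flatMap`; `to_refund_row` and the sample values are hypothetical.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RefundRow:
    refund_id: str
    status: str
    jdp_created: str


def to_refund_row(event: dict) -> Optional[RefundRow]:
    """Return the RefundRow carried in RowEvent.item_row, or None for any other branch.

    `event` is a RowEvent in Avro's JSON encoding, where a non-null union value
    is an object whose only key is the name of the chosen branch.
    """
    # Exactly one key is expected; unpacking raises ValueError otherwise.
    [(branch, value)] = event["item_row"].items()
    if branch.rsplit(".", 1)[-1] != "RefundRow":  # ignore any namespace prefix
        return None  # e.g. an ItemRow; the caller can drop or log it
    return RefundRow(value["refund_id"], value["status"], value["jdp_created"])


if __name__ == "__main__":
    raw = '{"item_row": {"RefundRow": {"refund_id": "r1", "status": "SUCCESS", "jdp_created": "2021-03-18"}}}'
    row = to_refund_row(json.loads(raw))
    print(row.status, row.refund_id)  # the two columns from the query in the question
```

Returning None rather than raising keeps a single unexpected ItemRow from failing the job; whether to drop, log, or fail on such records is a judgment call.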

Cheers,
Vincent

At 2021-03-22 22:04:53, "Arvid Heise" <[hidden email]> wrote:


Re: Re: How does Flink SQL read Avro union?

Arvid Heise
Hi Vincent,

Yes, if you cannot influence the schema, there is little you can do at the SQL level, and your workaround is probably the only way to go.

However, I'd encourage you to talk to the other consumers and producers to find a way to avoid unions. Unions are also ugly to use in all strongly typed languages except Scala, and they are not forward- and backward-compatible. Chances are high that the other users are not Avro experts and would benefit from these insights. I can guarantee that it's just a matter of time before unions break things and annoy the hell out of you (probably not in this topic, but if you don't establish best practices that avoid unions, it will happen with another topic).

On Wed, Mar 31, 2021 at 11:54 AM Vincent Dong <[hidden email]> wrote: