Table API: Joining on Tables of Complex Types

Table API: Joining on Tables of Complex Types

Hailu, Andreas

Hi folks,

 

I’m trying to join two Tables composed of complex types (Avro’s GenericRecord, to be exact). I have to use a custom UDF to extract fields out of the records, and I’m having some trouble figuring out how to join on them, since I need to call this UDF to read the fields I need. Example below:

 

batchTableEnvironment.registerFunction("getField", new GRFieldExtractor()); // GenericRecord field extractor

Table users = batchTableEnvironment.fromDataSet(usersDataset); // Converting from some pre-existing DataSet

Table otherDataset = batchTableEnvironment.fromDataSet(someOtherDataset);

Table userNames = users.select("getField(f0, userName)"); // This is how the UDF is used: GenericRecord is a complex type, so you have to invoke get() on the field you’re interested in. Here we perform a get on the field ‘userName’
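For reference, a UDF of the shape used above might look something like the following sketch; the class name comes from the snippet, but the eval() signature and String return type are assumptions based on how it is called:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical sketch: pulls a single named field out of a GenericRecord.
public class GRFieldExtractor extends ScalarFunction {
    public String eval(GenericRecord record, String fieldName) {
        Object value = record.get(fieldName);
        // Avro strings decode as Utf8, so normalize to java.lang.String.
        return value == null ? null : value.toString();
    }
}
```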

 

I’d like to do something using the Table API similar to the query “SELECT * from otherDataset WHERE otherDataset.userName = users.userName”. How is this done?

 

Best,

Andreas

 

The Goldman Sachs Group, Inc. All rights reserved.

See http://www.gs.com/disclaimer/global_email for important risk disclosures, conflicts of interest and other terms and conditions relating to this e-mail and your reliance on information contained in it.  This message may contain confidential or privileged information.  If you are not the intended recipient, please advise us immediately and delete this message.  See http://www.gs.com/disclaimer/email for further information on confidentiality and the risks of non-secure electronic communication.  If you cannot access these links, please notify us by reply message and we will send the contents to you.

 




Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices

Re: Table API: Joining on Tables of Complex Types

Dawid Wysakowicz-2

Hi Andreas,

First of all, I would highly recommend converting non-structured types to structured types as soon as possible, as it opens up more possibilities to optimize the plan.

Have you tried:

Table users = batchTableEnvironment.fromDataSet(usersDataset).select("getField(f0, userName) as userName", "f0")
Table other = batchTableEnvironment.fromDataSet(otherDataset).select("getField(f0, userName) as user", "f1")

Table result = other.join(users, "user = userName")

You could also check how the org.apache.flink.formats.avro.AvroRowDeserializationSchema class is implemented; internally it converts an Avro record to a structured Row.

Hope this helps.

Best,

Dawid

On 03/01/2020 23:16, Hailu, Andreas wrote:



RE: Table API: Joining on Tables of Complex Types

Hailu, Andreas

Hi Dawid, thanks for getting back.

 

From what you’ve said, I think we’ll need to convert our GenericRecord into structured types – do you have any references or examples I can have a look at? If not, perhaps you could just show me a basic example of flattening a complex object with accessors into a Table of structured types. Or by structured types, did you mean Row?

 

// ah

 

From: Dawid Wysakowicz <[hidden email]>
Sent: Monday, January 6, 2020 9:32 AM
To: Hailu, Andreas [Engineering] <[hidden email]>; [hidden email]
Cc: Richards, Adam S [Engineering] <[hidden email]>
Subject: Re: Table API: Joining on Tables of Complex Types

 


Re: Table API: Joining on Tables of Complex Types

Dawid Wysakowicz-2

Hi Andreas,

Converting your GenericRecords to Rows would definitely be the safest option. You can check how it’s done in org.apache.flink.formats.avro.AvroRowDeserializationSchema. You can reuse the logic from there to write something like:

    DataSet<GenericRecord> dataset = ...
    dataset.map( /* convert GenericRecord to Row */).returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString));

Another thing you could try is to make sure that GenericRecord is seen as an Avro type by Flink (Flink should understand that the Avro type is a complex type):

    dataset.returns(new GenericRecordAvroTypeInfo(/* schema string */));

Then the TableEnvironment should pick it up as a structured type and flatten it automatically when registering the Table. Bear in mind that the returns() method is part of SingleInputUdfOperator, so you can apply it right after a transformation, e.g. map/flatMap.
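A conversion along these lines might be sketched as a MapFunction; this is an assumption for a flat record (nested records, arrays, and maps would need recursive handling, as AvroRowDeserializationSchema does):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Row;

// Sketch: convert a flat GenericRecord into a Flink Row, field by field.
public class GenericRecordToRow implements MapFunction<GenericRecord, Row> {
    @Override
    public Row map(GenericRecord record) {
        Schema schema = record.getSchema();
        Row row = new Row(schema.getFields().size());
        for (Schema.Field field : schema.getFields()) {
            Object value = record.get(field.pos());
            // Avro decodes strings as Utf8; convert to java.lang.String.
            row.setField(field.pos(),
                    value instanceof CharSequence ? value.toString() : value);
        }
        return row;
    }
}
```

This would slot into the snippet above as dataset.map(new GenericRecordToRow()).returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString)).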

Best,

Dawid


On 06/01/2020 18:03, Hailu, Andreas wrote:



RE: Table API: Joining on Tables of Complex Types

Hailu, Andreas

Very well – I’ll give this a try. Thanks, Dawid.

 

// ah

 

From: Dawid Wysakowicz <[hidden email]>
Sent: Wednesday, January 8, 2020 7:21 AM
To: Hailu, Andreas [Engineering] <[hidden email]>; [hidden email]
Cc: Richards, Adam S [Engineering] <[hidden email]>
Subject: Re: Table API: Joining on Tables of Complex Types

 


RE: Table API: Joining on Tables of Complex Types

Hailu, Andreas

Dawid, this approach looks promising. I’m able to flatten my Avro records into Rows and run simple queries on top of them. I’ve got a question – when I register my Rows as a table, I see the following warning in the logs:

 

2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class org.apache.flink.types.Row does not contain a getter for field fields

2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class org.apache.flink.types.Row does not contain a setter for field fields

2020-01-14 17:16:43,084 [main] INFO  TypeExtractor - Class class org.apache.flink.types.Row cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

 

Will this be problematic even now that we’ve provided TypeInfos for the Rows? Performance is something that I’m concerned about as I’ve already introduced a new operation to transform our records to Rows.

 

// ah

 

From: Hailu, Andreas [Engineering]
Sent: Wednesday, January 8, 2020 12:08 PM
To: 'Dawid Wysakowicz' <[hidden email]>; [hidden email]
Cc: Richards, Adam S [Engineering] <[hidden email]>
Subject: RE: Table API: Joining on Tables of Complex Types

 


Re: Table API: Joining on Tables of Complex Types

Timo Walther
Hi Andreas,

If dataset.getType() returns a RowTypeInfo, you can ignore these log messages. The type extractor runs before the ".returns()" call, but with this method you override the old type.

Regards,
Timo
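A quick sanity check along these lines might look like the following sketch; the converter placeholder and schema string are assumptions carried over from the earlier snippets:

```java
// Override the extracted type with an explicit Row type:
DataSet<Row> rows = dataset
        .map(/* convert GenericRecord to Row */)
        .returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString));

// If this prints a Row type (e.g. "Row(userName: String, ...)") rather than
// a GenericType, the TypeExtractor INFO lines above can be safely ignored.
System.out.println(rows.getType());
```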


On 15.01.20 15:27, Hailu, Andreas wrote:

> Dawid, this approach looks promising. I’m able to flatten out my Avro
> records into Rows and run simple queries atop of them. I’ve got a
> question – when I register my Rows as a table, I see the following log
> providing a warning:
>
> /2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class
> org.apache.flink.types.Row does not contain a getter for field fields/
>
> /2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class
> org.apache.flink.types.Row does not contain a setter for field fields/
>
> /2020-01-14 17:16:43,084 [main] INFO  TypeExtractor - Class class
> org.apache.flink.types.Row cannot be used as a POJO type because not all
> fields are valid POJO fields, and must be processed as GenericType.
> Please read the Flink documentation on "Data Types & Serialization" for
> details of the effect on performance./
>
> Will this be problematic even now that we’ve provided TypeInfos for the
> Rows? Performance is something that I’m concerned about as I’ve already
> introduced a new operation to transform our records to Rows.
>
> *// *ah**
>
> *From:* Hailu, Andreas [Engineering]
> *Sent:* Wednesday, January 8, 2020 12:08 PM
> *To:* 'Dawid Wysakowicz' <[hidden email]>; [hidden email]
> *Cc:* Richards, Adam S [Engineering] <[hidden email]>
> *Subject:* RE: Table API: Joining on Tables of Complex Types
>
> Very well – I’ll give this a try. Thanks, Dawid.
>
> *// *ah**
>
> *From:* Dawid Wysakowicz <[hidden email]
> <mailto:[hidden email]>>
> *Sent:* Wednesday, January 8, 2020 7:21 AM
> *To:* Hailu, Andreas [Engineering] <[hidden email]
> <mailto:[hidden email]>>; [hidden email]
> <mailto:[hidden email]>
> *Cc:* Richards, Adam S [Engineering] <[hidden email]
> <mailto:[hidden email]>>
> *Subject:* Re: Table API: Joining on Tables of Complex Types
>
> Hi Andreas,
>
> Converting your GenericRecords to Rows would definitely be the safest
> option. You can check how its done in the
> org.apache.flink.formats.avro.AvroRowDeserializationSchema. You can
> reuse the logic from there to write something like:
>
>      DataSet<GenericRecord> dataset = ...
>
>      dataset.map( /* convert GenericRecord to Row */).returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString));
>
> Another thing you could try is to make sure that GenericRecord is seen
> as an avro type by fink (flink should understand that avro type is a
> complex type):
>
>      dataset.returns(new GenericRecordAvroTypeInfo(/*schema string*/)
>
> than the TableEnvironment should pick it up as a structured type and
> flatten it automatically when registering the Table. Bear in mind the
> returns method is part of SingleInputUdfOperator so you can apply it
> right after some transformation e.g. map/flatMap etc.
>
> Best,
>
> Dawid
>
> On 06/01/2020 18:03, Hailu, Andreas wrote:
>
>     Hi David, thanks for getting back.
>
>      From what you’ve said, I think we’ll need to convert our
>     GenericRecord into structured types – do you have any references or
>     examples I can have a look at? If not, perhaps you could just show
>     me a basic example of flattening a complex object with accessors
>     into a Table of structured types. Or by structured types, did you
>     mean Row?
>
>     *// *ah
>
>     *From:* Dawid Wysakowicz <[hidden email]>
>     <mailto:[hidden email]>
>     *Sent:* Monday, January 6, 2020 9:32 AM
>     *To:* Hailu, Andreas [Engineering] <[hidden email]>
>     <mailto:[hidden email]>; [hidden email]
>     <mailto:[hidden email]>
>     *Cc:* Richards, Adam S [Engineering] <[hidden email]>
>     <mailto:[hidden email]>
>     *Subject:* Re: Table API: Joining on Tables of Complex Types
>
>     Hi Andreas,
>
>     First of all I would highly recommend converting a non-structured
>     types to structured types as soon as possible as it opens more
>     possibilities to optimize the plan.
>
>     Have you tried:
>
>     Table users =
>     batchTableEnvironment.fromDataSet(usersDataset).select("getField(f0,
>     userName) as userName", "f0")
>     Table other =
>     batchTableEnvironment.fromDataSet(otherDataset).select("getField(f0,
>     userName) as user", "f1")
>
>     Table result = other.join(users, "user = userName")
>
>     You could also check how the
>     org.apache.flink.formats.avro.AvroRowDeserializationSchema class is
>     implemented which internally converts an avro record to a structured
>     Row.
>
>     Hope this helps.
>
>     Best,
>
>     Dawid
>
>     On 03/01/2020 23:16, Hailu, Andreas wrote:
>
>         Hi folks,
>
>         I’m trying to join two Tables which are composed of complex
>         types, Avro’s GenericRecord to be exact. I have to use a custom
>         UDF to extract fields out of the record and I’m having some
>         trouble on how to do joins on them as I need to call this UDF to
>         read what I need. Example below:
>
>         batchTableEnvironment.registerFunction("getField", new
>         GRFieldExtractor()); // GenericRecord field extractor
>
>         Table users = batchTableEnvironment.fromDataSet(usersDataset);
>         // Converting from some pre-existing DataSet
>
>         Table otherDataset =
>         batchTableEnvironment.fromDataSet(someOtherDataset);
>
>         Table userNames = t.select("getField(f0, userName)"); // This is
>         how the UDF is used, as GenericRecord is a complex type
>         requiring you to invoke a get() method on the field you’re
>         interested in. Here we get a get on field ‘userName’
>
>         I’d like to do something using the Table API similar to the
>         query “SELECT * from otherDataset WHERE otherDataset.userName =
>         users.userName”. How is this done?
>
>         Best,
>
>         Andreas
>
>         *The Goldman Sachs Group, Inc. All rights reserved*.
>
>         See http://www.gs.com/disclaimer/global_email for important risk
>         disclosures, conflicts of interest and other terms and
>         conditions relating to this e-mail and your reliance on
>         information contained in it.  This message may contain
>         confidential or privileged information.  If you are not the
>         intended recipient, please advise us immediately and delete this
>         message.  See http://www.gs.com/disclaimer/email for further
>         information on confidentiality and the risks of non-secure
>         electronic communication.  If you cannot access these links,
>         please notify us by reply message and we will send the contents
>         to you.
>
>         ------------------------------------------------------------------------
>
>
>         Your Personal Data: We may collect and process information about
>         you that may be subject to data protection laws. For more
>         information about how we use and disclose your personal data,
>         how we protect your information, our legal basis to use your
>         information, your rights and who you can contact, please refer
>         to: www.gs.com/privacy-notices <http://www.gs.com/privacy-notices>
>
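Dawid's extract-then-join suggestion earlier in the thread can be illustrated with a self-contained sketch. Running the real Table API join needs a Flink runtime, so plain Java maps stand in for Avro GenericRecords here, and `getField` is a hypothetical analog of the thread's GRFieldExtractor UDF:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExtractThenJoin {
    // Hypothetical analog of the GRFieldExtractor UDF: read one field
    // out of an otherwise opaque record.
    static Object getField(Map<String, Object> record, String name) {
        return record.get(name);
    }

    // Same shape as Dawid's other.join(users, "user = userName"): both
    // sides first project the join key out of the opaque record, then
    // the join runs on that plain column.
    static List<Map.Entry<Map<String, Object>, Map<String, Object>>> joinOnUserName(
            List<Map<String, Object>> users, List<Map<String, Object>> other) {
        Map<Object, Map<String, Object>> usersByName = new HashMap<>();
        for (Map<String, Object> u : users) {
            usersByName.put(getField(u, "userName"), u);
        }
        List<Map.Entry<Map<String, Object>, Map<String, Object>>> joined = new ArrayList<>();
        for (Map<String, Object> o : other) {
            Map<String, Object> match = usersByName.get(getField(o, "userName"));
            if (match != null) {
                joined.add(Map.entry(o, match));
            }
        }
        return joined;
    }
}
```

The extract-first principle is what makes the Table API version work too: once the key has been projected out as a plain column via select(), the join can treat it like any other field.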


RE: Table API: Joining on Tables of Complex Types

Hailu, Andreas
Hi Timo, Dawid,

This was very helpful - thanks! The Row type seems to only support getting fields by their index. Is there a way to get a field by its name like the Row class in Spark? Link: https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/Row.html#getAs(java.lang.String)

Our use case is that we're developing a data-processing library for developers leveraging our system to refine existing datasets and produce new ones. The flow is as follows:

Our library reads Avro/Parquet GenericRecord data files from a source and turns them into a Table --> users write a series of operations on this Table to create a new resulting Table --> the resulting Table is then transformed and persisted back to the file system as Avro GenericRecords in an Avro/Parquet file.

We can map the Row field names to their corresponding indexes by patching the AvroRowDeserializationSchema class, but it's the step where we expose the Table to our users and then try to persist it that ends up in this metadata loss. We know which fields the Table must be composed of, but we won't know which index they live at, so Row#getField() isn't quite what we need.

// ah

-----Original Message-----
From: Timo Walther <[hidden email]>
Sent: Friday, January 17, 2020 11:29 AM
To: [hidden email]
Subject: Re: Table API: Joining on Tables of Complex Types

Hi Andreas,

if dataset.getType() returns a RowTypeInfo, you can ignore this log message. The type extractor runs before the ".returns()", but with that method you override the old type.

Regards,
Timo


On 15.01.20 15:27, Hailu, Andreas wrote:

> Dawid, this approach looks promising. I'm able to flatten out my Avro
> records into Rows and run simple queries atop of them. I've got a
> question - when I register my Rows as a table, I see the following log
> output warning me:
>
> 2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class
> org.apache.flink.types.Row does not contain a getter for field fields
>
> 2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class
> org.apache.flink.types.Row does not contain a setter for field fields
>
> 2020-01-14 17:16:43,084 [main] INFO  TypeExtractor - Class class
> org.apache.flink.types.Row cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType.
> Please read the Flink documentation on "Data Types & Serialization"
> for details of the effect on performance.
>
> Will this be problematic even now that we've provided TypeInfos for
> the Rows? Performance is something that I'm concerned about as I've
> already introduced a new operation to transform our records to Rows.
>
> // ah
>
> *From:* Hailu, Andreas [Engineering]
> *Sent:* Wednesday, January 8, 2020 12:08 PM
> *To:* 'Dawid Wysakowicz' <mailto:[hidden email]>;
> mailto:[hidden email]
> *Cc:* Richards, Adam S [Engineering] <mailto:[hidden email]>
> *Subject:* RE: Table API: Joining on Tables of Complex Types
>
> Very well - I'll give this a try. Thanks, Dawid.
>
> // ah
>
> *From:* Dawid Wysakowicz <[hidden email]
> <mailto:[hidden email]>>
> *Sent:* Wednesday, January 8, 2020 7:21 AM
> *To:* Hailu, Andreas [Engineering] <[hidden email]
> <mailto:[hidden email]>>; mailto:[hidden email]
> <mailto:[hidden email]>
> *Cc:* Richards, Adam S [Engineering] <[hidden email]
> <mailto:[hidden email]>>
> *Subject:* Re: Table API: Joining on Tables of Complex Types
>
> Hi Andreas,
>
> Converting your GenericRecords to Rows would definitely be the safest
> option. You can check how its done in the
> org.apache.flink.formats.avro.AvroRowDeserializationSchema. You can
> reuse the logic from there to write something like:
>
>      DataSet<GenericRecord> dataset = ...
>
>      dataset.map( /* convert GenericRecord to Row
> */).returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString));
>
> Another thing you could try is to make sure that GenericRecord is seen
> as an Avro type by Flink (Flink should understand that the Avro type is
> a complex type):
>
>      dataset.returns(new GenericRecordAvroTypeInfo(/* avro Schema */));
>
> then the TableEnvironment should pick it up as a structured type and
> flatten it automatically when registering the Table. Bear in mind the
> returns method is part of SingleInputUdfOperator, so you can apply it
> right after some transformation, e.g. map/flatMap.
>
> Best,
>
> Dawid
>
> On 06/01/2020 18:03, Hailu, Andreas wrote:
>
>     Hi Dawid, thanks for getting back.
>
>      From what you've said, I think we'll need to convert our
>     GenericRecord into structured types - do you have any references or
>     examples I can have a look at? If not, perhaps you could just show
>     me a basic example of flattening a complex object with accessors
>     into a Table of structured types. Or by structured types, did you
>     mean Row?
>
>     // ah
>


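Dawid's GenericRecord-to-Row conversion (the map(...).returns(AvroSchemaConverter.convertToTypeInfo(...)) step quoted earlier in the thread) can be sketched without the Flink and Avro dependencies. Here a field-order list stands in for the Avro schema and Object[] stands in for org.apache.flink.types.Row; this illustrates only the flattening, not the real API:

```java
import java.util.List;
import java.util.Map;

public class RecordToRow {
    // Flatten an opaque name->value record into a positional row,
    // following the schema's declared field order. In the real job this
    // would be the body of the map function whose output type is then
    // declared via .returns(...) with a RowTypeInfo.
    static Object[] toRow(Map<String, Object> record, List<String> fieldOrder) {
        Object[] row = new Object[fieldOrder.size()];
        for (int i = 0; i < fieldOrder.size(); i++) {
            row[i] = record.get(fieldOrder.get(i)); // null when the field is absent
        }
        return row;
    }
}
```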

Re: Table API: Joining on Tables of Complex Types

Timo Walther
Hi Andreas,

you are right, currently the Row type only supports accessing fields by
index. Usually, we recommend working fully in the Table API. There you
can access structured type fields by name (`SELECT row.field.field` or
`row.get("field").get("field")`) and use additional utilities such as
`flatten()`.

Can't you just use the schema of the table as a helper for bridging the
names to indices?

Regards,
Timo
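Timo's schema-as-bridge idea can be sketched as a small helper. In Flink the field names would come from the table's schema (for example a RowTypeInfo's getFieldNames()); a plain String[] stands in here, giving a Spark-style getAs-by-name on top of positional Row#getField(int) access:

```java
import java.util.HashMap;
import java.util.Map;

public class FieldIndex {
    private final Map<String, Integer> indexByName = new HashMap<>();

    // fieldNames would be obtained from the table schema in a real job.
    FieldIndex(String[] fieldNames) {
        for (int i = 0; i < fieldNames.length; i++) {
            indexByName.put(fieldNames[i], i);
        }
    }

    // Spark-style getAs(name) over a positional row.
    Object getAs(Object[] row, String name) {
        Integer i = indexByName.get(name);
        if (i == null) {
            throw new IllegalArgumentException("No such field: " + name);
        }
        return row[i];
    }
}
```

Building the map once per schema keeps by-name reads O(1) without patching AvroRowDeserializationSchema.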

