Hi all,
I currently find myself evaluating a use case where I have to deal with wide (i.e. about 50-60 columns, definitely more than the 25 supported by the Tuple types), structured data from CSV files, with a schema that is potentially generated dynamically at runtime or automatically inferred from the CSV file.

SparkSQL works very well for this case, because I can generate or infer the schema dynamically at runtime, access fields in UDFs by index or name (via the Row API), generate new schemata for UDF results on the fly, and use those schemata to read and write from/to CSV (see the sketch below). Obviously Spark and SparkSQL have other quirks, and I'd like to find a good solution to do this with Flink.

The main limitation is that I can't seem to have DataSets of arbitrary-length, arbitrary-type (i.e. unknown at compile time) tuples. The Record API/type looks like it was meant to provide something like that, but it appears to be deprecated and is not well supported by the DataSet APIs (e.g. I can't do a join on Records by field index, nor does the CsvReader API support Records), and it has no concept of field names either.

I thought about generating Java classes for my schemata at runtime (e.g. via Javassist), but that seems like a hack, and I'd probably have to do this for each intermediate schema as well (e.g. when a map operation alters the schema). I haven't tried this avenue yet, so I'm not certain it would actually work, and even less certain that it would be a nice and maintainable solution.

Can anyone suggest a nice way to deal with this kind of use case? I can prepare an example if that would make it more clear.

Thanks,
Johann
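For reference, a rough sketch of the SparkSQL pattern described above, assuming the Spark 1.x Java API; the column names, types, and values are invented for illustration and are not from the thread:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkDynamicSchemaSketch {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("sketch").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);

        // Schema assembled at runtime, e.g. inferred from a CSV header.
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);

        JavaRDD<Row> rows = sc.parallelize(Arrays.asList(
                RowFactory.create("alice", 42),
                RowFactory.create("bob", 23)));
        DataFrame df = sqlContext.createDataFrame(rows, schema);

        // Row fields are reachable both by position and by name.
        Row first = df.first();
        Object byIndex = first.get(1);
        Integer byName = first.<Integer>getAs("age");
        System.out.println(byIndex + " / " + byName);

        sc.stop();
    }
}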
Hi Johann,

I see three options for your use case.

1) Generate POJO code at planning time, i.e., when the program is composed. This does not work when the program is already running. The benefit is that you can use key expressions, have typed fields, and get type-specific serializers and comparators.

2) Use Record, an Object[], or a List<Object> (or some own holder with a few convenience methods) to store the data untyped, and work with KeySelectors to extract grouping and join keys. The drawback of this approach is generic serialization (Kryo), which won't be as efficient as the native serializers. If you know the key types and don't use generic types for keys, sorting and joining should still be fast.

3) A hybrid approach of both, which works without code generation. Use a generic holder, e.g., Object[], for your data records, but implement your own type information, serializers, and comparators. After each operation, you can define the type information of the result using the returns() method, e.g., myData.map(new MapFunction<Object[], Object[]>()).returns(myCustomTypeInfo). This approach requires a good understanding of Flink's type system, but if done correctly, you can also use expressions or positions to define keys and benefit from efficient serialization and binary comparisons. However, similar to the first approach, you need to know the schema of the data in advance, before the program is executed (see the sketch below for the mechanics of options 2 and 3).

In my opinion the first approach is the better one, but as you said it is more effort to implement and might not work depending on what information is available at which point in time.

Let me know if you have any questions.

Cheers, Fabian
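For illustration, a minimal sketch of what options 2 and 3 could look like with the Java DataSet API. This is not code from the thread: the field layout and values are invented, and a generic Object[] type information stands in for the hand-written TypeInformation, serializers, and comparators that option 3 actually calls for.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;

public class UntypedRecordsSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for a custom TypeInformation<Object[]>: a generic object-array type info.
        TypeInformation<Object[]> rowType =
                ObjectArrayTypeInfo.getInfoFor(Object[].class, new GenericTypeInfo<>(Object.class));

        // Pretend these records were parsed from a wide CSV file at runtime.
        List<Object[]> people = new ArrayList<>();
        people.add(new Object[]{"alice", 42});
        people.add(new Object[]{"bob", 23});
        DataSet<Object[]> input = env.fromCollection(people, rowType);

        // Option 3's mechanism: declare the result type of a schema-altering UDF via returns().
        DataSet<Object[]> withFlag = input
                .map(new MapFunction<Object[], Object[]>() {
                    @Override
                    public Object[] map(Object[] record) {
                        // Append a derived field, i.e., change the schema at runtime.
                        Object[] out = new Object[record.length + 1];
                        System.arraycopy(record, 0, out, 0, record.length);
                        out[record.length] = ((Integer) record[1]) >= 30;
                        return out;
                    }
                })
                .returns(rowType);

        // Option 2's mechanism: extract a typed key from the untyped records with a KeySelector,
        // so grouping and joining can still work on an efficiently serializable key.
        withFlag.groupBy(new KeySelector<Object[], String>() {
                    @Override
                    public String getKey(Object[] record) {
                        return (String) record[0];
                    }
                })
                .first(1)
                .print();
    }
}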
Hi Johann!

You can try the Table API; it has logical tuples that you program with, rather than tuple classes.

Have a look here: https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html

Stephan
Hi,

thanks once again for those pointers. I did a bit of experimenting over the past couple of days and came to the following conclusions:

1. Unfortunately, I don't think I can get away with option 1 (generating POJOs at runtime), at least not without generating lots of boilerplate code, because I'd like to be able to access fields, also inside UDFs, by index or name. Instantiating generated classes in UDFs also seems like a pain.
2. The Table API looks nice, but it doesn't seem to solve the problem of getting the data into a DataSet in the first place. And I'll likely have to fall back to the DataSet API for low-level operations, such as a simple map(), as far as I can tell.

However, it seems like the Table API already provides exactly Fabian's option 3 with its Row, RowTypeInfo, and RowSerializer implementations, if I'm not mistaken. I played around a bit, trying to use a DataSet[Row], and it seems to work great. I had to add a hack to make ScalaCsvInputFormat read Rows instead of Tuples, as well as add a few convenience methods to the Row class and a Schema API to quickly generate RowTypeInfos. I pushed my experiments here: https://github.com/jkovacs/flink/commit/2cee8502968f8815cd3a5f9d2994d3e5355e96df
A toy example of how I'd like to use the APIs is included in the TableExperiments.scala file.

Is this a maintainable solution, or is the Row API supposed to be internal only? If it is meant to be usable outside of the Table API, do you think we can add the Row and Schema convenience layer I started to implement to the core flink-table? That would make it much easier to work with from the regular DataSet API.

Thanks,
Johann
Hi,
these are some interesting ideas. I have some thoughts, though, about the current implementation:

1. With Schema and Field you are basically re-implementing RowTypeInfo, so that should not be required; maybe it could just be an easier way to create a RowTypeInfo.
2. Right now, TypeInformation in Flink is technically not meant to be Serializable and shipped to runtime operations. (Although in practice it is in places; there is an ongoing discussion about this.)
3. Having the Schema as an implicit parameter does not work in the general case, because the compiler does not know which Schema to take if there are several Schemas around. Maybe Row would have to be extended to contain the Schema, but that could have performance implications.

We should definitely add support for creating a DataSet[Row] directly from a CSV input, since otherwise you have to go through tuples, which does not work with dynamic schemas or with more than a certain number of fields.

Cheers,
Aljoscha
Hi,

thanks for having a look at this, Aljoscha.

Not being able to read a DataSet[Row] from CSV is definitely the biggest issue for me right now; everything else I could work around with Scala magic. I can create an issue for this if you'd like.

Regarding the other points:
1. Oh absolutely, that's really the most important reason I implemented those classes in the first place, although I also liked being able to deserialize the schema from JSON/YAML before converting it to a RowTypeInfo.
2. I have no strong opinion either way regarding the serializability of TypeInformation. But having the field name <-> index mapping available in the UDF would be very nice to have, so that I can access fields by name instead of by index (see the sketch below). If you decide to make TypeInformation non-serializable, maybe having a dedicated Schema class for this purpose isn't such a bad idea after all.
3. Fair enough, I guess I was too excited about Scala implicit magic when I wrote the prototype :-) I wouldn't want to include the Schema with each Row either. I guess non-implicit utility functions would work just as well, or users can work around this by themselves if the field name <-> index mapping is available in the UDF (see point 2).

Thanks,
Johann
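As an illustration of point 2, a small hypothetical sketch (not part of Flink or of the linked prototype) of a serializable name-to-index mapping that a UDF could carry along to access untyped fields by name:

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;

public class SchemaSketch {

    /** Hypothetical helper: a serializable mapping from field names to positions. */
    public static class Schema implements Serializable {
        private final Map<String, Integer> indexByName = new HashMap<>();

        public Schema(String... fieldNames) {
            for (int i = 0; i < fieldNames.length; i++) {
                indexByName.put(fieldNames[i], i);
            }
        }

        public Object get(Object[] record, String fieldName) {
            return record[indexByName.get(fieldName)];
        }
    }

    /** A UDF that carries the schema along and reads fields by name instead of index. */
    public static class ExtractAge implements MapFunction<Object[], Integer> {
        private final Schema schema;

        public ExtractAge(Schema schema) {
            this.schema = schema;
        }

        @Override
        public Integer map(Object[] record) {
            return (Integer) schema.get(record, "age");
        }
    }
}

A DataSet of Object[] (or of Row) could then call .map(new ExtractAge(new Schema("name", "age"))) without the UDF having to hard-code field positions.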
Hi,
yes please, open an issue for that. I think the method would have to be added to TableEnvironment.

Aljoscha