Hi All!
I was trying to flatten a nested tuple into named columns with the fromDataStream method, and I hit some problems with mapping tuple fields to column names. It seems like `f0 as ColumnName` expressions are not parsed correctly.

It is very easy to reproduce:

    tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), "f0 as name, f1 as age");

This leads to one of the following two errors, depending on how you write it:

- Alias 'name' is not allowed if other fields are referenced by position.
- Could not parse expression at column 7: `(' expected but `'' found
  f0 as 'name', f1 as 'age'

I could not find any test cases that use this logic, so I wonder if I am doing something wrong here. The docs show that this should be possible:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#tuples-scala-and-java-and-case-classes-scala-only

I was actually trying to extract nested tuple fields this way, but I did not get that far. It also seems to fail for Row data types.

What am I doing wrong?

Gyula
Hi, gyula.fora
If you are trying to convert a Tuple DataStream into a Table, aliasing the fields with an `as` expression is not supported yet, because in this case all fields are referenced by position. You can simply name the fields with the following syntax:

```
tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), "name, age");
```

This should serve your purpose.

Back to the 1.10 docs: if you are converting a POJO DataStream into a Table (assuming the POJO Person has two fields, name and age), aliasing the fields with `as` is supported, because in this case all fields are referenced by name, like:

```
tableEnv.fromDataStream(env.fromElements(new Person("foo", 12)), "age as age_alias, name as user_name");
```

Best,
Leonard Xu
Hi Leonard,

The tuple fields can also be referenced by their POJO-style names (f0, f1), and they can be reordered like POJO fields; however, you cannot alias them. (The link I sent shows how it is supposed to work, but it throws an exception when I try it.)

Also, what I am ultimately trying to do is to flatten a nested tuple:

    Tuple2<String, Tuple2<Integer, Integer>> -> into 3 columns, let's say name, age, height

Normally I would write this:

    tableEnv.fromDataStream(input, "f0 as name, f1.f0 as age, f1.f1 as height");

However, this doesn't work, and there seems to be no way to assign names to the nested tuple columns anyway.

For POJOs aliasing works, but I still cannot find a way to unnest a nested object:

    public static class Person {
        public String name;
        public Tuple2<Integer, Integer> details;
    }

    tableEnv.fromDataStream(persons, "name, details.f0 as age, details.f1 as height")

This leads to an error: Field reference expression or alias on field expression expected.

Aliasing fields also doesn't work when converting from a Row stream, even if the column names are provided in the type info.

Cheers,
Gyula

On Mon, Apr 27, 2020 at 3:33 PM Leonard Xu <[hidden email]> wrote:
Hi Gyula,

I think you are hitting a bug in the naming/aliasing of the fields of a Tuple. The bug is in the org.apache.flink.table.typeutils.FieldInfoUtils#isReferenceByPosition method, which does not work correctly for aliases. Would you mind creating an issue for it?

You should be able to alias the fields as follows:

    tableEnv.fromDataStream(input, "name, age, height");

Unfortunately, you cannot reorder the fields that way.

If you want to flatten/extract nested fields, you should be able to do that in a subsequent operation. The fromDataStream method is supposed to register the entire DataStream as a Table, and it does not support projections etc.

    tableEnv.fromDataStream(input, "name, age, height")
        .select("name.f0 as nameF0, age.flatten, ...");

Side note: in my opinion this method (fromDataStream(DataStream, Expression/String... fields)) already has too many responsibilities and is hard to understand. (You can reorder fields, rename fields without an alias, rename fields with an alias, the alias works differently depending on the available fields or type, etc.) In the long term I'd prefer to come up with a better way of creating a Table out of a DataStream.

BTW, the way we can fix the renaming + reordering is by changing the method I mentioned:

    public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
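
The method body is cut off in the archived message, so the actual change is not shown here. Purely as an illustration of the kind of decision this method has to make, the following stand-alone sketch models it with plain strings instead of Flink's Expression and CompositeType classes. The class name ReferenceModeSketch, the helper referencedField and the rule itself ("by position only if no expression refers to an existing input field, with or without an alias") are assumptions made for this example, not the Flink implementation.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model only: strings stand in for Flink's Expression and CompositeType. */
public class ReferenceModeSketch {

    /** Returns the field an expression refers to: "f0 as name" -> "f0", "f0" -> "f0". */
    public static String referencedField(String expr) {
        return expr.trim().split("\\s+as\\s+")[0].trim();
    }

    /**
     * Hypothetical rule: treat the expressions as positional names only if
     * none of them refers to an existing input field, aliased or not.
     */
    public static boolean isReferenceByPosition(List<String> inputFields, String... exprs) {
        return Arrays.stream(exprs)
                .map(ReferenceModeSketch::referencedField)
                .noneMatch(inputFields::contains);
    }

    public static void main(String[] args) {
        List<String> tuple2 = Arrays.asList("f0", "f1");
        // New names only: by position.
        System.out.println(isReferenceByPosition(tuple2, "name", "age"));
        // Aliases on existing fields: by name.
        System.out.println(isReferenceByPosition(tuple2, "f0 as name", "f1 as age"));
        // Plain reordering of existing fields: by name.
        System.out.println(isReferenceByPosition(tuple2, "f1", "f0"));
    }
}
```

Under this assumed rule, `"f0 as name, f1 as age"` would be resolved by name rather than rejected with the "Alias 'name' is not allowed if other fields are referenced by position" error from the first message.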
Best, Dawid
On 27/04/2020 15:57, Gyula Fóra wrote:
Hi Dawid,

Thanks for the clarification on this issue. I agree that there is too much going on with these conversions already.

What do you mean by "Unfortunately you can not reorder the fields that way."? I can reorder POJO fields even after aliasing, and also tuple fields (f1, f0), so I assume reordering will still work once tuple and row aliasing is fixed.

I will open a JIRA for this!

Thanks!
Gyula

On Mon, Apr 27, 2020 at 4:58 PM Dawid Wysakowicz <[hidden email]> wrote:
What I meant by "Unfortunately you can not reorder the fields that way." is that

    tableEnv.fromDataStream(input, "name, age, height");

uses the so-called referenceByPosition mode. It will name the f0 field -> name, f1 -> age and f2 -> height.
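
To make the difference between the two modes discussed in this thread concrete, here is a small stand-alone sketch. It is a toy model with plain strings, not Flink code: the class FieldMappingSketch and its methods byPosition and byName are hypothetical names invented for this example. The by-name part shows what resolving aliased expressions would look like in principle; it does not describe Flink's behaviour at the time of this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only: plain strings stand in for Flink's field expressions. */
public class FieldMappingSketch {

    /** By position: the i-th expression is simply the new name of the i-th input field. */
    public static List<String> byPosition(List<String> inputFields, String... names) {
        List<String> mapping = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            mapping.add(inputFields.get(i) + " -> " + names[i].trim());
        }
        return mapping;
    }

    /** By name: each expression picks an input field and optionally renames it with "as". */
    public static List<String> byName(List<String> inputFields, String... exprs) {
        List<String> mapping = new ArrayList<>();
        for (String expr : exprs) {
            String[] parts = expr.trim().split("\\s+as\\s+");
            String field = parts[0].trim();
            if (!inputFields.contains(field)) {
                throw new IllegalArgumentException("Unknown field: " + field);
            }
            // Without an alias the field keeps its own name.
            String alias = parts.length > 1 ? parts[1].trim() : field;
            mapping.add(field + " -> " + alias);
        }
        return mapping;
    }

    public static void main(String[] args) {
        List<String> tuple3 = Arrays.asList("f0", "f1", "f2");
        System.out.println(byPosition(tuple3, "name", "age", "height"));
        // prints [f0 -> name, f1 -> age, f2 -> height]
        System.out.println(byName(tuple3, "f1 as name", "f2 as age", "f0 as height"));
        // prints [f1 -> name, f2 -> age, f0 -> height]
    }
}
```

In the by-position case the output order is fixed by the input, which is why renaming works but reordering does not; in the by-name case the order follows the expressions.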
If it wasn't for the bug, you could reorder and rename at the same time:

    tableEnv.fromDataStream(input, "f1 as name, f2 as age, f0 as height") // it reorders the fields of the pojo to the order f1,f2,f0 and gives them aliases

With a fix it should be possible, yes.

Best,
Dawid
On 27/04/2020 17:24, Gyula Fóra wrote: