Cannot map nested Tuple fields to table columns

Cannot map nested Tuple fields to table columns

Gyula Fóra
Hi All!

I was trying to flatten a nested tuple into named columns with the fromDataStream method and I hit some problems with mapping tuple fields to column names.

It seems like the `f0 as ColumnName` kind of expressions are not parsed correctly.

It is very easy to reproduce:
tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), "f0 as name, f1 as age");

This leads to the following 2 kinds of errors depending on how you write it: 
 - Alias 'name' is not allowed if other fields are referenced by position.
 - Could not parse expression at column 7: `(' expected but `'' found
f0 as 'name', f1 as 'age'

I could not find any test cases that use this logic, so I wonder if I am doing something wrong here. The docs show that this should be possible: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#tuples-scala-and-java-and-case-classes-scala-only

I was actually trying to extract nested tuple fields this way but I did not get that far. It also seems to fail for Row data types.

What am I doing wrong?

Gyula
Re: Cannot map nested Tuple fields to table columns

Leonard Xu
Hi gyula.fora,

If you are trying to convert a Tuple DataStream into a Table, aliasing the fields with an `as` expression is not supported yet,
because at this point all fields are referenced by position. You can simply name the fields positionally, like this:
```
tableEnv.fromDataStream(env.fromElements(Tuple2.of("a", 1)), "name, age");
```
This should serve your purpose. As for the 1.10 docs: if you are converting a POJO DataStream into a Table
(assuming the POJO Person has two fields, name and age), aliasing the fields with `as` is supported,
because in that case all fields are referenced by name, like:
```
tableEnv.fromDataStream(env.fromElements(new Person("foo", 12)), "age as age_alias, name as user_name");
```


Best,
Leonard, Xu
Re: Cannot map nested Tuple fields to table columns

Gyula Fóra
Hi Leonard,

The tuple fields can also be referenced by their POJO-style names (f0, f1) and reordered like POJO fields; however, you cannot alias them. (The link I sent shows how it is supposed to work, but it throws an exception when I try it.)
What I am ultimately trying to do is flatten a nested tuple:

Tuple2<String, Tuple2<Integer, Integer>> -> into 3 columns, let's say name, age, height

Normally I would write this: tableEnv.fromDataStream(input, "f0 as name, f1.f0 as age, f1.f1 as height");
However, this doesn't work, and there seems to be no way to assign names to the nested tuple columns anyway.

For POJOs aliasing works, but I still cannot find a way to unnest a nested object:

public static class Person {
  public String name;
  public Tuple2<Integer, Integer> details;
}

tableEnv.fromDataStream(persons, "name, details.f0 as age, details.f1 as height");

This leads to an error:
Field reference expression or alias on field expression expected.

Aliasing fields also doesn't work when converting from a Row stream, even if the column names are provided in the type info.

Cheers,
Gyula

Re: Cannot map nested Tuple fields to table columns

Dawid Wysakowicz-2

Hi Gyula,

I think you are hitting a bug in the naming/aliasing of Tuple fields. The bug is in the org.apache.flink.table.typeutils.FieldInfoUtils#isReferenceByPosition method, which does not handle aliases correctly. Would you mind creating an issue for it?

You should be able to alias the fields as follows:

tableEnv.fromDataStream(input, "name, age, height");

Unfortunately, you cannot reorder the fields that way.

If you want to flatten/extract nested fields, you should be able to do that in a subsequent operation. The fromDataStream method is supposed to register the entire DataStream as a Table; it does not support projections etc.

tableEnv.fromDataStream(input, "name, age, height")
    .select("name.f0 as nameF0, age.flatten, ...");

Side note: in my opinion this method (fromDataStream(DataStream, Expression/String... fields)) already has too many responsibilities and is hard to understand. (You can reorder fields, rename fields without an alias, rename fields with an alias, the alias works differently depending on the available fields or type, etc.) In the long term I'd prefer to come up with a better way of creating a Table out of a DataStream.

BTW The way we can fix the renaming + reordering is by changing the method I mentioned:

    public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) {
        if (!(ct instanceof TupleTypeInfoBase)) {
            return false;
        }

        List<String> inputNames = Arrays.asList(ct.getFieldNames());

        // Use the by-position mode if none of the fields exist in the input.
        // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
        // by position but the user might assume reordering instead of renaming.
        return Arrays.stream(fields).allMatch(f -> {
            if (f instanceof UnresolvedCallExpression &&
                    ((UnresolvedCallExpression) f).getFunctionDefinition() == BuiltInFunctionDefinitions.AS &&
                    f.getChildren().get(0) instanceof UnresolvedReferenceExpression) {
                return false;
            }

            if (f instanceof UnresolvedReferenceExpression) {
                return !inputNames.contains(((UnresolvedReferenceExpression) f).getName());
            }

            return true;
        });
    }
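
To see which mode a given field list would select under this change, here is a minimal stand-alone sketch of the same decision in plain Java. It is not Flink code: the `Field` class and the `ReferenceModeSketch` name are hypothetical stand-ins for Flink's expression types, and only plain references and `x as y` aliases are modeled.

```java
import java.util.Arrays;
import java.util.List;

class ReferenceModeSketch {

    /** Hypothetical stand-in for a parsed field expression: a name plus an optional alias. */
    static final class Field {
        final String name;
        final String alias; // null for a plain reference such as "f0" or "name"

        private Field(String name, String alias) {
            this.name = name;
            this.alias = alias;
        }

        static Field ref(String name) {
            return new Field(name, null);
        }

        static Field as(String name, String alias) {
            return new Field(name, alias);
        }
    }

    /** Returns true only if every expression is a plain name that does not exist in the input. */
    static boolean isReferenceByPosition(List<String> inputNames, Field... fields) {
        return Arrays.stream(fields).allMatch(f -> {
            if (f.alias != null) {
                // An alias on a field reference is always treated as a by-name reference.
                return false;
            }
            // A plain name that matches an input field means by-name (reordering).
            return !inputNames.contains(f.name);
        });
    }

    public static void main(String[] args) {
        List<String> tuple2 = Arrays.asList("f0", "f1");

        // "name, age": none of the names exist in the input, so rename by position.
        System.out.println(isReferenceByPosition(tuple2, Field.ref("name"), Field.ref("age"))); // true

        // "f1, f0": existing names, so this is a by-name reordering.
        System.out.println(isReferenceByPosition(tuple2, Field.ref("f1"), Field.ref("f0"))); // false

        // "f0 as name, f1 as age": aliases are resolved by name.
        System.out.println(isReferenceByPosition(tuple2, Field.as("f0", "name"), Field.as("f1", "age"))); // false
    }
}
```

In this model a single alias or a single existing field name is enough to switch the whole field list to by-name mode, which matches the `allMatch` in the method above.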


Best,

Dawid


Re: Cannot map nested Tuple fields to table columns

Gyula Fóra
Hi Dawid,

Thanks for the clarification on this issue and I agree that there is too much going on with these conversions already.

What do you mean by "Unfortunately you can not reorder the fields that way." ?
I can reorder POJO fields even after aliasing and also tuple fields (f1, f0) so I assume reordering will still work if tuple and row aliasing is fixed.

I will open a JIRA for this!

Thanks!
Gyula

Re: Cannot map nested Tuple fields to table columns

Dawid Wysakowicz-2

What I meant by "Unfortunately you can not reorder the fields that way." is that

   tableEnv.fromDataStream(input, "name, age, height");

uses the so-called referenceByPosition mode. It will rename the f0 field to name, f1 to age and f2 to height.


If it weren't for the bug, you could reorder and rename at the same time:

   tableEnv.fromDataStream(input, "f1 as name, f2 as age, f0 as height") // reorders the fields to the order f1, f2, f0 and gives them aliases

With a fix it should be possible yes.
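
The difference between the two modes can be sketched in plain Java as follows. This is only an illustrative model of the intended behavior, assuming the aliasing bug is fixed; `FieldMappingSketch`, `byPosition` and `byName` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FieldMappingSketch {

    /** By-position mode: the i-th new name renames the i-th input field; order is unchanged. */
    static List<String> byPosition(List<String> inputNames, List<String> newNames) {
        if (newNames.size() > inputNames.size()) {
            throw new IllegalArgumentException("More names than input fields");
        }
        List<String> mapping = new ArrayList<>();
        for (int i = 0; i < newNames.size(); i++) {
            mapping.add(inputNames.get(i) + " -> " + newNames.get(i));
        }
        return mapping;
    }

    /** By-name mode: each pair is {source, alias}; output order follows the expression order. */
    static List<String> byName(List<String> inputNames, String[][] aliased) {
        List<String> mapping = new ArrayList<>();
        for (String[] pair : aliased) {
            if (!inputNames.contains(pair[0])) {
                throw new IllegalArgumentException("Unknown field: " + pair[0]);
            }
            mapping.add(pair[0] + " -> " + pair[1]);
        }
        return mapping;
    }

    public static void main(String[] args) {
        List<String> tuple3 = Arrays.asList("f0", "f1", "f2");

        // Models "name, age, height": rename only, no reordering.
        System.out.println(byPosition(tuple3, Arrays.asList("name", "age", "height")));
        // prints [f0 -> name, f1 -> age, f2 -> height]

        // Models "f1 as name, f2 as age, f0 as height": reorder and rename together.
        System.out.println(byName(tuple3, new String[][] {
            {"f1", "name"}, {"f2", "age"}, {"f0", "height"}
        }));
        // prints [f1 -> name, f2 -> age, f0 -> height]
    }
}
```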

Best,

Dawid

