Access Row fields by attribute name rather than by index in PyFlink TableFunction


Sumeet Malhotra
Hi,

According to the documentation for PyFlink Table row-based operations [1], typical usage is as follows:

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
    for s in x[1].split(","):
        yield x[0], s

table.flat_map(split)

Is there any way that row fields inside the UDTF can be accessed by their attribute names instead of by index? In my use case, I'm doing the following:

raw_data = t_env.from_path('MySource')
raw_data \
    .join_lateral(my_table_tranform_fn(raw_data).alias('a', 'b', 'c')) \
    .flat_map(my_flat_map_fn).alias('x', 'y', 'z') \
    .execute_insert("MySink")

In the table function `my_flat_map_fn`, I'm unable to access the fields of the row by their attribute names. That is, assuming the input argument to the table function is `x`, I cannot access fields as `x.a`, `x.b` or `x.c`; instead I have to use `x[0]`, `x[1]` and `x[2]`. The error I get is that `_fields` is not populated.

In my use case, the number of columns is very high, and working with indexes is error-prone and hard to maintain.
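
For illustration, here is a minimal sketch (with hypothetical column names a, b, c and placeholder result types) of what I have to write today versus what I would like to write:

```
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

# Placeholder result types; the real table has many more columns.
@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()])
def my_flat_map_fn(x: Row):
    # What works today: positional access.
    yield x[0], x[1], x[2]
    # What I would like to write instead, but which fails because
    # the Row's field names (_fields) are not populated:
    # yield x.a, x.b, x.c
```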

Any suggestions?

Thanks,
Sumeet


Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

Xingbo Huang
Hi Sumeet,

Due to a limitation of the original PyFlink serializer design, there is currently no way to pass attribute names to `Row` in row-based operations. For release 1.14, I am reworking the serializer implementations [1]; once that is done, accessing fields of `Row` by attribute name in row-based operations will be supported [2].

As a workaround in release 1.13, you can manually set the field names of the `Row`, e.g.:
```
def my_table_tranform_fn(x: Row):
    x.set_field_names(['a', 'b', 'c'])
    ...
```
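
For example, applied inside the flat_map UDTF from your pipeline, it could look roughly like the following sketch (the column names and result types here are just placeholders):

```
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()])
def my_flat_map_fn(x: Row):
    # In release 1.13 the field names are not carried into the UDTF,
    # so attach them manually before accessing fields by attribute.
    x.set_field_names(['a', 'b', 'c'])
    yield x.a, x.b, x.c
```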

[1] https://issues.apache.org/jira/browse/FLINK-22612
[2] https://issues.apache.org/jira/browse/FLINK-22712

Best,
Xingbo



Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

Sumeet Malhotra
Thanks, Xingbo! The workaround will probably work for now; at least it avoids having to refer to index values in the rest of the function.

Cheers,
Sumeet

