Accessing columns from input stream table during Window operations

10 messages

Accessing columns from input stream table during Window operations

Sumeet Malhotra
Hi,

I have a use case where I'm creating a Tumbling window as follows:

"input" table has columns [Timestamp, a, b, c]

input \
    .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
    .group_by(col('w'), input.a) \
    .select( 
        col('w').start.alias('window_start'),
        col('w').end.alias('window_end'),
        input.b,
        input.c.avg.alias('avg_value')) \
    .execute_insert('MySink') \
    .wait()

This throws an exception that it cannot resolve the fields "b" and "c" inside the select statement. If I mention these column names inside the group_by() statement as follows:

.group_by(col('w'), input.a, input.b, input.c)

then the column names in the subsequent select statement can be resolved.

Basically, unless the column name is explicitly made part of the group_by() clause, the subsequent select() clause doesn't resolve it. This is very similar to the example in Flink's documentation [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples, where the same pattern works.

Any idea how I can access columns from the input stream, without having to mention them in the group_by() clause? I really don't want to group the results by those fields, but they need to be written to the sink eventually.
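As an aside, the resolution rule mirrors plain SQL GROUP BY semantics. A small plain-Python sketch (no Flink involved; the sample values are hypothetical) shows why a non-grouped, non-aggregated column is ambiguous inside a window group:

```python
from collections import defaultdict

# Hypothetical sample rows: (timestamp_seconds, a, b, c)
rows = [(0, "a0", "b0", 1), (1, "a0", "b1", 2), (2, "a0", "b2", 3)]

WINDOW = 10  # tumbling window size in seconds

groups = defaultdict(list)
for ts, a, b, c in rows:
    window_start = (ts // WINDOW) * WINDOW  # assign each row to its tumbling window
    groups[(window_start, a)].append((b, c))

# Within the single (window, a0) group, column b has three distinct values,
# so "just emit b" is ambiguous -- which is why the planner rejects it unless
# b is part of the group key or wrapped in an aggregate.
for key, members in groups.items():
    distinct_b = {b for b, _ in members}
    print(key, distinct_b)
```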

Thanks,
Sumeet

Re: Accessing columns from input stream table during Window operations

Guowei Ma
Hi, Sumeet
For "input.b", I think you need to aggregate the non-group-key column [1].
But I am not sure why "input.c.avg.alias('avg_value')" has resolution errors. Would you mind sharing more detailed error information?


On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <[hidden email]> wrote:

Re: Accessing columns from input stream table during Window operations

Sumeet Malhotra
Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause any issues. It's only an issue when I want to use "input.b".

My use case is basically to emit "input.b" to the final sink as is, without performing any aggregation on that column - more like a pass-through from input to sink. What's the best way to achieve this? I was thinking that making it part of the select() clause would do it, but as you said, some aggregation needs to be performed on it.

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <[hidden email]> wrote:

Re: Accessing columns from input stream table during Window operations

Guowei Ma
Hi, Sumeet

Maybe you could try Over Windows [1], which can keep the "non-group-key" columns.
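To illustrate the difference in plain Python (not Flink; the rows and the 10-second range are hypothetical, and the input is assumed sorted by timestamp): an over window emits exactly one output row per input row, so non-key columns like b simply ride along.

```python
# Plain-Python sketch of over-window semantics: aggregate over the rows in
# the same partition within the preceding range, one output row per input row.
rows = [(0, "a0", "b0", 1), (1, "a0", "b1", 2), (2, "a0", "b2", 3)]
PRECEDING = 10  # range preceding, in seconds

out = []
for i, (ts, a, b, c) in enumerate(rows):
    # values from rows with the same partition key (a) within the preceding range
    window = [c2 for ts2, a2, _, c2 in rows[: i + 1]
              if a2 == a and ts - ts2 <= PRECEDING]
    out.append((a, b, sum(window) / len(window)))  # b passes through untouched

print(out)  # one output row per input row
```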


Best,
Guowei


On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <[hidden email]> wrote:

Re: Accessing columns from input stream table during Window operations

Sumeet Malhotra
Thanks Guowei. I'm trying out Over Windows, as follows:

input \
    .over_window(
        Over.partition_by(col(input.a)) \
        .order_by(input.Timestamp) \
        .preceding(lit(10).seconds) \
        .alias('w')) \
    .select(
        input.b,
        input.c.avg.over(col('w'))) \
    .execute_insert('MySink') \
    .wait()

But I'm running into the following exception:

py4j.protocol.Py4JError: An error occurred while calling z:org.apache.flink.table.api.Over.partitionBy. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist

Is there an extra JAR that needs to be included for Over Windows? From the code, it doesn't appear so.

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <[hidden email]> wrote:

Re: Accessing columns from input stream table during Window operations

Dian Fu
Hi Sumeet,

1) Regarding the above exception: it's a known issue that has been fixed in FLINK-21922 [1]. The fix will be available in the upcoming 1.12.3 release. You could also cherry-pick the fix onto 1.12.2 and build from source following the instructions described in [2] if needed.

2) Regarding your requirements, could you describe whether you want to use a group window or an over window?
A group window (e.g. tumble window, hop window, session window, etc.) outputs one row for multiple inputs belonging to the same window. You cannot simply pass a column through from input to sink, because with multiple input rows it is non-deterministic which row's value to use. That's why you have to declare a field in the group-by clause if you want to access it directly in the select clause. An over window, by contrast, outputs one row for each input row, so you can pass the column through directly.
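The cardinality difference can be checked with a small plain-Python sketch (no Flink; the rows and the 10-second window are hypothetical): a tumble window emits one row per (window, key), an over window emits one row per input.

```python
from collections import defaultdict

rows = [(0, "k", 1), (1, "k", 2), (12, "k", 3)]  # (ts, key, value)
WINDOW = 10

# Group (tumble) window: one output row per (window, key).
tumble = defaultdict(list)
for ts, k, v in rows:
    tumble[((ts // WINDOW) * WINDOW, k)].append(v)
tumble_out = [(w, k, sum(vs) / len(vs)) for (w, k), vs in tumble.items()]

# Over window: one output row per input row.
over_out = []
for ts, k, v in rows:
    vals = [v2 for ts2, k2, v2 in rows if k2 == k and 0 <= ts - ts2 <= WINDOW]
    over_out.append((ts, k, sum(vals) / len(vals)))

print(len(tumble_out), len(over_out))  # fewer group-window rows than inputs
```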



On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra <[hidden email]> wrote:


Re: Accessing columns from input stream table during Window operations

Sumeet Malhotra
Hi Guowei,

Let me elaborate the use case with an example.

Sample input table looks like this:

time    a   b   c
-----------------
t0      a0  b0  1
t1      a0  b1  2
t2      a0  b2  3
t3      a0  b0  6
t4      a0  b1  7
t5      a0  b2  8

Basically, in every time interval there are new readings from a fixed set of sensors (b0, b1 and b2). All these rows carry a few constant fields representing metadata about the input (a0).

Desired output for every time interval is the average reading for every sensor (b0, b1, b2), along with the constant metadata (a0):

a0    b0    avg(c)
a0    b1    avg(c)
a0    b2    avg(c)

This is what I was trying to build using a simple Tumble window:

input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
    .group_by(col('w'), input.b) \
    .select(
        input.a,                            # <== constant metadata field, same for every input record
        input.b,                            # <== group_by field, to compute averages
        input.c.avg.alias('avg_value')) \
    .execute_insert('MySink') \
    .wait()


The example above is highly simplified, but I hope it explains what I'm trying to achieve.
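For reference, here is the desired computation on the sample table in plain Python (no Flink; it assumes t0..t5 all fall in the same 10-second window):

```python
from collections import defaultdict

# The sample rows from above, as (a, b, c) tuples.
rows = [("a0", "b0", 1), ("a0", "b1", 2), ("a0", "b2", 3),
        ("a0", "b0", 6), ("a0", "b1", 7), ("a0", "b2", 8)]

acc = defaultdict(list)
for a, b, c in rows:
    acc[(a, b)].append(c)  # a is constant, so keying on (a, b) == keying on b

# One output row per sensor: (a0, sensor, avg(c))
result = sorted((a, b, sum(cs) / len(cs)) for (a, b), cs in acc.items())
print(result)
```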

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <[hidden email]> wrote:

Re: Accessing columns from input stream table during Window operations

Dian Fu
Hi Sumeet,

Thanks for sharing.

Then I guess you could use `.group_by(col('w'), input.a, input.b)`. Since the value of input.a is always the same, this is logically equivalent to `group_by(col('w'), input.b)`. The benefit is that you can access input.a directly in the select clause.

Regards,
Dian

On Apr 19, 2021, at 6:29 PM, Sumeet Malhotra <[hidden email]> wrote:


Re: Accessing columns from input stream table during Window operations

Guowei Ma
Hi, Sumeet
Thank you for sharing. As Dian suggested, I think you could include b in your `group_by` key, so that b can be output directly.
I think that is simpler.
Best,
Guowei


On Mon, Apr 19, 2021 at 7:31 PM Dian Fu <[hidden email]> wrote:
Hi Sumeet,

Thanks for the sharing.

Then I guess you could use `.group_by(col('w'), input.a, input.b)`. Since the value for input.a is always the same, it’s equal to group_by(col(‘w')input.b) logically. The benefit is that you could access input.a directly in the select clause.

Regards,
Dian

2021年4月19日 下午6:29,Sumeet Malhotra <[hidden email]> 写道:

Hi Guowei,

Let me elaborate the use case with an example.

Sample input table looks like this:

time    a   b   c
-----------------
t0      a0  b0  1
t1      a0  b1  2
t2      a0  b2  3
t3      a0  b0  6
t4      a0  b1  7
t5      a0  b2  8

Basically, every time interval there are new readings from a fixed set of sensors (b0, b1 and b2). All these rows have a few constant fields representing metadata about the input (a0).

Desired output for every time interval is the average reading for every sensor (b0, b1, b2), along with the constant metadata (a0):

a0    b0    avg(c)
a0    b1    avg(c)
a0    b2    avg(c)

This is what I was trying to build using a simple Tumble window:

input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
    .group_by(col('w'), input.b) \
    .select( 
        input.a,                            <=== constant metadata field, same for every input record
        input.b,                            <=== group_by field, to compute averages
        input.c.avg.alias('avg_value')) \
    .execute_insert('MySink') \
    .wait()

The example above is highly simplified, but I hope it explains what I'm trying to achieve.

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <[hidden email]> wrote:
Hi Sumeet,

1) Regarding to the above exception, it’s a known issue and has been fixed in FLINK-21922 [1]. It will be available in the coming 1.12.3. You could also cherry-pick that fix to 1.12.2 and build from source following the instruction described in [2] if needed.

2) Regarding to your requirements, could you describe what you want to do with group window or over window? 
For group window(e.g. tumble window, hop window, session window, etc), it will output one row for multiple inputs belonging to the same window. You could not just passing through it from input to sink as it is non-determinitic which row to use as there are multiple input rows. That’s the reason why you have to declare a field in the group by clause if you want to access it directly in the select clause. For over window, it will output one row for each input and so you could pass through it directly.



2021年4月19日 下午5:16,Sumeet Malhotra <[hidden email]> 写道:

Thanks Guowei. I'm trying out Over Windows, as follows:

input \
    .over_window(
        Over.partition_by(col(input.a)) \
        .order_by(input.Timestamp) \
        .preceding(lit(10).seconds) \
        .alias('w')) \
    .select(
        input.b,
        input.c.avg.over(col('w'))) \
    .execute_insert('MySink') \
    .wait()

But running into following exception:

py4j.protocol.Py4JError: An error occurred while calling z:org.apache.flink.table.api.Over.partitionBy. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist

Is there any extra Jar that needs to be included for Over Windows. From the code it doesn't appear so.

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <[hidden email]> wrote:
Hi, Sumeet

Maybe you could try the Over Windows[1], which could keep the "non-group-key" column.


Best,
Guowei


On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <[hidden email]> wrote:
Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause any issues. It's only when I want to use "input.b".

My use case is to basically emit "input.b" in the final sink as is, and not really perform any aggregation on that column - more like pass through from input to sink. What's the best way to achieve this? I was thinking that making it part of the select() clause would do it, but as you said there needs to be some aggregation performed on it.

Thanks,
Sumeet




Re: Accessing columns from input stream table during Window operations

Sumeet Malhotra
Thanks Dian, Guowei. I think it makes sense to roll with this approach.

On Tue, Apr 20, 2021 at 8:29 AM Guowei Ma <[hidden email]> wrote:
Hi, Sumeet
Thank you for sharing. As Dian suggested, I think you could use b as a group_by key, so that b can be output directly.
I think that is simpler.
Best,
Guowei


On Mon, Apr 19, 2021 at 7:31 PM Dian Fu <[hidden email]> wrote:
Hi Sumeet,

Thanks for sharing.

Then I guess you could use `.group_by(col('w'), input.a, input.b)`. Since the value of input.a is always the same, it's logically equivalent to `.group_by(col('w'), input.b)`. The benefit is that you could access input.a directly in the select clause.
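A plain-Python sanity check of that equivalence (illustrative only, not PyFlink, with made-up rows): grouping by a constant column plus b partitions the rows exactly the same way as grouping by b alone:

```python
from collections import defaultdict

# Toy rows (a, b, c) where a is constant across the whole input.
rows = [("a0", "b0", 1), ("a0", "b1", 2), ("a0", "b0", 6), ("a0", "b1", 7)]

def group_avg(rows, key):
    # Average the c column within each group defined by the key function.
    buckets = defaultdict(list)
    for row in rows:
        buckets[key(row)].append(row[2])
    return {k: sum(v) / len(v) for k, v in buckets.items()}

by_ab = group_avg(rows, key=lambda r: (r[0], r[1]))  # group by (a, b)
by_b = group_avg(rows, key=lambda r: r[1])           # group by b only

# Since a is constant, both groupings produce identical partitions,
# but the (a, b) version keeps a available in each output row.
assert {b: v for (_, b), v in by_ab.items()} == by_b
```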

Regards,
Dian

On April 19, 2021 at 6:29 PM, Sumeet Malhotra <[hidden email]> wrote:

Hi Guowei,

Let me elaborate the use case with an example.

Sample input table looks like this:

time    a   b   c
-----------------
t0      a0  b0  1
t1      a0  b1  2
t2      a0  b2  3
t3      a0  b0  6
t4      a0  b1  7
t5      a0  b2  8

Basically, every time interval there are new readings from a fixed set of sensors (b0, b1 and b2). All these rows have a few constant fields representing metadata about the input (a0).

Desired output for every time interval is the average reading for every sensor (b0, b1, b2), along with the constant metadata (a0):

a0    b0    avg(c)
a0    b1    avg(c)
a0    b2    avg(c)

This is what I was trying to build using a simple Tumble window:

input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
    .group_by(col('w'), input.b) \
    .select( 
        input.a,                            <=== constant metadata field, same for every input record
        input.b,                            <=== group_by field, to compute averages
        input.c.avg.alias('avg_value')) \
    .execute_insert('MySink') \
    .wait()

The example above is highly simplified, but I hope it explains what I'm trying to achieve.
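For reference, the desired output can be computed over the sample rows in plain Python (assuming timestamps t0..t5 are seconds 0..5, so all rows fall into a single 10-second window):

```python
from collections import defaultdict

# Sample rows (t, a, b, c) from the table above; timestamps assumed 0..5s.
rows = [(0, "a0", "b0", 1), (1, "a0", "b1", 2), (2, "a0", "b2", 3),
        (3, "a0", "b0", 6), (4, "a0", "b1", 7), (5, "a0", "b2", 8)]

buckets = defaultdict(list)
for t, a, b, c in rows:
    buckets[(t // 10, a, b)].append(c)  # 10s tumble window, keyed by (a, b)

result = sorted((a, b, sum(v) / len(v)) for (_, a, b), v in buckets.items())
```

This yields one averaged row per sensor with the constant metadata column carried along: `("a0", "b0", 3.5)`, `("a0", "b1", 4.5)`, `("a0", "b2", 5.5)`.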

Thanks,
Sumeet


On Mon, Apr 19, 2021 at 3:21 PM Dian Fu <[hidden email]> wrote:
Hi Sumeet,

1) Regarding the above exception, it's a known issue and has been fixed in FLINK-21922 [1]. It will be available in the coming 1.12.3. You could also cherry-pick that fix onto 1.12.2 and build from source following the instructions described in [2] if needed.

2) Regarding your requirements, could you describe what you want to do with a group window or an over window?