PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

Manas Kale
Hi, 
I have the following piece of code (for PyFlink v1.11):
from pyflink.table.window import Tumble

t_env.from_path(INPUT_TABLE) \
    .select("monitorId, data, rowtime") \
    .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
    .group_by("five_sec_window, monitorId") \
    .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
    .execute_insert(OUTPUT_TABLE)
This generates the following exception:

Traceback (most recent call last):
  File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in <module>
    .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", line 907, in select
    return Table(self._j_table.select(fields), self._t_env)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.

The "rowtime" attribute in INPUT_TABLE is created as follows:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.descriptors import Rowtime

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
t_env = StreamTableEnvironment.create(
    exec_env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
)
...
        .field("rowtime", DataTypes.TIMESTAMP(3))
        .rowtime(
            Rowtime()
            .timestamps_from_field("time_st")
            .watermarks_periodic_ascending())
).create_temporary_table(INPUT_TABLE)

What is wrong with the code? I believe I have already specified which attribute should be treated as the time attribute.

Thank you,
Manas

Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

r_khachatryan
Hi Manas,

Do you get the same error if you replace
    .group_by("five_sec_window, monitorId") \
with
    .group_by("five_sec_window") \
?

Regards,
Roman


In reply to Manas Kale's message of Mon, Jul 13, 2020 at 11:16 AM.

Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

Xingbo Huang
Hi Manas,
This may be a bug in the Java Descriptor API. You could try the DDL approach [1] instead, which is the recommended way.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/

Best,
Xingbo
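A minimal sketch of the DDL approach, assuming PyFlink 1.11 or later. The helper `build_input_ddl`, the table name `input_table`, and the column types (`monitorId STRING`, `data DOUBLE`, `time_st TIMESTAMP(3)`) are hypothetical, because the original connector settings were elided. The sketch declares `rowtime` as a computed column over `time_st` and adds a `WATERMARK` clause; in Flink SQL DDL it is the watermark declaration that makes a column an event-time attribute. The `- INTERVAL '0.001' SECOND` bound is the ascending-timestamps strategy, which should be the DDL counterpart of `watermarks_periodic_ascending()`.

```python
from typing import Dict


def build_input_ddl(table_name: str, connector_options: Dict[str, str]) -> str:
    """Build a CREATE TABLE statement with an event-time attribute.

    Hypothetical schema: monitorId STRING, data DOUBLE, time_st TIMESTAMP(3).
    `rowtime` is a computed column over time_st; the WATERMARK clause is what
    makes it a time attribute that group windows can use.
    """
    # Render the connector options as 'key' = 'value' pairs for the WITH clause.
    options = ",\n".join(
        f"  '{key}' = '{value}'" for key, value in connector_options.items()
    )
    return (
        f"CREATE TABLE {table_name} (\n"
        "  monitorId STRING,\n"
        "  data DOUBLE,\n"
        "  time_st TIMESTAMP(3),\n"
        "  `rowtime` AS time_st,\n"
        # Ascending-timestamps strategy: the watermark trails the largest
        # timestamp seen so far by one millisecond.
        "  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '0.001' SECOND\n"
        f") WITH (\n{options}\n)"
    )


if __name__ == "__main__":
    # Placeholder connector settings; replace them with the real source options.
    print(build_input_ddl(
        "input_table",
        {"connector": "filesystem", "path": "/path/to/input", "format": "csv"},
    ))
```

The resulting string could be registered with `t_env.execute_sql(ddl)`. If that works as expected, `t_env.from_path("input_table")` exposes `rowtime` as a rowtime attribute, and the Table API window query from the first message should run without changes.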

In reply to Khachatryan Roman's message of Mon, Jul 13, 2020 at 7:23 PM.

Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

Manas Kale
@Roman: yes, I still get the error with that change.
@Xingbo: okay, I didn't know DDL was the recommended approach.
Please let me know if you can confirm that this is a bug.
Thanks!

In reply to Xingbo Huang's message of Mon, Jul 13, 2020 at 5:07 PM.

Re: PyFlink Table API - "A group window expects a time attribute for grouping in a stream environment."

Xingbo Huang
Hi Manas,

Yes, this is a bug in the Descriptor API that I have also run into, but I couldn't find a corresponding issue for it. You can create an issue to report the problem. The current Descriptor API has other similar bugs, which is why DDL is the recommended approach. The community has also started a discussion on refactoring the Descriptor API [1].

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-129-Refactor-Descriptor-API-to-register-connector-in-Table-API-tt42995.html

Best,
Xingbo
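Once the input table is declared through DDL, the aggregation itself could also be written in Flink SQL instead of the string-based Table API. This is an illustrative sketch, not something proposed in the thread: `build_window_query`, `input_table`, and `output_table` are hypothetical names, and it assumes the source has an event-time attribute called `rowtime` and a sink whose columns match the selected fields in order. It uses the SQL group-window functions `TUMBLE` and `TUMBLE_ROWTIME` to mirror `Tumble.over("5.minutes")` and `five_sec_window.rowtime`.

```python
def build_window_query(source: str, sink: str, minutes: int = 5) -> str:
    """Render a tumbling-window aggregation as a Flink SQL INSERT statement.

    Assumes `source` has an event-time attribute named `rowtime` plus the
    monitorId and data columns, and that `sink` accepts the selected columns
    in this order.
    """
    size = f"INTERVAL '{minutes}' MINUTE"
    return (
        f"INSERT INTO {sink}\n"
        "SELECT\n"
        "  monitorId,\n"
        "  AVG(data),\n"
        "  MIN(data),\n"
        "  MAX(data),\n"
        # Rowtime of the window, the SQL counterpart of five_sec_window.rowtime.
        f"  TUMBLE_ROWTIME(`rowtime`, {size})\n"
        f"FROM {source}\n"
        # Group by the window itself and by the key, as in the Table API code.
        f"GROUP BY TUMBLE(`rowtime`, {size}), monitorId"
    )


if __name__ == "__main__":
    print(build_window_query("input_table", "output_table"))
```

The statement would be submitted with `t_env.execute_sql(...)`. It is subject to the same requirement as the Table API version: `rowtime` must be a proper time attribute, so it only helps once the source table is declared correctly.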

In reply to Manas Kale's message of Tue, Jul 14, 2020 at 12:50 PM.