How to use ProcessWindowFunction in pyflink?

Hongyuan Ma
Greetings,

I am new to PyFlink. I want to use a ProcessWindowFunction on a tumbling window and have it output zero or more rows per window. I have checked PyFlink's DataStream API and Table API but have not found a complete example. PyFlink's DataStream API does not seem to implement window() yet, and I am not sure how to do this with the Table API.

If I implement "public class MyProcessWindowFunct extends ProcessWindowFunction" in Java and register it as a UDF in Python, can I call it from a select statement in PyFlink? Will the select statement correctly return zero or more rows of results?
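To make the requested behaviour concrete, here is a plain-Python model (no PyFlink involved) of what a ProcessWindowFunction over tumbling event-time windows does: elements are buffered per key and per window, and a user function is called once per (key, window) with all of that window's elements, yielding zero or more rows. The names `tumbling_window_start`, `process_windows` and `emit_large_values` are hypothetical illustrations, not PyFlink APIs, and the sketch ignores watermarks, triggers and late data.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

# Hypothetical plain-Python model for illustration only; none of these
# names are PyFlink APIs. Watermarks, triggers and lateness are ignored.
Event = Tuple[str, int, int]  # (key, event-time timestamp in ms, value)
Window = Tuple[int, int]      # half-open interval [start, end)


def tumbling_window_start(ts: int, size: int, offset: int = 0) -> int:
    """Start of the tumbling window of length `size` that contains `ts`."""
    # Python's % is non-negative for a positive size, so ts < offset also works.
    return ts - (ts - offset) % size


def process_windows(
    events: Iterable[Event],
    size: int,
    fn: Callable[[str, Window, List[int]], Iterable[tuple]],
) -> List[tuple]:
    """Buffer events per (key, window), then call fn once per window.

    fn plays the role of ProcessWindowFunction.process(): it sees all
    elements of one window and may yield zero, one or many rows.
    """
    buckets: Dict[Tuple[str, int], List[int]] = defaultdict(list)
    for key, ts, value in events:
        buckets[(key, tumbling_window_start(ts, size))].append(value)
    out: List[tuple] = []
    # Fire windows in (key, start) order so the output is deterministic.
    for key, start in sorted(buckets):
        out.extend(fn(key, (start, start + size), buckets[(key, start)]))
    return out


def emit_large_values(key: str, window: Window, values: List[int]) -> Iterator[tuple]:
    """Example window function: one row per value above 10, possibly none."""
    start, end = window
    for v in values:
        if v > 10:
            yield (key, start, end, v)


if __name__ == "__main__":
    events = [("a", 1000, 5), ("a", 2000, 20), ("a", 6000, 3),
              ("b", 4000, 11), ("b", 4500, 30)]
    print(process_windows(events, 5000, emit_large_values))
    # → [('a', 0, 5000, 20), ('b', 0, 5000, 11), ('b', 0, 5000, 30)]
```

Here key "a"'s window [5000, 10000) produces no row at all, while key "b"'s window [0, 5000) produces two, which is the "zero or more rows per window" behaviour the question asks about.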

Any help will be appreciated!

-----
Best Regards,
Hongyuan Ma

Re: How to use ProcessWindowFunction in pyflink?

Arvid Heise-4
Hi Hongyuan,

It seems that PyFlink's DataStream API still lacks window support [1]; this is targeted for the next release.

Examples of windows in PyFlink's Table API are available here [2]:

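from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col

# Assumes t_env is a TableEnvironment in which a table "Orders" with columns
# a, b and an event-time attribute rowtime has already been registered.
orders = t_env.from_path("Orders")
result = (orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w"))  # 5-minute tumbling window
                .group_by(orders.a, col('w'))  # one group per value of a and window w
                .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d')))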
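For readers new to the Table API, the query above groups rows by column `a` and by the 5-minute tumbling window of `rowtime`, and emits one row per non-empty group containing the window start, the window end and the sum of `b`. The plain-Python model below reproduces that result on an in-memory list. It is only a sketch: it assumes integer millisecond timestamps instead of SQL TIMESTAMP values, ignores watermarks and late data, and `tumble_sum` is a hypothetical name, not part of PyFlink.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

FIVE_MINUTES_MS = 5 * 60 * 1000


def tumble_sum(
    rows: Iterable[Tuple[str, int, int]], size_ms: int = FIVE_MINUTES_MS
) -> List[Tuple[str, int, int, int]]:
    """Hypothetical in-memory model of the Table API query above.

    Input rows are (a, rowtime_ms, b); output rows are (a, w_start, w_end, d),
    mirroring select(a, w.start, w.end, b.sum.alias('d')).
    """
    sums: Dict[Tuple[str, int], int] = defaultdict(int)
    for a, rowtime, b in rows:
        start = rowtime - rowtime % size_ms  # windows aligned to epoch 0
        sums[(a, start)] += b
    # Exactly one output row per non-empty (a, window) group.
    return [(a, start, start + size_ms, d) for (a, start), d in sorted(sums.items())]


if __name__ == "__main__":
    rows = [("x", 0, 1), ("x", 60_000, 2), ("x", 300_000, 4), ("y", 10_000, 7)]
    print(tumble_sum(rows))
    # → [('x', 0, 300000, 3), ('x', 300000, 600000, 4), ('y', 0, 300000, 7)]
```

Unlike a ProcessWindowFunction, a grouped window aggregation like this always yields exactly one row per (key, window) group.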



(In reply to Hongyuan Ma's message of Fri, Feb 19, 2021 at 8:26 AM, quoted in full above.)