PyFlink: Split input table stream using filter()

Sumeet Malhotra
Hi,

I would like to split streamed data from Kafka into two streams based on some filter criteria, using the PyFlink Table API. As described here [1], one way to do this is to call .filter() on the input, which should split the stream for parallel processing.

Does this approach work in the Table API as well? I'm doing the following, but control never reaches the second stream.

input = t_env.from_path('TableName')
stream1 = input.filter(<condition1>).select(...)...
stream2 = input.filter(<condition2>).select(...)...

When I execute this, only the first stream gets processed; control never reaches stream2. I have set the parallelism to 2.

Am I missing something? Or is this only supported in the DataStream API?

Thanks in advance,
Sumeet



Re: PyFlink: Split input table stream using filter()

Dian Fu
Hi Sumeet,

Yes, this approach also works in the Table API.

Could you share which API you use to execute the job? For jobs with multiple sinks, a StatementSet should be used, so that all the inserts are submitted together as a single job. You could refer to [1] for more details on this.

Regards,
Dian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/intro_to_table_api/#emit-results-to-multiple-sink-tables

(In reply to Sumeet Malhotra's message of May 5, 2021, 5:51 PM.)