How can I increase the parallelism on the Table API for Streaming Aggregation?

How can I increase the parallelism on the Table API for Streaming Aggregation?

Felipe Gutierrez
Hi community,

I am implementing a streaming aggregation with the Table API [1] and
trying out the local-global aggregation plan to optimize the query.
I configured it like this:

Configuration configuration = tableEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setInteger("table.exec.resource.default-parallelism", 4);
// local-global aggregation requires mini-batch to be enabled
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
configuration.setString("table.exec.mini-batch.size", "1000");
// enable two-phase, i.e. local-global aggregation
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

When I looked at the query plan on the dashboard, I saw that the
LocalGroupAggregate has parallelism 1 while the GlobalGroupAggregate
has parallelism 4. Why doesn't the LocalGroupAggregate also have
parallelism 4, given that I set the property
"table.exec.resource.default-parallelism"? Here is my code [2].

Thanks,
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
[2] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
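
For readers following along, here is a minimal plain-Java sketch of what a two-phase (local-global) aggregation does conceptually: each mini-batch is pre-aggregated into partial counts, and a global step merges the partials per key. It is only an illustration of the idea, not Flink's implementation; the class and method names (TwoPhaseAggregationSketch, localAggregate, globalAggregate) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual sketch only: names are hypothetical and no Flink classes are used.
class TwoPhaseAggregationSketch {

    // Local phase: pre-aggregate one mini-batch into partial counts per key.
    static Map<String, Long> localAggregate(List<String> miniBatch) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : miniBatch) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Global phase: merge the partial counts produced by all local aggregators.
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> total = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        // Two mini-batches, as two local aggregator instances might see them.
        List<List<String>> miniBatches = Arrays.asList(
                Arrays.asList("a", "a", "b"),
                Arrays.asList("a", "c", "c", "c"));

        List<Map<String, Long>> partials = new ArrayList<>();
        for (List<String> batch : miniBatches) {
            partials.add(localAggregate(batch));
        }

        // Only the partial counts reach the global step, not every record.
        System.out.println(globalAggregate(partials)); // prints {a=3, b=1, c=3}
    }
}
```

The point of the local phase is that the global step receives one partial result per key and mini-batch instead of one record per event, which is why the parallelism of the local operator matters for throughput.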

Re: How can I increase the parallelism on the Table API for Streaming Aggregation?

r_khachatryan
Hi Felipe,

Your source is not parallel, so it doesn't make sense to make the local group operator parallel.
If the source implemented ParallelSourceFunction, the subsequent operators would be parallelized too.

Regards,
Roman
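
A minimal sketch of a parallel source along the lines Roman suggests, assuming the Flink 1.11-era DataStream API with the Flink streaming dependency on the classpath. The class name ParallelCounterSource and the values it emits are hypothetical; it extends RichParallelSourceFunction (which implements ParallelSourceFunction) only so that each subtask can read its own index. It is not the source used in the linked code.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Hypothetical example source: every parallel subtask emits its own slice of a counter.
public class ParallelCounterSource extends RichParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int total = getRuntimeContext().getNumberOfParallelSubtasks();
        long value = subtask;
        while (running) {
            // Emit under the checkpoint lock, as legacy SourceFunctions are expected to do.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value);
            }
            value += total; // subtasks emit disjoint values
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Because the source is a ParallelSourceFunction, it may run with parallelism > 1.
        env.addSource(new ParallelCounterSource()).setParallelism(4).print();
        env.execute("parallel-source-sketch");
    }
}
```

With a plain SourceFunction, calling setParallelism(4) on the source is rejected, which is why the source, and the operators directly connected to it, stay at parallelism 1.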


Re: How can I increase the parallelism on the Table API for Streaming Aggregation?

Felipe Gutierrez
Thanks! I will test it.