Hi community,
I was implementing the stream aggregation using the Table API [1] and trying out the local aggregation plan to optimize the query. Basically, I had to configure it like this:

Configuration configuration = tableEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setInteger("table.exec.resource.default-parallelism", 4);
// local-global aggregation requires mini-batch to be enabled
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "1 s");
configuration.setString("table.exec.mini-batch.size", "1000");
// enable two-phase, i.e. local-global aggregation
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

When I looked at the query plan on the dashboard, I noticed that the LocalGroupAggregate runs with parallelism 1 while the GlobalGroupAggregate runs with parallelism 4. Why isn't the LocalGroupAggregate also running with parallelism 4, given that I set it through the property "table.exec.resource.default-parallelism"? Here is my code [2].

Thanks,
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
[2] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/table/TaxiRideCountTable.java

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
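To make the two-phase strategy concrete, here is a minimal plain-Java sketch (no Flink dependency) of what local-global aggregation does conceptually: each parallel partition first pre-aggregates the keys in its own mini-batch (the "local" phase), and only the partial counts are shuffled to the "global" phase, which merges them. The class, method names, and the "taxi-N" keys are hypothetical and only model the data flow; they are not Flink's LocalGroupAggregate/GlobalGroupAggregate operators.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of local-global (two-phase) COUNT aggregation.
// Not Flink code: it only illustrates the data flow the optimizer plans.
public class TwoPhaseCountSketch {

    // Local phase: one partition pre-aggregates its mini-batch of keys
    // into partial counts, so fewer records are sent downstream.
    static Map<String, Long> localAggregate(List<String> miniBatch) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : miniBatch) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Global phase: merge the partial counts coming from all local partitions.
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two parallel "partitions", each holding one mini-batch of ride keys.
        List<String> partition0 = List.of("taxi-1", "taxi-2", "taxi-1");
        List<String> partition1 = List.of("taxi-1", "taxi-3");

        Map<String, Long> totals = globalAggregate(List.of(
                localAggregate(partition0),
                localAggregate(partition1)));

        System.out.println(totals.get("taxi-1")); // prints 3
    }
}
```

The benefit only appears when there are several local partitions: with a single upstream instance, the local phase pre-aggregates everything in one place.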
Hi Felipe,

Your source is not parallel, so it doesn't make sense to make the local group operator parallel. If the source implemented ParallelSourceFunction, subsequent operators would be parallelized too.

Regards,
Roman

On Thu, Oct 8, 2020 at 5:00 PM Felipe Gutierrez <[hidden email]> wrote:
> Hi community,
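Following Roman's suggestion, a source that implements ParallelSourceFunction (for example by extending RichParallelSourceFunction) runs one instance per parallel subtask, and each instance can read its subtask index and the number of parallel subtasks via getRuntimeContext().getIndexOfThisSubtask() and getNumberOfParallelSubtasks(). A common way to keep instances from emitting the same record twice is to let each one take only the records whose position modulo the parallelism equals its index. The plain-Java sketch below models only that assignment, without Flink; the class and method names are hypothetical, and the parallelism of 4 simply mirrors the value configured above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how a parallel source could split its input:
// each subtask keeps only the records whose position modulo the
// parallelism equals its own subtask index, so no record is emitted twice.
public class ParallelSourceSplitSketch {

    static <T> List<T> recordsForSubtask(List<T> allRecords, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<T> assigned = new ArrayList<>();
        // Start at this subtask's index and stride by the parallelism.
        for (int i = subtaskIndex; i < allRecords.size(); i += parallelism) {
            assigned.add(allRecords.get(i));
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> rides = List.of("r0", "r1", "r2", "r3", "r4", "r5", "r6");
        int parallelism = 4; // mirrors the value set in the configuration above
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // prints: 0 -> [r0, r4], 1 -> [r1, r5], 2 -> [r2, r6], 3 -> [r3]
            System.out.println(subtask + " -> " + recordsForSubtask(rides, subtask, parallelism));
        }
    }
}
```

In an actual Flink source, the same filtering would happen inside run(SourceContext), with the index and parallelism taken from the runtime context instead of a loop.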
Thanks! I will test it.
On Thu, Oct 8, 2020 at 6:19 PM Khachatryan Roman <[hidden email]> wrote:
> Hi Felipe,
>
> Your source is not parallel so it doesn't make sense to make local group operator parallel.
> If the source implemented ParallelSourceFunction, subsequent operators would be parallelized too.
>
> Regards,
> Roman