Hi community,
I am testing the "Split Distinct Aggregation" [1] optimization on the taxi ride data set. My SQL query from the table environment is the one below:

Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");

and I am enabling:

configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

and finally

configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

I was expecting the query plan in the Web UI to show two hash phases, as in the image at [1]. Instead, it shows the same plan with one hash phase, as if I were deploying only one Local aggregate and one Global aggregate (taking the parallel instances into consideration, of course). Please see the attached image of the query execution plan.

Is there something that I am missing when I configure the Table API?

By the way, I am a bit confused about the "MiniBatch Aggregation" [2]. Is the "MiniBatch Aggregation" aggregating like a processing-time window on the operator after the hash phase? If it is, isn't it the same as a window aggregation instead of an unbounded window, as the example presents?

Thanks!
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
I realized that I forgot the image. Now it is attached.
Attachment: Screenshot from 2020-11-09 13-36-18.png (29K)
Hi Felipe,

The "Split Distinct Aggregation", i.e. the "table.optimizer.distinct-agg.split.enabled" option, only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).

However, the query in your example uses COUNT(driverId). You can change it to COUNT(DISTINCT driverId), and it should show two hash phases.

Regarding "MiniBatch Aggregation", it is not the same as a processing-time window aggregation.

1) MiniBatch is just an optimization for unbounded aggregation: it buffers some input records in memory and processes them together to reduce state access. A processing-time window, in contrast, still accesses state once per record. Besides, the local aggregation also applies mini-batch: it only sends the accumulator of the current mini-batch to the downstream global aggregation, and this improves performance a lot.
2) The size of a MiniBatch is not deterministic. It may be triggered by the number of records or by a timeout. A window aggregate, however, is triggered at a deterministic time.

Best,
Jark
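To illustrate why the split only makes sense for COUNT(DISTINCT ...), here is a minimal plain-Java sketch. It is not Flink's implementation: the class name, the method names and the bucket count of 4 are assumptions made for illustration. The idea, as far as the linked documentation describes it, is that a first aggregation groups by the original key plus a bucket key derived from the hash of the distinct value, and a second aggregation sums the per-bucket results. Equal values always land in the same bucket, so the partial distinct counts can simply be added.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; names and bucket count are hypothetical.
class SplitDistinctSketch {

    // One-level COUNT(DISTINCT v): a single set holds every distinct value of the group.
    static int countDistinct(List<Integer> values) {
        return new HashSet<>(values).size();
    }

    // Two-level variant: level 1 collects distinct values per bucket,
    // level 2 sums the per-bucket distinct counts.
    static int countDistinctSplit(List<Integer> values, int bucketNum) {
        Map<Integer, Set<Integer>> buckets = new HashMap<>();
        for (Integer v : values) {
            // Equal values hash to the same bucket, so no value is counted twice.
            int bucket = Math.floorMod(v.hashCode(), bucketNum);
            buckets.computeIfAbsent(bucket, b -> new HashSet<>()).add(v);
        }
        int total = 0;
        for (Set<Integer> distinctInBucket : buckets.values()) {
            total += distinctInBucket.size();
        }
        return total;
    }

    public static void main(String[] args) {
        // Driver ids observed for one startDate (made-up sample data).
        List<Integer> driverIds = Arrays.asList(7, 7, 12, 12, 12, 31, 44, 7);
        System.out.println(countDistinct(driverIds));         // 4
        System.out.println(countDistinctSplit(driverIds, 4)); // 4
    }
}
```

A plain COUNT(driverId) has no distinct key to bucket on, which is consistent with the plan staying at a single hash phase until DISTINCT is added.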
Hi Jark,
thanks for your reply. Indeed, I forgot to write DISTINCT in the query, and now the query plan splits into two hash partition phases.

What do you mean by deterministic time? Why is only the window aggregate deterministic? If I implement the ProcessingTimeCallback [1] interface, is it deterministic?

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html

Thanks
Hi Felipe,
by "non-deterministic" Jark meant that you never know whether the mini-batch timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) fires the execution. This depends on how fast records arrive at the operator.

In general, processing time can be considered non-deterministic, because 100 ms is not necessarily 100 ms. This depends on the CPU load and other tasks such as garbage collection. Only event time (and thus event-time windows), which works on the timestamps in the data instead of machine time, is deterministic.

Regards,
Timo
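The two triggers described above can be sketched in plain Java. This is a simplified model, not Flink's operator code: the class, its fields and the explicit `nowMs` clock parameter are assumptions made so the behaviour can be replayed. The buffer flushes either when it reaches the size threshold or when a timer notices that the allowed latency has elapsed, and which of the two fires depends only on how fast records arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; not how Flink implements mini-batch internally.
class MiniBatchSketch {
    private final int maxSize;          // plays the role of table.exec.mini-batch.size
    private final long allowLatencyMs;  // plays the role of table.exec.mini-batch.allow-latency
    private final List<String> buffer = new ArrayList<>();
    private long batchStartMs;

    // Records why and with how many rows each batch was flushed.
    final List<String> flushReasons = new ArrayList<>();

    MiniBatchSketch(int maxSize, long allowLatencyMs) {
        this.maxSize = maxSize;
        this.allowLatencyMs = allowLatencyMs;
    }

    // Called for every record; nowMs is the machine clock reading at arrival.
    void onRecord(String record, long nowMs) {
        if (buffer.isEmpty()) {
            batchStartMs = nowMs;
        }
        buffer.add(record);
        if (buffer.size() >= maxSize) {
            flush("size");
        }
    }

    // Called by a processing-time timer.
    void onTimer(long nowMs) {
        if (!buffer.isEmpty() && nowMs - batchStartMs >= allowLatencyMs) {
            flush("timeout");
        }
    }

    private void flush(String reason) {
        flushReasons.add(reason + ":" + buffer.size());
        buffer.clear();
    }

    public static void main(String[] args) {
        // Fast arrivals: the size threshold (3 rows) fires first.
        MiniBatchSketch fast = new MiniBatchSketch(3, 3000);
        fast.onRecord("a", 0);
        fast.onRecord("b", 10);
        fast.onRecord("c", 20);
        fast.onTimer(3000);

        // Slow arrivals: the timer (3 s) fires first.
        MiniBatchSketch slow = new MiniBatchSketch(3, 3000);
        slow.onRecord("a", 0);
        slow.onRecord("b", 2500);
        slow.onTimer(3000);

        System.out.println(fast.flushReasons); // [size:3]
        System.out.println(slow.flushReasons); // [timeout:2]
    }
}
```

The same input data therefore produces batches of different sizes depending on arrival speed and machine time, which is the sense in which the mini-batch size is non-deterministic.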
I see, thanks Timo
Hi Jark,
I don't understand the difference between the "MiniBatch Aggregation" and the "Local-Global Aggregation". The web page [1] says that I have to enable the TWO_PHASE parameter. So I compared the query plans with and without the TWO_PHASE parameter, and they are the same. So I conclude that mini-batch is already a TWO_PHASE strategy, since it is already pre-aggregating locally. Is that correct?

Here are both query plans:
Thanks, Felipe

Table API: mini-batch.enable : true
Table API: distinct-agg.split.enabled : false
Table API: parallelism : 4
Table API: mini-batch.latency : 1 s
Table API: mini_batch.size : 1000
Table API: mini_batch.two_phase : false

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: source",
    "pact" : "Data Source",
    "contents" : "Source: source",
    "parallelism" : 4
  }, {
    "id" : 2,
    "type" : "tokenizer",
    "pact" : "Operator",
    "contents" : "tokenizer",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "pact" : "Operator",
    "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "pact" : "Operator",
    "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "pact" : "Operator",
    "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 8,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 7,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 9,
    "type" : "flat-output",
    "pact" : "Operator",
    "contents" : "flat-output",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 8,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Sink: sink",
    "pact" : "Data Sink",
    "contents" : "Sink: sink",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}

Table API: mini-batch.enable : true
Table API: distinct-agg.split.enabled : false
Table API: parallelism : 4
Table API: mini-batch.latency : 1 s
Table API: mini_batch.size : 1000
Table API: mini_batch.two_phase : true

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: source",
    "pact" : "Data Source",
    "contents" : "Source: source",
    "parallelism" : 4
  }, {
    "id" : 2,
    "type" : "tokenizer",
    "pact" : "Operator",
    "contents" : "tokenizer",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "pact" : "Operator",
    "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "pact" : "Operator",
    "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "pact" : "Operator",
    "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 8,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 7,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 9,
    "type" : "flat-output",
    "pact" : "Operator",
    "contents" : "flat-output",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 8,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Sink: sink",
    "pact" : "Data Sink",
    "contents" : "Sink: sink",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
Hi Felipe,

The default value of `table.optimizer.agg-phase-strategy` is AUTO: if mini-batch is enabled, it will use TWO_PHASE, otherwise ONE_PHASE.

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy
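The rule stated above, together with the Local/Global operators visible in the posted plans, can be sketched in plain Java. This is a simplified model under stated assumptions: the class, enum and method names are hypothetical, only the AUTO case is modelled (explicit settings are passed through unchanged, which may not match everything the planner does), and the sample keys are made up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; not Flink planner code.
class AggPhaseSketch {
    enum Strategy { AUTO, ONE_PHASE, TWO_PHASE }

    // Models only the AUTO rule quoted above; explicit values pass through unchanged.
    static Strategy resolve(Strategy configured, boolean miniBatchEnabled) {
        if (configured != Strategy.AUTO) {
            return configured;
        }
        return miniBatchEnabled ? Strategy.TWO_PHASE : Strategy.ONE_PHASE;
    }

    // Local phase: pre-aggregate one mini-batch on one parallel instance
    // into a partial count per key.
    static Map<String, Long> localCount(List<String> miniBatch) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : miniBatch) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Global phase: merge the partial counts that arrive after the hash shuffle.
    static Map<String, Long> globalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // With mini-batch enabled, an unset (AUTO) strategy and an explicit
        // TWO_PHASE resolve to the same thing, hence identical plans.
        System.out.println(resolve(Strategy.AUTO, true));      // TWO_PHASE
        System.out.println(resolve(Strategy.TWO_PHASE, true)); // TWO_PHASE
        System.out.println(resolve(Strategy.AUTO, false));     // ONE_PHASE

        // One mini-batch on each of two parallel instances, keyed by taxiId.
        List<String> instance1 = Arrays.asList("taxi1", "taxi1", "taxi2", "taxi1");
        List<String> instance2 = Arrays.asList("taxi2", "taxi1", "taxi1");
        Map<String, Long> p1 = localCount(instance1);
        Map<String, Long> p2 = localCount(instance2);
        Map<String, Long> total = globalMerge(Arrays.asList(p1, p2));

        System.out.println(total.get("taxi1")); // 5
        System.out.println(total.get("taxi2")); // 2
        // Rows crossing the hash shuffle: 7 without the local phase, 4 with it.
        System.out.println(instance1.size() + instance2.size()); // 7
        System.out.println(p1.size() + p2.size());               // 4
    }
}
```

In this model the local phase only pays off because records are buffered into a mini-batch first, which is one way to read why the two optimizations are coupled in the configuration.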
I see. Now it has different query plans. It was documented on another page, so I got confused. Thanks!
"contents" : "GlobalGroupAggregate(groupBy=[taxiId], >> select=[taxiId, COUNT(count$0) AS EXPR$0])", >> "parallelism" : 4, >> "predecessors" : [ { >> "id" : 5, >> "ship_strategy" : "HASH", >> "side" : "second" >> } ] >> }, { >> "id" : 8, >> "type" : "SinkConversionToTuple2", >> "pact" : "Operator", >> "contents" : "SinkConversionToTuple2", >> "parallelism" : 4, >> "predecessors" : [ { >> "id" : 7, >> "ship_strategy" : "FORWARD", >> "side" : "second" >> } ] >> }, { >> "id" : 9, >> "type" : "flat-output", >> "pact" : "Operator", >> "contents" : "flat-output", >> "parallelism" : 4, >> "predecessors" : [ { >> "id" : 8, >> "ship_strategy" : "FORWARD", >> "side" : "second" >> } ] >> }, { >> "id" : 10, >> "type" : "Sink: sink", >> "pact" : "Data Sink", >> "contents" : "Sink: sink", >> "parallelism" : 4, >> "predecessors" : [ { >> "id" : 9, >> "ship_strategy" : "FORWARD", >> "side" : "second" >> } ] >> } ] >> } >> >> >> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html >> >> >> -- >> -- Felipe Gutierrez >> -- skype: felipe.o.gutierrez >> -- https://felipeogutierrez.blogspot.com >> >> On Tue, Nov 10, 2020 at 6:25 PM Felipe Gutierrez >> <[hidden email]> wrote: >> > >> > I see, thanks Timo >> > >> > -- >> > -- Felipe Gutierrez >> > -- skype: felipe.o.gutierrez >> > -- https://felipeogutierrez.blogspot.com >> > >> > On Tue, Nov 10, 2020 at 3:22 PM Timo Walther <[hidden email]> wrote: >> > > >> > > Hi Felipe, >> > > >> > > with non-deterministic Jark meant that you never know if the mini batch >> > > timer (every 3 s) or the mini batch threshold (e.g. 3 rows) fires the >> > > execution. This depends how fast records arrive at the operator. >> > > >> > > In general, processing time can be considered non-deterministic, because >> > > 100ms must not be 100ms. This depends on the CPU load and other tasks >> > > such garbage collection etc. 
Only event-time (and thus event time >> > > windows) that work on the timestamp in the data instead of machine time >> > > is determistic, >> > > >> > > Regards, >> > > Timo >> > > >> > > >> > > On 10.11.20 12:02, Felipe Gutierrez wrote: >> > > > Hi Jark, >> > > > >> > > > thanks for your reply. Indeed, I forgot to write DISTINCT on the query >> > > > and now the query plan is splitting into two hash partition phases. >> > > > >> > > > what do you mean by deterministic time? Why only the window aggregate >> > > > is deterministic? If I implement the ProcessingTimeCallback [1] >> > > > interface is it deterministic? >> > > > >> > > > [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html >> > > > Thanks >> > > > >> > > > -- >> > > > -- Felipe Gutierrez >> > > > -- skype: felipe.o.gutierrez >> > > > -- https://felipeogutierrez.blogspot.com >> > > > >> > > > On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <[hidden email]> wrote: >> > > >> >> > > >> Hi Felipe, >> > > >> >> > > >> The "Split Distinct Aggregation", i.e. the "table.optimizer.distinct-agg.split.enabled" option, >> > > >> only works for distinct aggregations (e.g. COUNT(DISTINCT ...)). >> > > >> >> > > >> However, the query in your example is using COUNT(driverId). >> > > >> You can update it to COUNT(DISTINCT driverId), and it should show two hash phases. >> > > >> >> > > >> Regarding "MiniBatch Aggregation", it is not the same as a processing-time window aggregation. >> > > >> >> > > >> 1) MiniBatch is just an optimization on unbounded aggregation, it buffers some input records in memory >> > > >> and processes them together to reduce the state accessing. But processing-time window is still a per-record >> > > >> state accessing style. Besides, the local aggregation also applies mini-batch, it only sends the accumulator >> > > >> of current this mini-batch to the downstream global aggregation, and this improves performance a lot. 
>> > > >> 2) The size of MiniBach is not deterministic. It may be triggered by the number of records or a timeout. >> > > >> But a window aggregate is triggered by a deterministic time. >> > > >> >> > > >> >> > > >> Best, >> > > >> Jark >> > > >> >> > > >> >> > > >> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez <[hidden email]> wrote: >> > > >>> >> > > >>> I realized that I forgot the image. Now it is attached. >> > > >>> -- >> > > >>> -- Felipe Gutierrez >> > > >>> -- skype: felipe.o.gutierrez >> > > >>> -- https://felipeogutierrez.blogspot.com >> > > >>> >> > > >>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez >> > > >>> <[hidden email]> wrote: >> > > >>>> >> > > >>>> Hi community, >> > > >>>> >> > > >>>> I am testing the "Split Distinct Aggregation" [1] consuming the taxi >> > > >>>> ride data set. My sql query from the table environment is the one >> > > >>>> below: >> > > >>>> >> > > >>>> Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate, >> > > >>>> COUNT(driverId) FROM TaxiRide GROUP BY startDate"); >> > > >>>> >> > > >>>> and I am enableing: >> > > >>>> configuration.setString("table.exec.mini-batch.enabled", "true"); >> > > >>>> configuration.setString("table.exec.mini-batch.allow-latency", "3 s"); >> > > >>>> configuration.setString("table.exec.mini-batch.size", "5000"); >> > > >>>> configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); >> > > >>>> and finally >> > > >>>> configuration.setString("table.optimizer.distinct-agg.split.enabled", "true"); >> > > >>>> >> > > >>>> I was expecting that the query plan at the WEB UI show to me two hash >> > > >>>> phases as it is present here on the image [1]. Instead, it is showing >> > > >>>> to me the same plan with one hash phase as I was deploying only one >> > > >>>> Local aggregate and one Global aggregate (of course, taking the >> > > >>>> parallel instances into consideration). Please see the query execution >> > > >>>> plan image attached. 
>> > > >>>> >> > > >>>> Is there something that I am missing when I config the Table API? >> > > >>>> By the way, I am a bit confused with the "MiniBatch Aggregation" [2]. >> > > >>>> Is the "MiniBatch Aggregation" aggregating as a processing time window >> > > >>>> on the operator after the hash phase? If it is, isn't it the same as a >> > > >>>> window aggregation instead of an unbounded window as the example >> > > >>>> presents? >> > > >>>> >> > > >>>> Thanks! >> > > >>>> Felipe >> > > >>>> >> > > >>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation >> > > >>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation >> > > >>>> -- >> > > >>>> -- Felipe Gutierrez >> > > >>>> -- skype: felipe.o.gutierrez >> > > >>>> -- https://felipeogutierrez.blogspot.com >> > > > >> > > |
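For readers following the thread, here is a minimal plain-Java sketch (not Flink code) of why COUNT(DISTINCT driverId) can be split into two hash phases while a plain COUNT(driverId) cannot benefit from it. It assumes the rewrite described in the linked "Split Distinct Aggregation" documentation: the first level groups by the key plus a hash bucket of the distinct column, and the second level sums the partial counts per key. The names SplitDistinctSketch, Ride, directCount and splitCount, the sample data, and the bucket count of 4 are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustration only: simulates the two-level rewrite on an in-memory list.
class SplitDistinctSketch {

    // Hypothetical, simplified ride record with only the two queried fields.
    static final class Ride {
        final String startDate;
        final long driverId;

        Ride(String startDate, long driverId) {
            this.startDate = startDate;
            this.driverId = driverId;
        }
    }

    // One level: COUNT(DISTINCT driverId) ... GROUP BY startDate.
    static Map<String, Long> directCount(List<Ride> rides) {
        Map<String, Set<Long>> seen = new HashMap<>();
        for (Ride r : rides) {
            seen.computeIfAbsent(r.startDate, k -> new HashSet<>()).add(r.driverId);
        }
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Set<Long>> e : seen.entrySet()) {
            result.put(e.getKey(), (long) e.getValue().size());
        }
        return result;
    }

    // Two levels: distinct count per (startDate, bucket), then sum per startDate.
    static Map<String, Long> splitCount(List<Ride> rides, int buckets) {
        // First level (first hash phase): key is startDate plus hash bucket.
        Map<String, Map<Integer, Set<Long>>> partial = new HashMap<>();
        for (Ride r : rides) {
            int bucket = Math.floorMod(Long.hashCode(r.driverId), buckets);
            partial.computeIfAbsent(r.startDate, k -> new HashMap<>())
                    .computeIfAbsent(bucket, k -> new HashSet<>())
                    .add(r.driverId);
        }
        // Second level (second hash phase): sum the partial counts per startDate.
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Map<Integer, Set<Long>>> e : partial.entrySet()) {
            long sum = 0;
            for (Set<Long> ids : e.getValue().values()) {
                sum += ids.size();
            }
            result.put(e.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Ride> rides = Arrays.asList(
                new Ride("2020-11-09", 1L), new Ride("2020-11-09", 2L),
                new Ride("2020-11-09", 2L), new Ride("2020-11-10", 7L));
        System.out.println("direct: " + directCount(rides));
        System.out.println("split:  " + splitCount(rides, 4));
    }
}
```

Because every driverId falls into exactly one bucket, the per-bucket distinct counts never overlap, so their sum equals the direct distinct count. The benefit in a real job would be that a hot startDate key is spread over several first-level keys.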