Stream aggregation using Flink Table API (Blink plan)


Stream aggregation using Flink Table API (Blink plan)

Felipe Gutierrez
Hi community,

I am testing the "Split Distinct Aggregation" [1] consuming the taxi
ride data set. My SQL query from the table environment is the one below:

Table tableCountDistinct = tableEnv.sqlQuery(
    "SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");

and I am enabling:
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
and finally
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

I was expecting the query plan in the web UI to show two hash phases,
as presented in the image in [1]. Instead, it shows the same plan with
one hash phase, as if I were deploying only one local aggregate and one
global aggregate (taking the parallel instances into consideration, of
course). Please see the attached query execution plan image.

Is there something I am missing when I configure the Table API?
By the way, I am a bit confused by the "MiniBatch Aggregation" [2].
Does "MiniBatch Aggregation" aggregate like a processing-time window on
the operator after the hash phase? If so, isn't that the same as a
window aggregation rather than the unbounded aggregation the example
presents?

Thanks!
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Stream aggregation using Flink Table API (Blink plan)

Felipe Gutierrez
I realized that I forgot the image. Now it is attached.

On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
<[hidden email]> wrote:


Screenshot from 2020-11-09 13-36-18.png (29K)

Re: Stream aggregation using Flink Table API (Blink plan)

Jark Wu
Hi Felipe,

The "Split Distinct Aggregation", i.e. the "table.optimizer.distinct-agg.split.enabled" option,
only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).

However, the query in your example uses COUNT(driverId).
You can update it to COUNT(DISTINCT driverId), and it should then show two hash phases.
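A sketch of the corrected call, reusing the snippet from the first message (table and field names as in the thread; this fragment assumes an existing `tableEnv`):

```java
// Same query as in the original message, but with DISTINCT, so that
// "table.optimizer.distinct-agg.split.enabled" can take effect.
Table tableCountDistinct = tableEnv.sqlQuery(
    "SELECT startDate, COUNT(DISTINCT driverId) " +
    "FROM TaxiRide GROUP BY startDate");
```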

Regarding "MiniBatch Aggregation", it is not the same as a processing-time window aggregation.

1) MiniBatch is just an optimization of unbounded aggregation: it buffers some input records
in memory and processes them together to reduce state access. A processing-time window
aggregation, in contrast, still accesses state per record. Besides, the local aggregation also
applies mini-batching; it sends only the accumulator of the current mini-batch to the
downstream global aggregation, which improves performance a lot.
2) The size of a MiniBatch is not deterministic: it may be triggered by the number of records
or by a timeout, whereas a window aggregate is triggered at a deterministic time.
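Jark's point 2 can be sketched with a toy buffer that is independent of Flink (an illustration only, not Flink's actual implementation): a flush fires when either the size threshold or the latency deadline is hit, so the resulting batch sizes depend on the arrival rate.

```java
import java.util.ArrayList;
import java.util.List;

// Toy mini-batch buffer: flushes when EITHER the size threshold OR the
// latency deadline fires, so the size of a flushed batch is not
// deterministic -- it depends on how fast records arrive.
class MiniBatchBuffer {
    private final int maxSize;
    private final long maxLatencyMs;
    private final List<String> buffer = new ArrayList<>();
    private long firstRecordTime = -1;

    MiniBatchBuffer(int maxSize, long maxLatencyMs) {
        this.maxSize = maxSize;
        this.maxLatencyMs = maxLatencyMs;
    }

    // Adds a record; returns the flushed batch, or an empty list if no
    // trigger fired yet.
    List<String> add(String record, long nowMs) {
        if (firstRecordTime < 0) {
            firstRecordTime = nowMs;
        }
        buffer.add(record);
        boolean sizeTrigger = buffer.size() >= maxSize;
        boolean timeTrigger = nowMs - firstRecordTime >= maxLatencyMs;
        if (sizeTrigger || timeTrigger) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            firstRecordTime = -1;
            return batch;
        }
        return new ArrayList<>();
    }
}
```

With maxSize = 3 and maxLatencyMs = 1000, three quick records flush as a batch of 3 (size trigger), while a record that straggles past the deadline flushes whatever is buffered as a smaller batch (time trigger).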


Best,
Jark


On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez <[hidden email]> wrote:

Re: Stream aggregation using Flink Table API (Blink plan)

Felipe Gutierrez
Hi Jark,

thanks for your reply. Indeed, I forgot to write DISTINCT in the query,
and now the query plan splits into two hash partition phases.

What do you mean by deterministic time? Why is only the window
aggregate deterministic? If I implement the ProcessingTimeCallback [1]
interface, is it deterministic?

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
Thanks


On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <[hidden email]> wrote:


Re: Stream aggregation using Flink Table API (Blink plan)

Timo Walther
Hi Felipe,

with "non-deterministic" Jark meant that you never know whether the
mini-batch timer (every 3 s) or the mini-batch threshold (e.g. 3 rows)
fires the execution. This depends on how fast records arrive at the
operator.

In general, processing time can be considered non-deterministic because
100 ms is not necessarily exactly 100 ms; it depends on CPU load and
other tasks such as garbage collection. Only event time (and thus
event-time windows), which works on the timestamp carried in the data
instead of machine time, is deterministic.
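The reason event time is deterministic can be sketched as follows (a toy computation, not Flink internals): the tumbling window a record belongs to is a pure function of the timestamp carried in the record, so replaying the same data always produces the same windows.

```java
// Toy sketch: tumbling event-time window assignment depends only on the
// timestamp in the data, not on the wall clock, CPU load, or GC pauses,
// so it is reproducible across runs and machines.
class EventTimeWindows {
    // Start of the tumbling window containing the given event timestamp.
    static long windowStart(long eventTimestampMs, long windowSizeMs) {
        return eventTimestampMs - (eventTimestampMs % windowSizeMs);
    }
}
```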

Regards,
Timo


On 10.11.20 12:02, Felipe Gutierrez wrote:



Re: Stream aggregation using Flink Table API (Blink plan)

Felipe Gutierrez
I see, thanks Timo


On Tue, Nov 10, 2020 at 3:22 PM Timo Walther <[hidden email]> wrote:


Re: Stream aggregation using Flink Table API (Blink plan)

Felipe Gutierrez
Hi Jark,

I don't see the difference between the "MiniBatch Aggregation" and the
"Local-Global Aggregation". The web page [1] says that I have to enable
the TWO_PHASE parameter. So I compared the query plans with and without
the TWO_PHASE parameter, and they are the same. So I conclude that
mini-batch already is a TWO_PHASE strategy, since it already
pre-aggregates locally. Is that correct?

Here are both query plans:
Thanks, Felipe

Table API: mini-batch.enable                            : true
Table API: distinct-agg.split.enabled                   : false
Table API: parallelism                                  : 4
Table API: mini-batch.latency                           : 1 s
Table API: mini_batch.size                              : 1000
Table API: mini_batch.two_phase                         : false

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: source",
    "pact" : "Data Source",
    "contents" : "Source: source",
    "parallelism" : 4
  }, {
    "id" : 2,
    "type" : "tokenizer",
    "pact" : "Operator",
    "contents" : "tokenizer",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "pact" : "Operator",
    "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "pact" : "Operator",
    "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "pact" : "Operator",
    "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 8,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 7,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 9,
    "type" : "flat-output",
    "pact" : "Operator",
    "contents" : "flat-output",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 8,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Sink: sink",
    "pact" : "Data Sink",
    "contents" : "Sink: sink",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}

Table API: mini-batch.enable                            : true
Table API: distinct-agg.split.enabled                   : false
Table API: parallelism                                  : 4
Table API: mini-batch.latency                           : 1 s
Table API: mini_batch.size                              : 1000
Table API: mini_batch.two_phase                         : true

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: source",
    "pact" : "Data Source",
    "contents" : "Source: source",
    "parallelism" : 4
  }, {
    "id" : 2,
    "type" : "tokenizer",
    "pact" : "Operator",
    "contents" : "tokenizer",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "pact" : "Operator",
    "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "pact" : "Operator",
    "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "pact" : "Operator",
    "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 8,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 7,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 9,
    "type" : "flat-output",
    "pact" : "Operator",
    "contents" : "flat-output",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 8,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Sink: sink",
    "pact" : "Data Sink",
    "contents" : "Sink: sink",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}


[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html
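For reference, the JSON above matches the format returned by `StreamExecutionEnvironment.getExecutionPlan()`. A sketch of how such a comparison can be produced (assuming a Flink 1.11-style Java job; the query and sink setup from the thread is elided):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Dump the streaming execution plan as JSON; run once per optimizer
// setting and diff the two outputs.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Configuration conf = tableEnv.getConfig().getConfiguration();

conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "1 s");
conf.setString("table.exec.mini-batch.size", "1000");
// Toggle this between runs and compare the printed JSON plans:
conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

// ... build the query and sink as in the thread ...

System.out.println(env.getExecutionPlan());
```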



On Tue, Nov 10, 2020 at 6:25 PM Felipe Gutierrez
<[hidden email]> wrote:


Re: Stream aggregation using Flink Table API (Blink plan)

Jark Wu-3
Hi Felipe,

The default value of `table.optimizer.agg-phase-strategy` is AUTO: if mini-batch is enabled, 
it will use TWO_PHASE, otherwise ONE_PHASE. 

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy


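That resolution rule can be sketched as a small helper. This is a simplified model of the AUTO behavior described above, not actual Flink planner code; the class and method names are hypothetical:

```java
/**
 * Hypothetical model of how AUTO resolves the effective aggregate
 * phase strategy, per the rule above; not actual Flink planner code.
 */
public class AggPhaseResolver {
    public static String resolve(String strategy, boolean miniBatchEnabled) {
        if (!"AUTO".equals(strategy)) {
            return strategy; // an explicit ONE_PHASE or TWO_PHASE wins
        }
        // AUTO: the local/global split only pays off when mini-batch buffers records
        return miniBatchEnabled ? "TWO_PHASE" : "ONE_PHASE";
    }

    public static void main(String[] args) {
        System.out.println(resolve("AUTO", true));      // TWO_PHASE
        System.out.println(resolve("AUTO", false));     // ONE_PHASE
        System.out.println(resolve("ONE_PHASE", true)); // ONE_PHASE
    }
}
```

This explains why the two plans below look identical: with mini-batch enabled, AUTO already picks TWO_PHASE, so setting it explicitly changes nothing.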
On Thu, 12 Nov 2020 at 17:52, Felipe Gutierrez <[hidden email]> wrote:
Hi Jark,

I don't get the difference between the "MiniBatch Aggregation" and the
"Local-Global Aggregation". The web page [1] says that I have to enable
the TWO_PHASE parameter, so I compared the query plans with and without
it, and they are the same. From that I conclude that mini-batch is
already a TWO_PHASE strategy, since it is already pre-aggregating
locally. Is that correct?

Here are both query plans:
Thanks, Felipe

Table API: mini-batch.enable                            : true
Table API: distinct-agg.split.enabled                   : false
Table API: parallelism                                  : 4
Table API: mini-batch.latency                           : 1 s
Table API: mini_batch.size                              : 1000
Table API: mini_batch.two_phase                         : false

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: source",
    "pact" : "Data Source",
    "contents" : "Source: source",
    "parallelism" : 4
  }, {
    "id" : 2,
    "type" : "tokenizer",
    "pact" : "Operator",
    "contents" : "tokenizer",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "pact" : "Operator",
    "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "pact" : "Operator",
    "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "pact" : "Operator",
    "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 8,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 7,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 9,
    "type" : "flat-output",
    "pact" : "Operator",
    "contents" : "flat-output",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 8,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Sink: sink",
    "pact" : "Data Sink",
    "contents" : "Sink: sink",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}

Table API: mini-batch.enable                            : true
Table API: distinct-agg.split.enabled                   : false
Table API: parallelism                                  : 4
Table API: mini-batch.latency                           : 1 s
Table API: mini_batch.size                              : 1000
Table API: mini_batch.two_phase                         : true

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: source",
    "pact" : "Data Source",
    "contents" : "Source: source",
    "parallelism" : 4
  }, {
    "id" : 2,
    "type" : "tokenizer",
    "pact" : "Operator",
    "contents" : "tokenizer",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "pact" : "Operator",
    "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "pact" : "Operator",
    "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "pact" : "Operator",
    "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 8,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 7,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 9,
    "type" : "flat-output",
    "pact" : "Operator",
    "contents" : "flat-output",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 8,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Sink: sink",
    "pact" : "Data Sink",
    "contents" : "Sink: sink",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
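The LocalGroupAggregate/GlobalGroupAggregate pair in the plans above can be sketched in plain Java. This is a toy model of two-phase COUNT aggregation, not Flink runtime code: each parallel instance pre-aggregates its buffered mini-batch into per-key partial counts, and the global stage merges those partials after the HASH shuffle.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of two-phase (local/global) COUNT aggregation; not Flink code. */
public class TwoPhaseCount {
    /**
     * Local phase: one parallel instance turns its buffered mini-batch
     * into per-key partial counts (the "accumulators" sent downstream).
     */
    public static Map<String, Long> localAggregate(List<String> miniBatch) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : miniBatch) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /**
     * Global phase: after the hash shuffle, merge the partial counts
     * from all local instances into the final per-key counts.
     */
    public static Map<String, Long> globalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> p1 = localAggregate(List.of("taxi-1", "taxi-1", "taxi-2"));
        Map<String, Long> p2 = localAggregate(List.of("taxi-1", "taxi-2"));
        Map<String, Long> merged = globalMerge(List.of(p1, p2));
        System.out.println(merged.get("taxi-1")); // 3
        System.out.println(merged.get("taxi-2")); // 2
    }
}
```

The key saving is that the global operator receives one accumulator per key per mini-batch instead of one record per input row, which reduces both network traffic and state accesses.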


[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html


--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Nov 10, 2020 at 6:25 PM Felipe Gutierrez
<[hidden email]> wrote:
>
> I see, thanks Timo
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
> On Tue, Nov 10, 2020 at 3:22 PM Timo Walther <[hidden email]> wrote:
> >
> > Hi Felipe,
> >
> > with non-deterministic Jark meant that you never know whether the mini-batch
> > timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) fires the
> > execution. This depends on how fast records arrive at the operator.
> >
> > In general, processing time can be considered non-deterministic, because
> > 100ms of wall-clock time is not guaranteed to be exactly 100ms; it depends on
> > CPU load and other tasks such as garbage collection. Only event time (and thus
> > event-time windows), which works on the timestamps in the data instead of
> > machine time, is deterministic.
> >
> > Regards,
> > Timo
> >
> >
> > On 10.11.20 12:02, Felipe Gutierrez wrote:
> > > Hi Jark,
> > >
> > > thanks for your reply. Indeed, I forgot to write DISTINCT on the query
> > > and now the query plan is splitting into two hash partition phases.
> > >
> > > what do you mean by deterministic time? Why only the window aggregate
> > > is deterministic? If I implement the ProcessingTimeCallback [1]
> > > interface is it deterministic?
> > >
> > > [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
> > > Thanks
> > >
> > > --
> > > -- Felipe Gutierrez
> > > -- skype: felipe.o.gutierrez
> > > -- https://felipeogutierrez.blogspot.com
> > >
> > > On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <[hidden email]> wrote:
> > >>
> > >> Hi Felipe,
> > >>
> > >> The "Split Distinct Aggregation", i.e. the "table.optimizer.distinct-agg.split.enabled" option,
> > >>   only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
> > >>
> > >> However, the query in your example is using COUNT(driverId).
> > >> You can update it to COUNT(DISTINCT driverId), and it should show two hash phases.
> > >>
> > >> Regarding "MiniBatch Aggregation", it is not the same as a processing-time window aggregation.
> > >>
> > >> 1) MiniBatch is just an optimization for unbounded aggregation: it buffers some input records in memory
> > >>   and processes them together to reduce state access. A processing-time window, in contrast, still accesses
> > >>   state per record. Besides, the local aggregation also applies mini-batching: it only sends the accumulator
> > >>   of the current mini-batch to the downstream global aggregation, which improves performance a lot.
> > >> 2) The size of a MiniBatch is not deterministic. It may be triggered by the number of records or a timeout.
> > >>    But a window aggregate is triggered at a deterministic time.
> > >>
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >>
> > >> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez <[hidden email]> wrote:
> > >>>
> > >>> I realized that I forgot the image. Now it is attached.
> > >>> --
> > >>> -- Felipe Gutierrez
> > >>> -- skype: felipe.o.gutierrez
> > >>> -- https://felipeogutierrez.blogspot.com
> > >>>
> > >>> On Mon, Nov 9, 2020 at 1:41 PM Felipe Gutierrez
> > >>> <[hidden email]> wrote:
> > >>>>
> > >>>> Hi community,
> > >>>>
> > >>>> I am testing the "Split Distinct Aggregation" [1] consuming the taxi
> > >>>> ride data set. My SQL query on the table environment is the one
> > >>>> below:
> > >>>>
> > >>>> Table tableCountDistinct = tableEnv.sqlQuery("SELECT startDate,
> > >>>> COUNT(driverId) FROM TaxiRide GROUP BY startDate");
> > >>>>
> > >>>> and I am enabling:
> > >>>> configuration.setString("table.exec.mini-batch.enabled", "true");
> > >>>> configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
> > >>>> configuration.setString("table.exec.mini-batch.size", "5000");
> > >>>> configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
> > >>>> and finally
> > >>>> configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
> > >>>>
> > >>>> I was expecting the query plan in the Web UI to show two hash
> > >>>> phases, as presented in the image at [1]. Instead, it shows the
> > >>>> same plan with a single hash phase, as if I were deploying only one
> > >>>> local aggregate and one global aggregate (taking the parallel
> > >>>> instances into consideration, of course). Please see the attached
> > >>>> query execution plan image.
> > >>>>
> > >>>> Is there something that I am missing when I config the Table API?
> > >>>> By the way, I am a bit confused with the "MiniBatch Aggregation" [2].
> > >>>> Is the "MiniBatch Aggregation" aggregating as a processing time window
> > >>>> on the operator after the hash phase? If it is, isn't it the same as a
> > >>>> window aggregation instead of an unbounded window as the example
> > >>>> presents?
> > >>>>
> > >>>> Thanks!
> > >>>> Felipe
> > >>>>
> > >>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
> > >>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
> > >>>> --
> > >>>> -- Felipe Gutierrez
> > >>>> -- skype: felipe.o.gutierrez
> > >>>> -- https://felipeogutierrez.blogspot.com
> > >
> >

Re: Stream aggregation using Flink Table API (Blink plan)

Felipe Gutierrez
I see, now it shows different query plans. It was documented on another
page, so I got confused. Thanks!
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
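The split-distinct optimization discussed earlier rewrites COUNT(DISTINCT ...) into two aggregation levels: an inner one that additionally groups by a bucket key (roughly MOD(HASH_CODE(distinct_key), bucket_num), with the bucket count configurable via something like table.optimizer.distinct-agg.split.bucket-num), and an outer one that sums the per-bucket distinct counts. A toy Java sketch of why that split preserves the result (bucket count chosen arbitrarily; not Flink code):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Toy model of the split-distinct rewrite: per-bucket distinct counts
 * summed over all buckets equal the overall distinct count. Not Flink code.
 */
public class SplitDistinct {
    /** The original aggregation: one distinct count over all driver ids. */
    public static long distinctCount(List<String> driverIds) {
        return new HashSet<>(driverIds).size();
    }

    /**
     * The rewritten aggregation: inner level groups by MOD(hash, bucketNum)
     * and counts distinct per bucket; outer level SUMs over the buckets.
     * Each distinct id lands in exactly one bucket, so the sum is exact.
     */
    public static long splitDistinctCount(List<String> driverIds, int bucketNum) {
        Map<Integer, Set<String>> buckets = new HashMap<>();
        for (String id : driverIds) {
            int bucket = Math.floorMod(id.hashCode(), bucketNum);
            buckets.computeIfAbsent(bucket, b -> new HashSet<>()).add(id);
        }
        return buckets.values().stream().mapToLong(Set::size).sum();
    }

    public static void main(String[] args) {
        List<String> ids = List.of("d1", "d2", "d1", "d3", "d2", "d4");
        System.out.println(distinctCount(ids));         // 4
        System.out.println(splitDistinctCount(ids, 3)); // 4
    }
}
```

The benefit of the rewrite is that the distinct state for a hot grouping key is spread across bucketNum partial aggregates, which removes the data-skew bottleneck on a single task.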
