Help on the Split Distinct Aggregation from Table API


Felipe Gutierrez
Hi,

I am trying to understand and simulate the "Split Distinct
Aggregation" [1] optimization of the Table API. I am executing the query:

SELECT driverId, COUNT(DISTINCT dayOfTheYear) FROM TaxiRide GROUP BY driverId

on the TaxiRide data from the Flink exercises. As mentioned in
link [1], the "Split Distinct Aggregation" optimization performs
well when the data in the distinct column is sparse, which in my
case is "driverId". By sparse I understand that the query still
processes the row, but its "driverId" value is null or 0. Am I
correct?
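
A minimal sketch of the rewrite described in [1], in plain Python rather
than Flink, may help here. It assumes the optimization works the way the
linked page describes it: a first aggregation grouped by the key plus a
bucket derived from the distinct column, then a second aggregation that
sums the per-bucket counts. The function names, the sample rows and
bucket_num=4 are made up for illustration, and Flink computes the bucket
with its own hash function, not Python's hash().

```python
from collections import defaultdict


def count_distinct_direct(rows):
    """COUNT(DISTINCT day) GROUP BY driver_id, computed in one step."""
    seen = defaultdict(set)
    for driver_id, day in rows:
        seen[driver_id].add(day)
    return {driver_id: len(days) for driver_id, days in seen.items()}


def count_distinct_split(rows, bucket_num=4):
    """Same result, computed in two levels as in the split rewrite."""
    # Level 1: group by (driver_id, bucket of the distinct column).
    # Rows of one hot driver_id are spread over up to bucket_num groups.
    partial = defaultdict(set)
    for driver_id, day in rows:
        bucket = hash(day) % bucket_num
        partial[(driver_id, bucket)].add(day)

    # Level 2: group by driver_id only and sum the per-bucket counts.
    totals = defaultdict(int)
    for (driver_id, _bucket), days in partial.items():
        totals[driver_id] += len(days)
    return dict(totals)


# Hypothetical (driverId, dayOfTheYear) rows; driverId 0 is the hot key.
rows = [(0, 10), (0, 11), (0, 10), (0, 12), (7, 10), (7, 10), (0, 13), (7, 345)]

direct = count_distinct_direct(rows)
split = count_distinct_split(rows)
print(direct)
print(split)
```

Each distinct value lands in exactly one bucket, so the per-bucket
counts add up to the direct result while the work for a single hot key
is divided among several groups.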

So I created a second data source file in which only 10% of the rows
have a valid "driverId"; the others are filled with the value 0. I
have 8 parallel data sources, each generating data at 25K rec/sec
(200K rec/sec in total). This is the data rate at which I was getting
backpressure when executing count/sum/avg with the Table API, and
where I then used mini-batch and 2-phases to decrease the backpressure.

I was expecting that the query without any optimization
(mini-batch/2-phases) would get high backpressure; that changing to
mini-batch, and then to 2-phases, would show some improvement but still
with backpressure; and that changing to the split optimization would
give low backpressure.
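
For reference, the three optimizations mentioned above map to the
following option keys on the tuning page [1]. This is only a sketch of
the configuration: the latency and size values are illustrative
assumptions, not the settings used in this experiment, and the options
are set on the table environment's configuration as shown on that page.

```properties
# mini-batch: buffer input records before updating aggregation state
table.exec.mini-batch.enabled = true
table.exec.mini-batch.allow-latency = 5 s
table.exec.mini-batch.size = 5000

# 2-phases: local aggregation before the global one
table.optimizer.agg-phase-strategy = TWO_PHASE

# split distinct aggregation
table.optimizer.distinct-agg.split.enabled = true
```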

Is there something wrong with my query or my data?
Thanks,
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Help on the Split Distinct Aggregation from Table API

Felipe Gutierrez
I just realized that I have to use dayOfTheYear in the GROUP BY. I will test again.


On Thu, 10 Dec 2020, 18:48 Felipe Gutierrez, <[hidden email]> wrote:
> [...]