How groupBy works


How groupBy works

Jinfeng Li
Hi, I find that WordCount on Flink is slow, and 75% of the time is spent in the groupBy operator. The dataset is 90 GB, with only 1,000 distinct words. Could you tell me how groupBy is implemented?
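For context, the job is essentially the textbook DataSet-API WordCount, roughly like this (the input path is just a placeholder):

// Minimal sketch of a DataSet-API WordCount; the input path is a placeholder.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("hdfs:///path/to/input");  // placeholder path

        DataSet<Tuple2<String, Integer>> counts = text
            // split each line into (word, 1) pairs
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.toLowerCase().split("\\W+")) {
                        if (!word.isEmpty()) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                }
            })
            // group by the word (field 0) and sum the counts (field 1)
            .groupBy(0)
            .sum(1);

        counts.print();
    }
}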

Best Regards,
Jeffrey


Re: How groupBy works

Fabian Hueske
Hi Jeffrey,

Flink uses a (potentially external) merge-sort to group data. Combining is done using an in-memory sort.
Because Flink uses pipelined data transfer, the execution of operators in a program can overlap. For example, in WordCount the sort of a groupBy starts as soon as the first record has been read, tokenized, and shuffled to the sorter, i.e., the data is not sent as a batch from the tokenizer to the reducer but streamed.
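As a rough illustration (this is not Flink's internal code), sort-based grouping amounts to sorting the records by key and then making a single pass that emits one aggregate per run of equal keys:

// Simplified illustration of sort-based grouping (sort, then one scan).
// In Flink, the sort may spill to disk (external merge-sort) for large inputs.
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SortBasedGrouping {
    public static Map<String, Long> countByKey(List<String> words) {
        List<String> sorted = new ArrayList<>(words);
        Collections.sort(sorted);                        // bring equal keys next to each other

        Map<String, Long> counts = new LinkedHashMap<>();
        int i = 0;
        while (i < sorted.size()) {
            String key = sorted.get(i);
            long count = 0;
            while (i < sorted.size() && sorted.get(i).equals(key)) {
                count++;                                 // consume the run of equal keys
                i++;
            }
            counts.put(key, count);                      // one output record per key
        }
        return counts;
    }
}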

Compared to hash-based aggregations, sort-based aggregations are often less efficient (especially for low numbers of distinct keys). Flink puts a focus on execution robustness, which is why it implements its internal algorithms (sort, hash tables) on off-heap memory in such a way that they do not fail for larger data sets or higher numbers of distinct keys (no parameter tuning is needed to get a program working). Since this is more effort than just implementing or using a hash table that resides on the JVM heap, we haven't added a hash-based combiner yet.
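A hash-based pre-aggregation can, however, be emulated in user code, e.g. with mapPartition before the groupBy. A rough sketch, assuming the records are Tuple2<String, Integer> (word, count) pairs:

// Rough sketch of a manual hash-based pre-aggregation using mapPartition.
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class HashCombine {
    public static DataSet<Tuple2<String, Integer>> preAggregate(DataSet<Tuple2<String, Integer>> pairs) {
        return pairs
            // combine per partition in a heap hash map before shuffling
            .mapPartition(new MapPartitionFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public void mapPartition(Iterable<Tuple2<String, Integer>> values,
                                         Collector<Tuple2<String, Integer>> out) {
                    Map<String, Integer> partial = new HashMap<>();
                    for (Tuple2<String, Integer> v : values) {
                        Integer old = partial.get(v.f0);
                        partial.put(v.f0, old == null ? v.f1 : old + v.f1);
                    }
                    for (Map.Entry<String, Integer> e : partial.entrySet()) {
                        out.collect(new Tuple2<>(e.getKey(), e.getValue()));
                    }
                }
            })
            // final sort-based grouping over the (much smaller) pre-aggregated data
            .groupBy(0)
            .sum(1);
    }
}

With only about 1,000 distinct words, each partition's hash map stays tiny, so the final groupBy only sees a handful of records per parallel task.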

When you say Flink's WordCount is slow, which numbers are you comparing against?

Best, Fabian

