Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

Yingjie Cao
Hi devs & users,

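FLIP-148 [1] was released with Flink 1.13, and the final implementation differs in some respects from the initial proposal in the FLIP document. To avoid potential misunderstandings, I have updated the FLIP document [1] accordingly and drafted another document [2] that contains more implementation details. FYI.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
[2] https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing

Best,
Yingjie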


On Thu, Oct 15, 2020 at 11:02 AM Yingjie Cao <[hidden email]> wrote:
Hi devs,

Currently, Flink adopts a hash-style blocking shuffle implementation which writes the data sent to different reducer tasks into separate files concurrently. Compared to the sort-merge based approach, which writes that data together into a single file and merges small files into bigger ones, the hash-based approach has several weak points when running large-scale batch jobs:
  1. Stability: For batch jobs with high parallelism (tens of thousands), the current hash-based blocking shuffle implementation writes too many files concurrently, which puts high pressure on the file system, for example through maintenance of too much file metadata and exhaustion of inodes or file descriptors. All of these are potential stability issues. Sort-merge based blocking shuffle does not have this problem because, for one result partition, only one file is written at a time.
  2. Performance: Large numbers of small shuffle files and random IO can hurt shuffle performance considerably, especially on HDDs (on SSDs, sequential reads also matter because of read-ahead and caching). For batch jobs processing massive data, a small amount of data per subpartition is common because of the high parallelism. Data skew is another cause of small subpartition files. By merging the data of all subpartitions into one file, more sequential reads can be achieved.
  3. Resource: In the current hash-based implementation, each subpartition needs at least one buffer. For large-scale batch shuffles, the memory consumption can be huge. For example, at least 320M of network memory is needed per result partition if the parallelism is set to 10000. Because of this huge network memory consumption, it is hard to configure the network memory for large-scale batch jobs, and sometimes the parallelism cannot be increased simply because of insufficient network memory, which leads to a bad user experience.
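As a rough illustration of points 1 and 3, the numbers can be reproduced with some back-of-the-envelope arithmetic. This is only a sketch: the 32 KB buffer size is an assumption (it is not stated in this mail, though it should match Flink's default network buffer size), and the function names are made up for the example.

```python
# Assumption: one network buffer is 32 KB. The mail does not state the buffer size.
BUFFER_SIZE_KB = 32


def min_hash_shuffle_memory_kb(num_subpartitions, buffer_size_kb=BUFFER_SIZE_KB):
    """Lower bound on network memory for one hash-based result partition:
    every subpartition holds at least one buffer."""
    return num_subpartitions * buffer_size_kb


def files_written_concurrently(num_subpartitions, sort_merge):
    """Hash-based shuffle writes one file per subpartition at the same time;
    sort-merge based shuffle writes one file per result partition at a time."""
    return 1 if sort_merge else num_subpartitions


parallelism = 10000
memory_kb = min_hash_shuffle_memory_kb(parallelism)
print(memory_kb // 1000, "MB (approx.) of network memory per result partition")
print(files_written_concurrently(parallelism, sort_merge=False), "files with hash-based shuffle")
print(files_written_concurrently(parallelism, sort_merge=True), "file with sort-merge based shuffle")
```

With a parallelism of 10000 this gives 320,000 KB, which is the "at least 320M" figure from point 3.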
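To improve Flink’s capability of running large-scale batch jobs, we would like to introduce sort-merge based blocking shuffle to Flink [1]. Any feedback is appreciated.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink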
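To make the idea of "one file per result partition" more concrete, here is a toy sketch of the general technique. It is not Flink's implementation: it sorts in-memory records by subpartition index, writes them into a single stream, and keeps an (offset, length) index so that each reader can fetch its region with one sequential read. Spilling, merging of multiple sorted runs, and compression are ignored, and all names are hypothetical.

```python
import io


def write_sorted_partition(records, num_subpartitions, out):
    """Write (subpartition, payload) records into a single stream, grouped by
    subpartition, and return {subpartition: (offset, length)} for each region."""
    # sorted() is stable, so records of one subpartition keep their arrival order.
    ordered = sorted(records, key=lambda record: record[0])
    index = {}
    pos = 0
    for subpartition in range(num_subpartitions):
        offset = out.tell()
        while pos < len(ordered) and ordered[pos][0] == subpartition:
            payload = ordered[pos][1]
            # 4-byte length prefix so the reader can split payloads again.
            out.write(len(payload).to_bytes(4, "big"))
            out.write(payload)
            pos += 1
        index[subpartition] = (offset, out.tell() - offset)
    return index


def read_subpartition(data, index, subpartition):
    """Read all payloads of one subpartition from its contiguous region."""
    offset, length = index[subpartition]
    region = io.BytesIO(data[offset:offset + length])
    payloads = []
    while True:
        header = region.read(4)
        if not header:
            break
        payloads.append(region.read(int.from_bytes(header, "big")))
    return payloads


if __name__ == "__main__":
    out = io.BytesIO()
    records = [(2, b"c1"), (0, b"a1"), (2, b"c2"), (1, b"b1"), (0, b"a2")]
    index = write_sorted_partition(records, 3, out)
    print(index)
    print(read_subpartition(out.getvalue(), index, 2))
```

In this layout each downstream task reads one contiguous region of one file, instead of opening its own small file, which is where the more sequential IO comes from.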


Best,
Yingjie

Jingsong Li
Thanks Yingjie for the great effort!

This is really helpful to Flink Batch users!

Best,
Jingsong


--
Best, Jingsong Lee

Till Rohrmann
Thanks for the update Yingjie. Would it make sense to write a short blog post about this feature including some performance improvement numbers? I think this could be interesting to our users.

Cheers,
Till


Yingjie Cao
Hi Till,

Thanks for the suggestion. The blog post is already on the way.

Best,
Yingjie


Till Rohrmann
Great :-)
