Scheduling of GroupByKey and CombinePerKey operations

Scheduling of GroupByKey and CombinePerKey operations

Pawel Bartoszek
Can I ask why some operations run on only one slot? I understand that file writes should happen on only one slot, but a GroupByKey operation could be distributed across all slots. I have around 20k distinct keys every minute. Is there any way to break this operator chain?
 
I noticed that CombinePerKey operations that don't have any I/O-related transforms are scheduled across all 32 slots.


My cluster has 32 slots across 2 task managers, running Beam 2.2 and Flink 1.3.2.

Task (operator chain): GroupByKey -> ParMultiDo(WriteShardedBundles) -> ParMultiDo(Anonymous) -> xxx.pipeline.output.io.file.WriteWindowToFile-SumPlaybackBitrateResult2/TextIO.Write/WriteFiles/Reshuffle/Window.Into()/Window.Assign.out -> ParMultiDo(ReifyValueTimestamp) -> ToKeyedWorkItem

Start Time: 2018-01-18, 13:56:28 | End Time: 2018-01-18, 14:37:14 | Duration: 40m 45s
Bytes received: 149 MB | Records received: 333,672 | Bytes sent: 70.8 MB | Records sent: 1932
Status: RUNNING (subtask state counters: 0 0 32 0 0 0 0)

Subtasks:

Start Time           | End Time | Duration | Bytes received | Records received | Bytes sent | Records sent | Attempt | Host | Status
2018-01-18, 13:56:28 | -        | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | -        | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | -        | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | -        | 40m 45s  | 77.5 MB        | 333,683          | 2.21 MB    | 20           | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | -        | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
2018-01-18, 13:56:28 | -        | 40m 45s  | 2.30 MB        | 0                | 2.21 MB    | 0            | 1       | xxx  | RUNNING
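[Editor's note, not part of the original message: keyed operations such as GroupByKey are normally distributed by hashing each key to a parallel slot, so ~20k distinct keys should spread roughly evenly over 32 slots. The sketch below illustrates that expectation; the MD5-based hash and the `slot_for_key` helper are illustrative stand-ins, not Flink's actual key-group partitioner.]

```python
from collections import Counter
import hashlib

NUM_SLOTS = 32  # matches the 32 task slots in the cluster above


def slot_for_key(key: str, num_slots: int = NUM_SLOTS) -> int:
    """Assign a key to a slot by hashing it (illustrative stand-in
    for Flink's key-group partitioning)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_slots


# With ~20k distinct keys, the per-slot load is close to uniform
# (expected ~625 keys per slot).
keys = [f"key-{i}" for i in range(20_000)]
load = Counter(slot_for_key(k) for k in keys)

assert len(load) == NUM_SLOTS  # every slot receives some keys
print(min(load.values()), max(load.values()))
```

That is why the subtask metrics above are surprising: with a near-uniform key distribution one would expect all 32 subtasks to receive records, yet only one of them does.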

Thanks,
Pawel
Re: Scheduling of GroupByKey and CombinePerKey operations

Fabian Hueske
Hi Pawel,

This question might be better suited for the Beam user list.
Beam includes the Flink runner, which translates Beam programs into Flink programs.

Best,
Fabian

2018-01-18 16:02 GMT+01:00 Pawel Bartoszek <[hidden email]>:

Re: Scheduling of GroupByKey and CombinePerKey operations

Aljoscha Krettek
Hi,

What are the other stages in that program?

Best,
Aljoscha

On 18. Jan 2018, at 16:22, Fabian Hueske <[hidden email]> wrote:
