Keys distribution insights

Keys distribution insights

Flavio Pompermaier
Hi everybody,
in my job I have a groupReduce operator with parallelism 4, and one of the sub-tasks takes a huge amount of time compared to the others.
My guess is that the objects assigned to that slot have much more data to reduce (and are thus computationally heavy within the groupReduce operator).
What I'm trying to understand is which keys are assigned to that slot: is there any way (from the JobManager UI or from the logs) to investigate the key distribution (which, from the plan visualization, is the result of a hash partition)?

Best,
Flavio

Re: Keys distribution insights

Aljoscha Krettek
Hi,

There is no way of doing this from the Flink UI, but you could try to do it manually: in your job, instead of doing the actual computation, just count how many elements you have per key (in your GroupReduce). Then put a MapPartition right after the GroupReduce (which should preserve the same partitioning) and, inside that, see which keys you have and how many elements there were per key. With this you know which partition, i.e. which parallel instance, had which keys and how many elements each of them held.

Best,
Aljoscha
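
[Editor's note: a minimal sketch of the counting approach Aljoscha describes, using the Flink 1.x DataSet API in Java. The class name, input data set, and the String/Long key and value types are made up for illustration; a real job would group on its actual key field.]

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class KeyDistributionProbe {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    // Placeholder input: (key, payload) records; swap in the real source.
    DataSet<Tuple2<String, Long>> input = env.fromElements(
        Tuple2.of("a", 1L), Tuple2.of("b", 2L), Tuple2.of("a", 3L));

    input.groupBy(0)
        // Instead of the real computation, emit (key, elementCount) per group.
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
          @Override
          public void reduce(Iterable<Tuple2<String, Long>> values,
                             Collector<Tuple2<String, Long>> out) {
            String key = null;
            long count = 0L;
            for (Tuple2<String, Long> v : values) {
              key = v.f0;
              count++;
            }
            out.collect(Tuple2.of(key, count));
          }
        })
        // A MapPartition placed directly after the GroupReduce should see the
        // same partitioning, so each parallel instance reports the keys it got.
        .mapPartition(new RichMapPartitionFunction<Tuple2<String, Long>, String>() {
          @Override
          public void mapPartition(Iterable<Tuple2<String, Long>> values,
                                   Collector<String> out) {
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            for (Tuple2<String, Long> kv : values) {
              out.collect("subtask " + subtask + ": key=" + kv.f0 + ", count=" + kv.f1);
            }
          }
        })
        .print();
  }
}

Running this with the job's parallelism and comparing which subtask index reports which keys (and how many elements each key had) shows how the hash partitioning spread the keys across the parallel instances, and thus which slot got the heavy keys.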


Re: Keys distribution insights

Flavio Pompermaier
Thanks Aljoscha. As I suspected, there's currently no unobtrusive way to do that, but I can live with it.

Best,
Flavio
