Multiple (non-consecutive) keyBy operators in a dataflow


Multiple (non-consecutive) keyBy operators in a dataflow

au.fp2018
Hello Flink Community,

I am relatively new to Flink. In the project I am currently working on, I have
a dataflow with a single keyBy() operator, which I want to convert to a dataflow
with multiple keyBy() operators like this:


  Source -->
  KeyBy() -->
  Stateful process() function that generates a more granular key -->
  KeyBy(<id generated in the previous step>) -->
  More stateful computation(s) -->
  Sink

Are there any downsides to this approach?
My reasoning behind the second keyBy() is to reduce the amount of state held
per key and hence improve processing speed.
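The pipeline above can be sketched, Flink aside, as a minimal stdlib-only Python simulation (the record values and the parity-based granular key are purely illustrative, not from the original post): a stateful step keyed by the coarse key derives a more granular key, and downstream state is then partitioned by that granular key.

```python
from collections import defaultdict

# Illustrative records: (coarse_key, payload).
records = [("A", 1), ("A", 2), ("B", 3), ("A", 4), ("B", 5), ("B", 6)]

# Stage 1: state partitioned by the coarse key. The stateful step also
# derives a more granular key (here: coarse key plus payload parity).
stage1_state = defaultdict(int)
rekeyed = []
for coarse_key, payload in records:
    stage1_state[coarse_key] += 1              # some per-coarse-key state
    granular_key = (coarse_key, payload % 2)   # "id generated in the previous step"
    rekeyed.append((granular_key, payload))

# Stage 2: state partitioned by the granular key: fewer records per key,
# and more distinct keys to spread over parallel instances.
stage2_state = defaultdict(list)
for granular_key, payload in rekeyed:
    stage2_state[granular_key].append(payload)

print(len(stage1_state))  # 2 coarse keys
print(len(stage2_state))  # 4 granular keys
```

The second partitioning leaves each key with less state, which is the trade-off the replies discuss: smaller per-key state versus an extra network shuffle.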

Thanks,
Andre




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Multiple (non-consecutive) keyBy operators in a dataflow

李玥
Hello,
In my opinion, it would only be meaningful in this situation:
1. The total size of all your state is large enough, e.g. 1 GB+.
2. Splitting your job into multiple keyBy stages would reduce the size of your state.

This is because the operation of saving state is synchronous, and all worker threads are blocked until the save finishes.
Our team is trying to make the process of saving state asynchronous; please refer to: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html

LiYue
http://tig.jd.com
[hidden email]



On April 3, 2018, at 8:30 AM, au.fp2018 <[hidden email]> wrote:


Re: Multiple (non-consecutive) keyBy operators in a dataflow

Timo Walther
Hi Andre,

every keyBy is a shuffle over the network and thus introduces some overhead, especially the serialization of records between operators (object reuse is disabled by default). If not all slots (and thus not all nodes) are occupied evenly after the first keyBy (e.g. if your key space has just 2 values), then it makes sense to add a second keyBy so that the heavy computation on the more granular key runs with as much parallelism as possible. It really depends on your job.
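The slot-occupancy effect can be made concrete with a small stdlib-only simulation (hash-modulo partitioning is a simplification here; Flink actually routes keys through hashed key groups, but the effect is the same): a 2-value key space keeps most slots idle, while a granular key space reaches all of them.

```python
from collections import Counter

PARALLELISM = 8  # number of parallel subtasks ("slots" the operator occupies)

def slot_for(key):
    # Simplified partitioner; integer keys keep Python's hash() deterministic.
    return hash(key) % PARALLELISM

# Coarse key space with just 2 values: only 2 of the 8 slots ever receive work.
coarse_slots = Counter(slot_for(k % 2) for k in range(100))

# Granular key space (e.g. the id generated by the stateful step):
# records reach all 8 slots.
granular_slots = Counter(slot_for(k) for k in range(100))

print(len(coarse_slots))    # 2
print(len(granular_slots))  # 8
```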

I hope this helps.

Regards,
Timo


On 03.04.18 03:22, 李玥 wrote:


Re: Multiple (non-consecutive) keyBy operators in a dataflow

au.fp2018
Thanks Timo/LiYue, your responses were helpful.

I was worried about the network shuffle introduced by the second keyBy. The first
keyBy is indeed spreading the load evenly across the nodes. As I mentioned,
my concern was about the amount of state held per key. Maybe I am trying to
optimize prematurely here.

My follow-up question is: how much state per key is considered big enough to
cause performance overhead? If I am within that limit after the first keyBy,
I wouldn't need the second keyBy and could avoid the extra network shuffle.

Thanks,
Arun







Re: Multiple (non-consecutive) keyBy operators in a dataflow

Timo Walther
@Richter: Are you aware of any per-key state size performance implications?


On 03.04.18 16:56, au.fp2018 wrote:



Re: Multiple (non-consecutive) keyBy operators in a dataflow

Stefan Richter-2
I don’t think there are any particular implications. I would suggest going with a simple keyBy and thinking about optimization only if there actually is a problem at hand.

Best,
Stefan

> On 03.04.2018 17:08, Timo Walther <[hidden email]> wrote: