How can I find out which key group belongs to which subtask

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

How can I find out which key group belongs to which subtask

sanmutongzi
Hi , I'm trying to do some optimize about Flink 'keyby' processfunction. Is there any possible I can find out one key belongs to which key-group and essentially find out one key-group belongs to which subtask.
The motivation I want to know that is we want to  force the data records from upstream still goes to same taskmanager downstream subtask .Which means even if we use a keyedstream function we still want no cross jvm communication happened during run time.
And if we can achieve that , can we also avoid the expensive cost for record serialization because data is only transferred in same taskmanager jvm instance?

Thanks.
Reply | Threaded
Open this post in threaded view
|

Re: How can I find out which key group belongs to which subtask

Congxian Qiu
Hi

If you just want to make sure some key goes into the same subtask, does custom key selector[1] help?

For the keygroup and subtask information, you can ref to KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you can ref to doc[3]


杨东晓 <[hidden email]> 于2020年1月9日周四 上午7:47写道:
Hi , I'm trying to do some optimize about Flink 'keyby' processfunction. Is there any possible I can find out one key belongs to which key-group and essentially find out one key-group belongs to which subtask.
The motivation I want to know that is we want to  force the data records from upstream still goes to same taskmanager downstream subtask .Which means even if we use a keyedstream function we still want no cross jvm communication happened during run time.
And if we can achieve that , can we also avoid the expensive cost for record serialization because data is only transferred in same taskmanager jvm instance?

Thanks.
Reply | Threaded
Open this post in threaded view
|

Re: How can I find out which key group belongs to which subtask

sanmutongzi
Thanks Congxian!
 My purpose is not only make data goes into one same subtask but the specific subtask which belongs to same taskmanager with upstream record. The key idea is to avoid shuffling  between taskmanagers.
I think the KeyGroupRangeAssignment.java  explained a lot about how to get keygroup and subtask context that can make that happen.
Do you know if there are still  serialization happening while data transferred between operator in same taskmanager?
Thanks.

Congxian Qiu <[hidden email]> 于2020年1月9日周四 上午1:55写道:
Hi

If you just want to make sure some key goes into the same subtask, does custom key selector[1] help?

For the keygroup and subtask information, you can ref to KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you can ref to doc[3]


杨东晓 <[hidden email]> 于2020年1月9日周四 上午7:47写道:
Hi , I'm trying to do some optimize about Flink 'keyby' processfunction. Is there any possible I can find out one key belongs to which key-group and essentially find out one key-group belongs to which subtask.
The motivation I want to know that is we want to  force the data records from upstream still goes to same taskmanager downstream subtask .Which means even if we use a keyedstream function we still want no cross jvm communication happened during run time.
And if we can achieve that , can we also avoid the expensive cost for record serialization because data is only transferred in same taskmanager jvm instance?

Thanks.
Reply | Threaded
Open this post in threaded view
|

Re: How can I find out which key group belongs to which subtask

Zhijiang(wangzhijiang999)
Only chained operators can avoid record serialization cost, but the chaining mode can not support keyed stream.
If you want to deploy downstream with upstream in the same task manager, it can avoid network shuffle cost which can still get performance benefits.
As I know @Till Rohrmann has implemented some enhancements in scheduler layer to support such requirement in release-1.10. You can have a try when the rc candidate is ready.

Best,
Zhijiang

------------------------------------------------------------------
From:杨东晓 <[hidden email]>
Send Time:2020 Jan. 10 (Fri.) 02:10
To:Congxian Qiu <[hidden email]>
Cc:user <[hidden email]>
Subject:Re: How can I find out which key group belongs to which subtask

Thanks Congxian!
 My purpose is not only make data goes into one same subtask but the specific subtask which belongs to same taskmanager with upstream record. The key idea is to avoid shuffling  between taskmanagers.
I think the KeyGroupRangeAssignment.java  explained a lot about how to get keygroup and subtask context that can make that happen.
Do you know if there are still  serialization happening while data transferred between operator in same taskmanager?
Thanks.

Congxian Qiu <[hidden email]> 于2020年1月9日周四 上午1:55写道:
Hi

If you just want to make sure some key goes into the same subtask, does custom key selector[1] help?

For the keygroup and subtask information, you can ref to KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you can ref to doc[3]


杨东晓 <[hidden email]> 于2020年1月9日周四 上午7:47写道:
Hi , I'm trying to do some optimize about Flink 'keyby' processfunction. Is there any possible I can find out one key belongs to which key-group and essentially find out one key-group belongs to which subtask.
The motivation I want to know that is we want to  force the data records from upstream still goes to same taskmanager downstream subtask .Which means even if we use a keyedstream function we still want no cross jvm communication happened during run time.
And if we can achieve that , can we also avoid the expensive cost for record serialization because data is only transferred in same taskmanager jvm instance?

Thanks.

Reply | Threaded
Open this post in threaded view
|

Re: How can I find out which key group belongs to which subtask

Till Rohrmann
Hi,

you would need to set the co-location constraint in order to ensure that the sub-tasks of operators are deployed to the same machine. It effectively means that subtasks a_i, b_i  of operator a and b will be deployed to the same slot. This feature is not super well exposed but you can take a look at [1] to see how it can be used.


Cheers,
Till

On Fri, Jan 10, 2020 at 9:08 AM Zhijiang <[hidden email]> wrote:
Only chained operators can avoid record serialization cost, but the chaining mode can not support keyed stream.
If you want to deploy downstream with upstream in the same task manager, it can avoid network shuffle cost which can still get performance benefits.
As I know @Till Rohrmann has implemented some enhancements in scheduler layer to support such requirement in release-1.10. You can have a try when the rc candidate is ready.

Best,
Zhijiang

------------------------------------------------------------------
From:杨东晓 <[hidden email]>
Send Time:2020 Jan. 10 (Fri.) 02:10
To:Congxian Qiu <[hidden email]>
Cc:user <[hidden email]>
Subject:Re: How can I find out which key group belongs to which subtask

Thanks Congxian!
 My purpose is not only make data goes into one same subtask but the specific subtask which belongs to same taskmanager with upstream record. The key idea is to avoid shuffling  between taskmanagers.
I think the KeyGroupRangeAssignment.java  explained a lot about how to get keygroup and subtask context that can make that happen.
Do you know if there are still  serialization happening while data transferred between operator in same taskmanager?
Thanks.

Congxian Qiu <[hidden email]> 于2020年1月9日周四 上午1:55写道:
Hi

If you just want to make sure some key goes into the same subtask, does custom key selector[1] help?

For the keygroup and subtask information, you can ref to KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you can ref to doc[3]


杨东晓 <[hidden email]> 于2020年1月9日周四 上午7:47写道:
Hi , I'm trying to do some optimize about Flink 'keyby' processfunction. Is there any possible I can find out one key belongs to which key-group and essentially find out one key-group belongs to which subtask.
The motivation I want to know that is we want to  force the data records from upstream still goes to same taskmanager downstream subtask .Which means even if we use a keyedstream function we still want no cross jvm communication happened during run time.
And if we can achieve that , can we also avoid the expensive cost for record serialization because data is only transferred in same taskmanager jvm instance?

Thanks.

Reply | Threaded
Open this post in threaded view
|

Re: How can I find out which key group belongs to which subtask

sanmutongzi
In reply to this post by Zhijiang(wangzhijiang999)
Thanks Zhijiang, looks like serialization will always be there in keyed stream

Zhijiang <[hidden email]> 于2020年1月10日周五 上午12:08写道:
Only chained operators can avoid record serialization cost, but the chaining mode can not support keyed stream.
If you want to deploy downstream with upstream in the same task manager, it can avoid network shuffle cost which can still get performance benefits.
As I know @Till Rohrmann has implemented some enhancements in scheduler layer to support such requirement in release-1.10. You can have a try when the rc candidate is ready.

Best,
Zhijiang

------------------------------------------------------------------
From:杨东晓 <[hidden email]>
Send Time:2020 Jan. 10 (Fri.) 02:10
To:Congxian Qiu <[hidden email]>
Cc:user <[hidden email]>
Subject:Re: How can I find out which key group belongs to which subtask

Thanks Congxian!
 My purpose is not only make data goes into one same subtask but the specific subtask which belongs to same taskmanager with upstream record. The key idea is to avoid shuffling  between taskmanagers.
I think the KeyGroupRangeAssignment.java  explained a lot about how to get keygroup and subtask context that can make that happen.
Do you know if there are still  serialization happening while data transferred between operator in same taskmanager?
Thanks.

Congxian Qiu <[hidden email]> 于2020年1月9日周四 上午1:55写道:
Hi

If you just want to make sure some key goes into the same subtask, does custom key selector[1] help?

For the keygroup and subtask information, you can ref to KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you can ref to doc[3]


杨东晓 <[hidden email]> 于2020年1月9日周四 上午7:47写道:
Hi , I'm trying to do some optimize about Flink 'keyby' processfunction. Is there any possible I can find out one key belongs to which key-group and essentially find out one key-group belongs to which subtask.
The motivation I want to know that is we want to  force the data records from upstream still goes to same taskmanager downstream subtask .Which means even if we use a keyedstream function we still want no cross jvm communication happened during run time.
And if we can achieve that , can we also avoid the expensive cost for record serialization because data is only transferred in same taskmanager jvm instance?

Thanks.

Reply | Threaded
Open this post in threaded view
|

Re: How can I find out which key group belongs to which subtask

sanmutongzi
In reply to this post by Till Rohrmann
Thanks Till , I will do some test about this , will this be some public feature in next release version or later?

Till Rohrmann <[hidden email]> 于2020年1月10日周五 上午6:15写道:
Hi,

you would need to set the co-location constraint in order to ensure that the sub-tasks of operators are deployed to the same machine. It effectively means that subtasks a_i, b_i  of operator a and b will be deployed to the same slot. This feature is not super well exposed but you can take a look at [1] to see how it can be used.


Cheers,
Till

On Fri, Jan 10, 2020 at 9:08 AM Zhijiang <[hidden email]> wrote:
Only chained operators can avoid record serialization cost, but the chaining mode can not support keyed stream.
If you want to deploy downstream with upstream in the same task manager, it can avoid network shuffle cost which can still get performance benefits.
As I know @Till Rohrmann has implemented some enhancements in scheduler layer to support such requirement in release-1.10. You can have a try when the rc candidate is ready.

Best,
Zhijiang

------------------------------------------------------------------
From:杨东晓 <[hidden email]>
Send Time:2020 Jan. 10 (Fri.) 02:10
To:Congxian Qiu <[hidden email]>
Cc:user <[hidden email]>
Subject:Re: How can I find out which key group belongs to which subtask

Thanks Congxian!
 My purpose is not only make data goes into one same subtask but the specific subtask which belongs to same taskmanager with upstream record. The key idea is to avoid shuffling  between taskmanagers.
I think the KeyGroupRangeAssignment.java  explained a lot about how to get keygroup and subtask context that can make that happen.
Do you know if there are still  serialization happening while data transferred between operator in same taskmanager?
Thanks.

Congxian Qiu <[hidden email]> 于2020年1月9日周四 上午1:55写道:
Hi

If you just want to make sure some key goes into the same subtask, does custom key selector[1] help?

For the keygroup and subtask information, you can ref to KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you can ref to doc[3]


杨东晓 <[hidden email]> 于2020年1月9日周四 上午7:47写道:
Hi , I'm trying to do some optimize about Flink 'keyby' processfunction. Is there any possible I can find out one key belongs to which key-group and essentially find out one key-group belongs to which subtask.
The motivation I want to know that is we want to  force the data records from upstream still goes to same taskmanager downstream subtask .Which means even if we use a keyedstream function we still want no cross jvm communication happened during run time.
And if we can achieve that , can we also avoid the expensive cost for record serialization because data is only transferred in same taskmanager jvm instance?

Thanks.

Reply | Threaded
Open this post in threaded view
|

Re: How can I find out which key group belongs to which subtask

Till Rohrmann
This feature won't be more public than it is today.

Cheers,
Till

On Fri, Jan 10, 2020 at 9:51 PM 杨东晓 <[hidden email]> wrote:
Thanks Till , I will do some test about this , will this be some public feature in next release version or later?

Till Rohrmann <[hidden email]> 于2020年1月10日周五 上午6:15写道:
Hi,

you would need to set the co-location constraint in order to ensure that the sub-tasks of operators are deployed to the same machine. It effectively means that subtasks a_i, b_i  of operator a and b will be deployed to the same slot. This feature is not super well exposed but you can take a look at [1] to see how it can be used.


Cheers,
Till

On Fri, Jan 10, 2020 at 9:08 AM Zhijiang <[hidden email]> wrote:
Only chained operators can avoid record serialization cost, but the chaining mode can not support keyed stream.
If you want to deploy downstream with upstream in the same task manager, it can avoid network shuffle cost which can still get performance benefits.
As I know @Till Rohrmann has implemented some enhancements in scheduler layer to support such requirement in release-1.10. You can have a try when the rc candidate is ready.

Best,
Zhijiang

------------------------------------------------------------------
From:杨东晓 <[hidden email]>
Send Time:2020 Jan. 10 (Fri.) 02:10
To:Congxian Qiu <[hidden email]>
Cc:user <[hidden email]>
Subject:Re: How can I find out which key group belongs to which subtask

Thanks Congxian!
 My purpose is not only make data goes into one same subtask but the specific subtask which belongs to same taskmanager with upstream record. The key idea is to avoid shuffling  between taskmanagers.
I think the KeyGroupRangeAssignment.java  explained a lot about how to get keygroup and subtask context that can make that happen.
Do you know if there are still  serialization happening while data transferred between operator in same taskmanager?
Thanks.

Congxian Qiu <[hidden email]> 于2020年1月9日周四 上午1:55写道:
Hi

If you just want to make sure some key goes into the same subtask, does custom key selector[1] help?

For the keygroup and subtask information, you can ref to KeyGroupRangeAssignment[2] for more info, and the max parallelism logic you can ref to doc[3]


杨东晓 <[hidden email]> 于2020年1月9日周四 上午7:47写道:
Hi , I'm trying to do some optimize about Flink 'keyby' processfunction. Is there any possible I can find out one key belongs to which key-group and essentially find out one key-group belongs to which subtask.
The motivation I want to know that is we want to  force the data records from upstream still goes to same taskmanager downstream subtask .Which means even if we use a keyedstream function we still want no cross jvm communication happened during run time.
And if we can achieve that , can we also avoid the expensive cost for record serialization because data is only transferred in same taskmanager jvm instance?

Thanks.