Partitioning key range

Davood Rafiei
Hi all,

I partition a DataStream (say dsA) with keyBy at parallelism 2 and get a KeyedStream (say ksA) with parallelism 2.
Depending on the keys in dsA, one partition of ksA can remain empty.
For example, when my keys are 10 and 20, both partitions of ksA receive data.
However, with keys 1000 and 1001, a single partition of ksA receives all of the upstream data.
Is there any way to find out which key range each downstream partition handles?
Or is there any way to overcome this issue?
We can assume that I know all possible keys in dsA (in this case, two distinct keys), so I want every partition of ksA to be fully utilized.

Thanks,
Davood

Re: Partitioning key range

Congxian Qiu
Hi Davood
Maybe a custom KeySelector can help: it lets you define the key used to partition the stream. You can refer to the code [1] for details.


Best, Congxian
In reply to Davood Rafiei's message of Apr 5, 2019, 06:35 +0800.

Re: Partitioning key range

Fabian Hueske-2
Hi Davood,

Flink uses hash partitioning to assign keys to key groups. Each key group is then assigned to a task for processing (a task might process multiple key groups).
There is no way to directly assign a key to a particular key group or task.
All you can do is experiment with custom KeySelectors that return keys which hash into different key groups.
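The routing described above can be sketched as a standalone program. This is a minimal sketch that re-implements (rather than calls) the hashing Flink's `KeyGroupRangeAssignment` is believed to use, so check it against the Flink version you run. It assumes `Integer` keys (a `Long` or `String` key has a different `hashCode()`) and a max parallelism of 128, which is assumed to be Flink's default for parallelism 2. It also prints the key-group range owned by each subtask, which is the closest thing to a "key range per downstream partition": ranges are fixed in key-group space, not in key space.

```java
import java.util.Arrays;

/**
 * Standalone sketch (no Flink dependency) of how a key is assumed to be routed:
 * key.hashCode() -> murmur hash -> key group -> subtask index.
 */
public class KeyGroupSketch {

    // Murmur-style mixing of one int, modeled on Flink's MathUtils.murmurHash(int) (assumption).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step, modeled on Flink's MathUtils.bitMix(int) (assumption).
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Key group of a key: hash of hashCode() modulo the max parallelism (number of key groups).
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Subtask owning a key group: key groups are split into contiguous ranges.
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive key-group range [start, end] owned by one subtask.
    static int[] keyGroupRange(int subtask, int maxParallelism, int parallelism) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int parallelism = 2;
        int maxParallelism = 128; // assumed default max parallelism for parallelism 2
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " owns key groups "
                    + Arrays.toString(keyGroupRange(subtask, maxParallelism, parallelism)));
        }
        for (int key : new int[] {10, 20, 1000, 1001}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            System.out.println("key " + key + " -> key group " + keyGroup + " -> subtask "
                    + subtaskForKeyGroup(keyGroup, maxParallelism, parallelism));
        }
    }
}
```

With only two distinct keys, there is a good chance that both hash into the same half of the 128 key groups, which is consistent with one subtask staying idle.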

Best, Fabian

In reply to Congxian Qiu's message of Sat, Apr 6, 2019, 11:43.

Re: Partitioning key range

Ken Krugler
Hi Davood,

We have done some explicit partitioning in the past, but it’s pretty fragile.


That said, I haven’t tried this with Flink 1.7/1.8, and I’m guessing Fabian would notice some issues if he reviewed it :)
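A hypothetical sketch of one form of explicit partitioning follows; it is not necessarily what Ken's team did, since their code is not part of this thread. Given the known keys, it picks for each one a distinct surrogate `Integer` key that a re-implemented copy of the hashing routes to a chosen subtask, assigning subtasks round-robin. The names `SurrogateKeys`, `subtaskFor`, and `assignSurrogates` are made up for illustration, and the sketch assumes the job's max parallelism is 128 and that Flink's hashing matches the copy below.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper: map each known key to a surrogate Integer key that is expected
 * to land on a chosen subtask. The routing re-implements what Flink is assumed to do,
 * which is why this approach is fragile: it breaks if the hashing or max parallelism changes.
 */
public class SurrogateKeys {

    // Assumed copy of Flink's murmur-based hash of an int (MathUtils.murmurHash + bitMix).
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Subtask an Integer key is expected to reach: key group, then contiguous key-group ranges.
    static int subtaskFor(int key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(Integer.hashCode(key)) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Gives each distinct known key its own surrogate, targeting subtasks round-robin.
    static <K> Map<K, Integer> assignSurrogates(List<K> knownKeys, int maxParallelism, int parallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException("need 1 <= parallelism <= maxParallelism");
        }
        Map<K, Integer> mapping = new LinkedHashMap<>();
        int[] nextCandidate = new int[parallelism]; // per-subtask search cursor keeps surrogates distinct
        int i = 0;
        for (K key : knownKeys) {
            if (mapping.containsKey(key)) {
                continue; // each real key gets exactly one surrogate
            }
            int target = i % parallelism;
            int candidate = nextCandidate[target];
            while (subtaskFor(candidate, maxParallelism, parallelism) != target) {
                candidate++;
            }
            mapping.put(key, candidate);
            nextCandidate[target] = candidate + 1;
            i++;
        }
        return mapping;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> mapping = assignSurrogates(Arrays.asList(1000, 1001), 128, 2);
        mapping.forEach((key, surrogate) -> System.out.println("key " + key + " -> surrogate "
                + surrogate + " -> subtask " + subtaskFor(surrogate, 128, 2)));
    }
}
```

In a job, a KeySelector that looks up each record's surrogate in this table could then be passed to keyBy. Each real key gets its own surrogate so that keyed state is never shared between real keys. The table must be rebuilt whenever the parallelism or max parallelism changes, which is part of what makes this approach fragile.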

— Ken


In reply to Fabian Hueske's message of Apr 8, 2019, 1:01 AM.


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Re: Partitioning key range

Davood Rafiei
Hi all,

Thanks a lot for the replies!




In reply to Ken Krugler's message of Mon, Apr 8, 2019, 5:15 PM.