Manual allocation of slot usage


Mu Kong
Hi community,

I'm running an application that consumes data from Kafka, processes it, and then writes the results to Druid.
I wonder if there is a way to spread the source subtasks evenly across the task managers, to make the most of each task manager's network bandwidth.

For example, I have 15 task managers, and I set the parallelism of the Kafka source to 60, since the Kafka topic has 60 partitions.
What I want is for the Flink cluster to place 4 Kafka source subtasks on each task manager.
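
For context, the source is wired up roughly like this (a minimal sketch; the class name, topic, and consumer properties are placeholders, not our actual code):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaToDruidJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "druid-ingestion");     // placeholder

        // 60 partitions in the topic, so parallelism 60 gives one subtask per partition.
        DataStream<String> source = env
                .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
                .setParallelism(60)
                .name("kafka source");

        // The actual processing and the Druid sink are omitted; print() keeps the sketch runnable.
        source.print();

        env.execute("kafka-to-druid");
    }
}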

Is that possible? I have gone through the documentation, and the only thing I found is
cluster.evenly-spread-out-slots
which does exactly the opposite of what I want: it packs the subtasks of the same operator onto one task manager as much as possible.

So, is some kind of manual resource allocation available?
Thanks in advance!


Best regards,
Mu

Re: Manual allocation of slot usage

Yangze Guo
Hi, Mu,

IIUC, cluster.evenly-spread-out-slots should fulfill your demand. Why
do you think it does the opposite of what you want? Do you run your
job in active mode? If so, cluster.evenly-spread-out-slots might not
work very well, because there could be insufficient task managers at
the time slots are requested from the ResourceManager. This has been
discussed in https://issues.apache.org/jira/browse/FLINK-12122 .
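
In case it matters: the option goes into flink-conf.yaml on the cluster side, i.e.

cluster.evenly-spread-out-slots: true

and, as far as I know, it is only read when the cluster starts, so the
cluster has to be restarted for it to take effect.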


Best,
Yangze Guo


Re: Manual allocation of slot usage

Mu Kong
Hi, Guo,

Thanks for helping out.

My application has a Kafka source with 60 subtasks (parallelism 60), and we have 15 task managers with 15 slots each.

Before I applied cluster.evenly-spread-out-slots (i.e. with its default value, false), the 'kafka source' operator had 11 subtasks allocated to one single task manager,
while the remaining 49 subtasks of 'kafka source' were distributed across the remaining 14 task managers.

After I set cluster.evenly-spread-out-slots to true, the 60 subtasks of 'kafka source' were allocated to only 4 task managers, taking all 15 slots on each of these 4 TMs.

I thought this config would spread the subtasks of one operator more evenly among the task managers, but instead it seems to have packed them into the same task managers as much as possible.
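
To spell out the numbers: 15 TMs x 15 slots gives 225 slots in total, so an even spread of the 60 source subtasks would be 60 / 15 = 4 per TM. What we observed instead was 60 = 4 x 15, i.e. 4 TMs filled completely while the other 11 got no source subtasks at all.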

The version I'm deploying is 1.9.0.

Best regards,
Mu


Re: Manual allocation of slot usage

Yangze Guo
Hi, Mu,

AFAIK, this feature was added in 1.9.2. Since you are on 1.9.0, would
you like to upgrade your Flink distribution?

Best,
Yangze Guo


Re: Manual allocation of slot usage

Xintong Song
Hi Mu,

Regarding your questions:
  • The feature `spread out tasks evenly across task managers` was introduced in Flink 1.10.0 and backported to Flink 1.9.2, per the JIRA ticket [1]. That means if you configure this option in Flink 1.9.0, it should not take any effect.
  • Please be aware that this feature ATM only works for standalone deployments (including standalone Kubernetes deployments). For the native Kubernetes, Yarn and Mesos deployments, it is a known issue that this feature does not work as expected.
  • Regarding the scheduling behavior changes, we would need more information to explain them. The easiest way to provide that information is probably to share the jobmanager log files, if you're okay with that. If you cannot share the logs, it would help to answer the following questions:
    • What Flink deployment are you using? (Standalone/K8s/Yarn/Mesos)
    • How many times have you tried with and without `cluster.evenly-spread-out-slots`? In other words, can the behaviors you described before and after setting `cluster.evenly-spread-out-slots` be stably reproduced?
    • How many TMs do you have, and how many slots does each TM have?
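
If it helps with gathering the numbers, the per-TM slot usage can also be read off the REST API (a quick sketch, assuming the default REST port 8081 and a reachable jobmanager host):

curl -s http://<jobmanager-host>:8081/taskmanagers

Each task manager entry in the response reports its total and free slot counts, which should make the skew easy to see.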

Thank you~

Xintong Song

[1] https://issues.apache.org/jira/browse/FLINK-12122




Re: Manual allocation of slot usage

Mu Kong
Hi Song, Guo,

Thanks for the information.
I will first upgrade our Flink cluster to 1.10.0 and try again.
Currently, we are hitting a dependency conflict issue, possibly with Tranquility, but that is a separate problem.

For your information (also as described in my previous email):
What Flink deployment are you using? (Standalone/K8s/Yarn/Mesos) We are running a standalone cluster with version 1.9.0.
How many times have you tried with and without `cluster.evenly-spread-out-slots`? This is the first time we tried the option, but the behavior reproduces almost every time. Before we changed the config, a large share of the source's subtasks (11 of them) were allocated to one task manager, and the rest were spread unevenly across the other task managers. After changing the configuration, the subtasks of this source took all the slots on 4 of our task managers, which was more "skewed" than before.
How many TMs do you have, and how many slots does each TM have? We have 15 task managers with 15 slots each.

I will try to reproduce this tomorrow (JST) when I have time.

Best regards,
Mu


Re: Manual allocation of slot usage

Mu Kong
Hi Song, Guo,

We upgraded our cluster to 1.10.1, and cluster.evenly-spread-out-slots works pretty well now.
Thanks for your help!

Best regards,
Mu
