Distributing Tasks over Task manager

6 messages

Distributing Tasks over Task manager

Jürgen Thomann
Hi,

we currently have an issue with Flink, as it allocates many tasks to the
same task manager and as a result overloads it. I reduced the number
of task slots per task manager (keeping the CPU count) and added some
more servers, but that did not help distribute the load.

Is there some way to force Flink to distribute the load/tasks on a
standalone cluster? I saw that
https://issues.apache.org/jira/browse/FLINK-1003 might provide
what we need, but it does not seem to be actively worked on at the moment.

Cheers,
Jürgen

Re: Distributing Tasks over Task manager

rmetzger0
Hi Jürgen,

Are you using the DataStream or the DataSet API?
Maybe the operator chaining is causing too many operations to be "packed" into one task. Check out this documentation page: https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html#task-chaining-and-resource-groups 
You could try to disable chaining completely to see if that resolves the issue (you'll probably pay for this by having more serialization overhead and network traffic).
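For reference, chaining can be disabled either globally or per operator in a Flink 1.x DataStream job. A minimal sketch (the mapper class name is hypothetical, and this requires the Flink streaming dependency on the classpath):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: disable operator chaining for the whole job.
        env.disableOperatorChaining();

        // Option 2: disable chaining only for one operator, e.g.:
        // stream.map(new MyMapper()).disableChaining();
    }
}
```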

If my suggestions don't help, can you post a screenshot of your job plan (from the web interface) here, so that we see what operations you are performing?

Regards,
Robert





Re: Distributing Tasks over Task manager

Jürgen Thomann

Hi Robert,

Thanks for your suggestions. We are using the DataStream API, and I tried disabling chaining completely, but that didn't help.

I attached the plan. To add some context: the job starts with a Kafka source followed by a map operation (parallelism 4). The next map is the expensive part, with a parallelism of 18; it produces a Tuple2 which is used for splitting. From there on, the parallelism is always 2, except for the sink with 1. Both resulting streams have two maps, a filter, one more map, and end with an assignTimestampsAndWatermarks. Where there is a small box in the picture it is a filter operation; otherwise the stream goes directly to a keyBy, timeWindow, and apply operation followed by a sink.

If one task manager contains more subtasks of the expensive map than any other task manager, everything later in the stream runs on that same task manager. If two task managers have the same number of subtasks, the following tasks with a parallelism of 2 are distributed over those two task managers.

Interestingly, the task managers have 6 task slots configured and the expensive part has 6 subtasks on one task manager, yet everything later in the flow still runs on this task manager. This also happens when operator chaining is disabled.

Best,
Jürgen



[Attachment: plan.png (52K)]

Re: Distributing Tasks over Task manager

Jürgen Thomann
Hi Robert,

Have you already had a chance to look at it? If you need more
information, just let me know.

Regards,
Jürgen


Re: Distributing Tasks over Task manager

rmetzger0
I'm sorry for the delay. I've added Till, who knows the scheduler details, to the conversation.



Re: Distributing Tasks over Task manager

Till Rohrmann
Hi Jürgen,

In a nutshell, Flink's scheduling works the following way: the sources are deployed with respect to local preferences. If there are no local preferences, the first machine from a map's iterator which stores the machines is used. So in general, the sources will first fill up the first available machine, and you should see that all of your sources are deployed to the same machine.

Next come the downstream operators. They are scheduled to machines which run one of their inputs, if possible. Since you've changed the parallelism from 4 to 18, you have an all-to-all pattern. Thus, the machine which runs the sources will first be filled up with the expensive mappers, because that machine contains all the inputs. The same applies to the tasks with parallelism 2. Only in some cases, when one of the expensive mappers runs on a second machine because the first one is full, might one of the tasks with parallelism 2 be deployed to that machine.
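This greedy "fill the machine that holds the inputs" behaviour can be illustrated with a small standalone simulation. This is a toy model only (it ignores slot sharing and is not Flink's actual scheduler code), using the numbers from this thread: 4 machines with 6 slots each, sources with parallelism 4, and an all-to-all expensive map with parallelism 18:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of greedy, input-locality-first slot assignment.
public class SchedulingSketch {

    // Assign `tasks` subtasks, preferring machines that already host an input
    // subtask; otherwise fall back to the first machine with a free slot.
    static List<Integer> assign(int tasks, int machines, int slotsPerMachine,
                                int[] slotsUsed, List<Integer> inputLocations) {
        List<Integer> placements = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            int target = -1;
            // Prefer a machine hosting one of the inputs, if it has a free slot.
            for (int m : inputLocations) {
                if (slotsUsed[m] < slotsPerMachine) { target = m; break; }
            }
            // Otherwise take the first machine with a free slot.
            if (target == -1) {
                for (int m = 0; m < machines; m++) {
                    if (slotsUsed[m] < slotsPerMachine) { target = m; break; }
                }
            }
            slotsUsed[target]++;
            placements.add(target);
        }
        return placements;
    }

    public static void main(String[] args) {
        int machines = 4, slots = 6;
        int[] used = new int[machines];
        // Sources (parallelism 4, no locality preference -> first machine).
        List<Integer> sources = assign(4, machines, slots, used, new ArrayList<>());
        // Expensive map (parallelism 18, all-to-all from the sources).
        List<Integer> maps = assign(18, machines, slots, used, sources);
        System.out.println(sources); // all subtasks land on machine 0
        System.out.println(maps);    // machine 0 fills up first, then 1, 2, 3
    }
}
```

The model reproduces the reported symptom: the machine running the sources fills up first, and downstream work piles onto it until its slots are exhausted.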

So what you can do to better spread the work is to put the expensive mappers in a different slot sharing group. This will cause them to be deployed to their own slots (at the cost of higher network traffic). Alternatively, you can set the parallelism of every operator to numMachines * numSlotsPerMachine; that way, the load should be distributed better. But the proper solution would probably be to implement the JIRA issue you mentioned.
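A sketch of the slot sharing group approach for a topology like the one in this thread (the source and mapper class names are hypothetical placeholders, and this assumes the Flink 1.x DataStream API):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> raw = env
                .addSource(new MyKafkaSource())    // hypothetical source
                .setParallelism(4);

        raw.map(new ExpensiveMapper())             // hypothetical heavy mapper
                .setParallelism(18)
                .slotSharingGroup("expensive")     // isolate in its own slots
                .map(new CheapMapper())            // hypothetical cheap mapper
                .setParallelism(2)
                .slotSharingGroup("default");      // downstream back in default group

        env.execute("slot-sharing-sketch");
    }
}
```

Downstream operators inherit the slot sharing group of their inputs, so the explicit `slotSharingGroup("default")` on the cheap mapper is needed to keep the rest of the pipeline out of the "expensive" group.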

Flink's concept of slot sharing and scheduling is described here [1, 2]. The usage of slot sharing groups is described here [3].


Cheers,
Till
