Parameters to Control Intra-node Parallelism

Saliya Ekanayake
Hi,

We are trying to scale some of our scientific applications written in Flink, and we have a few questions on tuning Flink performance.

1. What parameters are available to control parallelism within a node?
2. Does Flink support shared memory-based messaging within a node (without doing TCP calls)?
3. Is there support for Infiniband interconnect?

Thank you,
Saliya

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington

Re: Parameters to Control Intra-node Parallelism

Ufuk Celebi
On Thu, Jun 30, 2016 at 1:44 AM, Saliya Ekanayake <[hidden email]> wrote:
> 1. What parameters are available to control parallelism within a node?

Task Manager processing slots:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-taskmanager-processing-slots

> 2. Does Flink support shared memory-based messaging within a node (without
> doing TCP calls)?

Yes, local exchanges happen via memory and not TCP. For example, if you
have a map-reduce, map subtask 1 and reduce subtask 1 are likely to
exchange data locally.

> 3. Is there support for Infiniband interconnect?

No, not that I'm aware of.

– Ufuk

Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
Thank you, I'll check these.

In 2) you said they are likely to exchange data through memory. Is there a case where they wouldn't?

Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
Hi Ufuk,

Looking at the document you sent, it seems only 1 task manager exists per node, and within that you have multiple slots. Is it possible to run more than 1 task manager per node? Also, within a task manager, is the parallelism done through threads or processes?

Thank you,
Saliya

Re: Parameters to Control Intra-node Parallelism

Ufuk Celebi
Regarding 2): unless you manually configure something else, that
should always happen.

Yes, you can run more than one task manager per node, depending on the
process isolation you want. Within a task manager, there are multiple
threads for each slot. For example, if you have 2 task managers with 2
slots each and submit a job with parallelism 4, each task manager will
execute 2 subtasks in separate threads.
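
To make the arithmetic in this example concrete, here is a minimal Java sketch. It is only an illustrative model, not Flink code: the class `SlotLayout`, its methods, and the round-robin placement are assumptions made for illustration, and Flink's actual scheduler may place subtasks differently.

```java
import java.util.Arrays;

// Illustrative model only; none of these names belong to a Flink API.
final class SlotLayout {

    // Total slots in the cluster = task managers x slots per task manager.
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Counts how many subtasks each task manager would run if subtasks were
    // handed out round-robin (an assumed placement, chosen for simplicity).
    static int[] subtasksPerTaskManager(int taskManagers,
                                        int slotsPerTaskManager,
                                        int parallelism) {
        if (parallelism > totalSlots(taskManagers, slotsPerTaskManager)) {
            throw new IllegalArgumentException(
                "parallelism exceeds the number of available slots");
        }
        int[] counts = new int[taskManagers];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            counts[subtask % taskManagers]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 2 task managers, 2 slots each, job parallelism 4.
        int[] counts = subtasksPerTaskManager(2, 2, 4);
        System.out.println(Arrays.toString(counts)); // prints [2, 2]
    }
}
```

With 2 task managers of 2 slots each and parallelism 4, every slot is occupied, so each task manager runs 2 subtasks regardless of the placement order.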


Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
That's great. Is there support for pinning task managers to CPU sockets as well?

Re: Parameters to Control Intra-node Parallelism

Ufuk Celebi
No, not inside of Flink. That sounds like something the OS or
resource manager should handle.

Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
Thank you. Yes, it can be done externally if it is not supported within Flink.

So the way to spawn multiple task managers on a node would be to list the same slave machine N times, as necessary, in the slaves file?

Re: Parameters to Control Intra-node Parallelism

Ufuk Celebi
Yes, exactly.
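
As a rough sketch of this approach, the snippet below generates slaves-file content in which each host is listed once per desired task manager. The class `SlavesFile` and the method `repeatHosts` are hypothetical helpers, and the host names are only examples; the output would still have to be written to the slaves file (assumed here to be `conf/slaves`) by hand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink.
final class SlavesFile {

    // Repeats every host name once per task manager wanted on that host.
    static List<String> repeatHosts(List<String> hosts, int taskManagersPerHost) {
        List<String> lines = new ArrayList<>();
        for (String host : hosts) {
            for (int i = 0; i < taskManagersPerHost; i++) {
                lines.add(host);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Example hosts (assumed); 2 task managers requested per host.
        List<String> lines = repeatHosts(Arrays.asList("j-002", "j-011"), 2);
        // One host per line, as the slaves file expects.
        System.out.println(String.join("\n", lines));
    }
}
```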

Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
Thank you!

Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
I tried to run more than one task manager per node by duplicating the slave IPs. At startup it says, for example:

[INFO] 1 instance(s) of taskmanager are already running on j-011.
Starting taskmanager daemon on host j-011.

but I only see 1 task manager process running.

Is there anything else I need to do?

Re: Parameters to Control Intra-node Parallelism

Ufuk Celebi
No, that should suffice. Can you check whether there are any task
manager logs for the second TM on that machine
(taskmanager-X-j-011.log, where X is the TM number)? If yes, the task
manager process does start up and there is another problem. If not,
the task manager does not even seem to start.
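
One way to run this check could look like the following Java sketch. It scans a list of file names for the `taskmanager-X-<host>.log` pattern and returns the TM numbers it finds. The class `TaskManagerLogs`, the method name, and the sample file names (including the `flink-user-` prefix) are assumptions for illustration; the real names in your log directory may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Flink.
final class TaskManagerLogs {

    // Returns the TM numbers X from names ending in "taskmanager-X-<host>.log".
    static List<Integer> taskManagerNumbers(List<String> fileNames, String host) {
        Pattern pattern = Pattern.compile(
            "taskmanager-(\\d+)-" + Pattern.quote(host) + "\\.log$");
        List<Integer> numbers = new ArrayList<>();
        for (String name : fileNames) {
            Matcher matcher = pattern.matcher(name);
            if (matcher.find()) {
                numbers.add(Integer.parseInt(matcher.group(1)));
            }
        }
        return numbers;
    }

    public static void main(String[] args) {
        // Sample names (assumed); in practice, list the files in the log directory.
        List<String> names = Arrays.asList(
            "flink-user-taskmanager-0-j-011.log",
            "flink-user-taskmanager-1-j-011.log",
            "flink-user-jobmanager-0-j-011.log");
        // More than one number here suggests a second TM did start on the host.
        System.out.println(taskManagerNumbers(names, "j-011"));
    }
}
```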

– Ufuk

On Thu, Jul 7, 2016 at 7:34 AM, Saliya Ekanayake <[hidden email]> wrote:

> I tried to run more than one task manager per node by duplicating the slave
> IPs. At startup it says for example,
>
> [INFO] 1 instance(s) of taskmanager are already running on j-011.
> Starting taskmanager daemon on host j-011.
>
> but I only see 1 task manager process running.
>
> Is there anything else I need to do?
>
> On Sun, Jul 3, 2016 at 11:28 AM, Ufuk Celebi <[hidden email]> wrote:
>>
>> Yes, exactly.
>>
>> On Sat, Jul 2, 2016 at 6:28 PM, Saliya Ekanayake <[hidden email]>
>> wrote:
>> > Thank you, yes, it can be done externally, if not supported within
>> > Flink.
>> >
>> > So the way to spawn multiple task managers would be to list the same
>> > slave
>> > machines N times as necessary in the slaves file?
>> >
>> > On Sat, Jul 2, 2016 at 11:22 AM, Ufuk Celebi <[hidden email]> wrote:
>> >>
>> >> No, not inside of Flink. That sounds like something like the OS or
>> >> resource manager should handle.
>> >>
>> >> On Sat, Jul 2, 2016 at 5:12 PM, Saliya Ekanayake <[hidden email]>
>> >> wrote:
>> >> > That's great, so is there support to pin task managers to sockets as
>> >> > well?
>> >> >
>> >> > On Sat, Jul 2, 2016 at 11:08 AM, Ufuk Celebi <[hidden email]> wrote:
>> >> >>
>> >> >> Regarding 2) if you don't manually configure something else, that
>> >> >> should happen always.
>> >> >>
>> >> >> Yes, you can run more than one task manager per node depending on
>> >> >> the
>> >> >> process isolation you want. Within a task manager, there are
>> >> >> multiple
>> >> >> threads for each slot. For example, if you have 2 task managers with
>> >> >> 2
>> >> >> slots each and submit a job with parallelism 4, each task manager
>> >> >> will
>> >> >> execute 2 sub tasks in separate Threads.
>> >> >>
>> >> >>
>> >> >> On Sat, Jul 2, 2016 at 3:26 AM, Saliya Ekanayake <[hidden email]>
>> >> >> wrote:
>> >> >> > Hi Ufuk,
>> >> >> >
>> >> >> > Looking at the document you sent it seems only 1 task manager per
>> >> >> > node
>> >> >> > exist
>> >> >> > and within that you have multiple slots. Is it possible to run
>> >> >> > more
>> >> >> > than
>> >> >> > 1
>> >> >> > task manager per node? Also, within a task manager is the
>> >> >> > parallelism
>> >> >> > done
>> >> >> > through threads or processes?
>> >> >> >
>> >> >> > Thank you,
>> >> >> > Saliya
>> >> >> >
>> >> >> > On Thu, Jun 30, 2016 at 5:27 PM, Saliya Ekanayake
>> >> >> > <[hidden email]>
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Thank you, I'll check these.
>> >> >> >>
>> >> >> >> In 2.) you said they are likely to exchange through memory. Is
>> >> >> >> there
>> >> >> >> a
>> >> >> >> case why they wouldn't?
>> >> >> >>
>> >> >> >> On Thu, Jun 30, 2016 at 5:03 AM, Ufuk Celebi <[hidden email]>
>> >> >> >> wrote:
>> >> >> >>>
>> >> >> >>> On Thu, Jun 30, 2016 at 1:44 AM, Saliya Ekanayake
>> >> >> >>> <[hidden email]>
>> >> >> >>> wrote:
>> >> >> >>> > 1. What parameters are available to control parallelism within
>> >> >> >>> > a
>> >> >> >>> > node?
>> >> >> >>>
>> >> >> >>> Task Manager processing slots:
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-taskmanager-processing-slots
>> >> >> >>>
>> >> >> >>> > 2. Does Flink support shared memory-based messaging within a
>> >> >> >>> > node
>> >> >> >>> > (without
>> >> >> >>> > doing TCP calls)?
>> >> >> >>>
>> >> >> >>> Yes, local exchanges happen via memory and not TCP, for example
>> >> >> >>> if
>> >> >> >>> you
>> >> >> >>> have a map-reduce, map subtask 1 and reduce subtask 1 are likely
>> >> >> >>> to
>> >> >> >>> exchange data locally.
>> >> >> >>>
>> >> >> >>> > 3. Is there support for Infiniband interconnect?
>> >> >> >>>
>> >> >> >>> No, not that I'm aware of.
>> >> >> >>>
>> >> >> >>> – Ufuk
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >> --
>> >> >> >> Saliya Ekanayake
>> >> >> >> Ph.D. Candidate | Research Assistant
>> >> >> >> School of Informatics and Computing | Digital Science Center
>> >> >> >> Indiana University, Bloomington
>> >> >> >>
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > --
>> >> >> > Saliya Ekanayake
>> >> >> > Ph.D. Candidate | Research Assistant
>> >> >> > School of Informatics and Computing | Digital Science Center
>> >> >> > Indiana University, Bloomington
>> >> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Saliya Ekanayake
>> >> > Ph.D. Candidate | Research Assistant
>> >> > School of Informatics and Computing | Digital Science Center
>> >> > Indiana University, Bloomington
>> >> >
>> >
>> >
>> >
>> >
>> > --
>> > Saliya Ekanayake
>> > Ph.D. Candidate | Research Assistant
>> > School of Informatics and Computing | Digital Science Center
>> > Indiana University, Bloomington
>> >
>
>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
>

Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
I see two logs (attached), but there's only 1 TaskManager process. Also, the web console says it can find only 1 TM.

However, I see this part in the JM log, which shows there was a second TM at one point, but it was unregistered. Any thoughts?

--------------------------

- Registered TaskManager at j-002 (akka.tcp://flink@172.16.0.2:42888/user/taskmanager) as 1c65415701f19978c8a8cdc75c993717. Current number of registered hosts is 1. Current number of alive task slots is 12.

2016-07-07 11:32:40,363 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@172.16.0.2:42888] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

2016-07-07 11:32:42,722 INFO  org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at j-002 (akka.tcp://flink@172.16.0.2:37373/user/taskmanager) as 9c4ec66f5acbc19f7931fcae8345cd4e. Current number of registered hosts is 2. Current number of alive task slots is 24.

2016-07-07 11:33:15,316 WARN  Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@172.16.0.2:42888]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /172.16.0.2:42888

2016-07-07 11:33:15,320 INFO  org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@172.16.0.2:42888/user/taskmanager terminated.
2016-07-07 11:33:15,320 INFO  org.apache.flink.runtime.instance.InstanceManager - Unregistered task manager akka.tcp://flink@172.16.0.2:42888/user/taskmanager. Number of registered task managers 1. Number of available slots 12.
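To make the sequence in the excerpt above easier to follow, here is a minimal Python sketch that replays the "Registered" and "Unregistered" events and reports which TaskManagers are still registered at the end. The regular expressions are written only against the line shapes quoted in this message, and the function name `live_task_managers` is hypothetical; other Flink versions may word these log lines differently.

```python
import re

# Patterns cover only the two line shapes quoted in the JM log excerpt above.
REGISTERED = re.compile(r"Registered TaskManager at (\S+) \((akka\.tcp://[^)]+)\)")
UNREGISTERED = re.compile(r"Unregistered task manager (akka\.tcp://\S+?)\.(?:\s|$)")


def live_task_managers(log_lines):
    """Return {actor address: host} for TMs registered and not later unregistered."""
    live = {}
    for line in log_lines:
        match = REGISTERED.search(line)
        if match:
            host, address = match.group(1), match.group(2)
            live[address] = host
            continue
        match = UNREGISTERED.search(line)
        if match:
            # Drop the TM if we saw it register; ignore unknown addresses.
            live.pop(match.group(1), None)
    return live
```

Passing the lines of the JobManager log (for example, an open file object) should leave only the TM on port 37373 for the excerpt above, which matches the single TM the web console reports.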


--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington


Attachments: flink-sekanaya-taskmanager-0-j-002.log (11K), flink-sekanaya-taskmanager-1-j-002.log (11K)

Re: Parameters to Control Intra-node Parallelism

rmetzger0
Hi,
from the TaskManager logs, I cannot see anything suspicious.
It's a bit weird that the TaskManager logs just end, without any shutdown messages. Usually the TMs log some shutdown messages when they are stopping.
Also, if they were still running, I would expect some error messages from Akka about the connection status.
So the only thing I can conclude is that one of the TMs was killed by the OS or the JVM crashed. Did you check if that happened?

Do you have any service like Puppet that is controlling processes?
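One way to check the "JVM crashed" possibility is to look for HotSpot fatal-error logs. The sketch below assumes a HotSpot JVM with the default error-file naming (`hs_err_pid<pid>.log`) and no custom `-XX:ErrorFile` setting; which directories to search (the TaskManager's working directory, or a temp directory as fallback) is also an assumption, and `find_jvm_crash_logs` is a hypothetical helper name.

```python
from pathlib import Path


def find_jvm_crash_logs(directories):
    """Return HotSpot fatal-error logs (hs_err_pid<pid>.log) found in the given directories."""
    found = []
    for directory in directories:
        path = Path(directory)
        if path.is_dir():
            # sorted() only makes the result order deterministic.
            found.extend(sorted(path.glob("hs_err_pid*.log")))
    return found
```

An empty result does not rule out a kill by the OS, since a process terminated with SIGKILL gets no chance to write such a file.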



Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
I checked, but the JVMs didn't crash. There is no Puppet or other service like that.

One thing I found is that things work OK when I have a smaller number of slaves. For example, here I was trying to run on 16 nodes with 2 TMs each. Then I reduced it to 4 nodes, each with 2 TMs, which worked.



--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington


Re: Parameters to Control Intra-node Parallelism

Greg Hogan
These symptoms sound similar to what I was experiencing in the following thread. Flink can have some unexpected memory usage, which can result in an OOM kill by the kernel, and this becomes more pronounced as the cluster size grows.
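A kernel OOM kill does not show up in the Flink logs, so it has to be looked for in the kernel log on the affected node. The following Python sketch scans kernel log lines for OOM-killer victims. The exact message wording differs between kernel versions; the pattern below is an assumption based on the commonly seen "Kill process <pid> (<name>)" and "Killed process <pid> (<name>)" forms, and `oom_killed_processes` is a hypothetical name.

```python
import re

# Assumed wording: "Kill process 123 (java) ..." or "Killed process 123 (java) ...".
OOM_KILL = re.compile(r"Kill(?:ed)? process (\d+) \(([^)]+)\)")


def oom_killed_processes(kernel_log_lines, name="java"):
    """Return the sorted PIDs of processes called `name` that the OOM killer targeted."""
    victims = set()
    for line in kernel_log_lines:
        match = OOM_KILL.search(line)
        if match and match.group(2) == name:
            # A set collapses kernels that log both wordings for one kill.
            victims.add(int(match.group(1)))
    return sorted(victims)
```

The input could be the output of `dmesg` split into lines, or the lines of the system log file; where that file lives depends on the distribution.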


Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
Greg,

Where did you see the OOM log shown in that mail thread? In my case, neither the TaskManagers nor the JobManager report an error like this.

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington

Re: Parameters to Control Intra-node Parallelism

Greg Hogan
The OOM killer doesn't give any warning, so you'll need to run dmesg or look in /var/log/messages or similar. According to the following answer, Debian flavors may use /var/log/syslog instead.
  http://stackoverflow.com/questions/624857/finding-which-process-was-killed-by-linux-oom-killer
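
For what it's worth, here is a minimal sketch of how that check could be scripted across many nodes. It assumes the kernel writes a line containing "Killed process <pid> (<name>)" when the OOM killer fires; the exact wording varies between kernel versions, so treat the pattern as an assumption to adjust. The function name find_oom_kills is just an illustrative helper, not part of Flink or any tool.

```python
import re
from typing import Iterable, List, Tuple

# Assumed kernel wording: "... Killed process <pid> (<name>) ...".
# Some kernels prefix this with "Out of memory: "; the search below
# matches either form. Adjust the pattern if your kernel logs differently.
KILL_RE = re.compile(r"Killed process (\d+) \(([^)]+)\)")


def find_oom_kills(lines: Iterable[str]) -> List[Tuple[int, str]]:
    """Return (pid, process name) for each OOM-killer kill line found."""
    kills = []
    for line in lines:
        match = KILL_RE.search(line)
        if match:
            kills.append((int(match.group(1)), match.group(2)))
    return kills
```

You could feed it the output of dmesg, or an open log file such as /var/log/messages, and then look for entries whose process name is java around the time the TaskManager disappeared.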

Re: Parameters to Control Intra-node Parallelism

Saliya Ekanayake
Thank you Greg, I'll check whether this is what caused my TMs to disappear.

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington

Re: Parameters to Control Intra-node Parallelism

Ovidiu-Cristian MARCU
Hi,

Can you post your configuration parameters (excluding default settings) and a description of your cluster?

Best,
Ovidiu
On 11 Jul 2016, at 17:49, Saliya Ekanayake <[hidden email]> wrote:

Thank you Greg, I'll check if this was the cause for my TMs to disappear.

On Mon, Jul 11, 2016 at 11:34 AM, Greg Hogan <[hidden email]> wrote:
The OOM killer doesn't give warning so you'll need to call dmesg or look in /var/log/messages or similar. The following reports that Debian flavors may use /var/log/syslog.
  http://stackoverflow.com/questions/624857/finding-which-process-was-killed-by-linux-oom-killer

On Sun, Jul 10, 2016 at 11:55 PM, Saliya Ekanayake <[hidden email]> wrote:
Greg,

where did you see the OOM log as shown in this mail thread? In my case none of the TaskManagers nor JobManger reports an error like this.

On Sun, Jul 10, 2016 at 8:45 PM, Greg Hogan <[hidden email]> wrote:
These symptoms sounds similar to what I was experiencing in the following thread. Flink can have some unexpected memory usage which can result in an OOM kill by the kernel, and this becomes more pronounced as the cluster size grows.

On Fri, Jul 8, 2016 at 12:46 PM, Saliya Ekanayake <[hidden email]> wrote:
I checked, but JVMs didn't crash. No puppet or other services like that.

One thing I found is that things work OK when I have a smaller number of slaves. For example, here I was trying to run on 16 nodes giving 2 TMs each. Then I reduced it to 4 nodes each with 2 TMs, which worked.



On Fri, Jul 8, 2016 at 12:31 PM, Robert Metzger <[hidden email]> wrote:
Hi,
from the TaskManager logs, I can not see anything suspicious.
Its a bit weird that the TaskManager logs just end, without any shutdown messages. Usually the TMs log some shut down stuff when they are stopping.
Also, if they would be still running, I would expect some error messages from akka about the connection status.
So the only thing I conclude is that one of the TMs was killed by the OS or the JVM crashed. Did you check if that happened?

Do you have any service like puppet that is controlling processes?


On Thu, Jul 7, 2016 at 5:46 PM, Saliya Ekanayake <[hidden email]> wrote:
I see two logs (attached), but there's only 1 TaskManager process. Also, the Web console says it can find only 1 TM.

However, I see this part in JM log, which shows there was a second TM at one point, but it was unregistered. Any thoughts?

--------------------------

- Registered TaskManager at j-002 (akka.tcp://flink@172.16.0.2:42888/user/taskmanager) as 1c65415701f19978c8a8cdc75c993717. Current number of registered hosts is 1. Current number of alive task slots is 12.

2016-07-07 11:32:40,363 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@172.16.0.2:42888] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

2016-07-07 11:32:42,722 INFO  org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at j-002 (akka.tcp://flink@172.16.0.2:37373/user/taskmanager) as 9c4ec66f5acbc19f7931fcae8345cd4e. Current number of registered hosts is 2. Current number of alive task slots is 24.

2016-07-07 11:33:15,316 WARN  Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@172.16.0.2:42888]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /172.16.0.2:42888

2016-07-07 11:33:15,320 INFO  org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@172.16.0.2:42888/user/taskmanager terminated.
2016-07-07 11:33:15,320 INFO  org.apache.flink.runtime.instance.InstanceManager - Unregistered task manager akka.tcp://flink@172.16.0.2:42888/user/taskmanager. Number of registered task managers 1. Number of available slots 12.
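
A rough sketch, not from the original posters, of how the register/unregister events in a JobManager log like the one above could be replayed to see which TMs are still registered at the end. The regular expressions are assumptions based only on the message wording in this excerpt, and `alive_task_managers` is a hypothetical helper name.

```python
import re

# Patterns assumed from the log wording quoted in this thread.
REGISTERED = re.compile(r"Registered TaskManager at (\S+) \(([^)]+)\)")
UNREGISTERED = re.compile(r"Unregistered task manager (\S+?)\.\s")


def alive_task_managers(log_lines):
    """Replay the log and return {akka_address: host} for TMs still registered."""
    alive = {}
    for line in log_lines:
        match = REGISTERED.search(line)
        if match:
            alive[match.group(2)] = match.group(1)
            continue
        match = UNREGISTERED.search(line)
        if match:
            # Ignore unregistrations for addresses we never saw register.
            alive.pop(match.group(1), None)
    return alive


if __name__ == "__main__":
    import sys

    # Usage: python tm_status.py < jobmanager.log
    for address, host in alive_task_managers(sys.stdin).items():
        print(host, address)
```

Applied to the excerpt above, this leaves only the TM on port 37373 registered, which matches the single TM shown in the Web console.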


On Thu, Jul 7, 2016 at 4:27 AM, Ufuk Celebi <[hidden email]> wrote:
No, that should suffice. Can you check whether there are any task
manager logs for the second TM on that machine
(taskmanager-X-j-011.log where X is the TM number)? If yes, the task
manager process does start up and there is another problem. If not,
the task manager does not seem to start at all.

– Ufuk

On Thu, Jul 7, 2016 at 7:34 AM, Saliya Ekanayake <[hidden email]> wrote:
> I tried to run more than one task manager per node by duplicating the slave
> IPs. At startup it says for example,
>
> [INFO] 1 instance(s) of taskmanager are already running on j-011.
> Starting taskmanager daemon on host j-011.
>
> but I only see 1 task manager process running.
>
> Is there anything else I need to do?
>
> On Sun, Jul 3, 2016 at 11:28 AM, Ufuk Celebi <[hidden email]> wrote:
>>
>> Yes, exactly.
>>
>> On Sat, Jul 2, 2016 at 6:28 PM, Saliya Ekanayake <[hidden email]> wrote:
>> > Thank you, yes, it can be done externally, if not supported within Flink.
>> >
>> > So the way to spawn multiple task managers would be to list the same slave
>> > machines N times as necessary in the slaves file?
>> >
>> > On Sat, Jul 2, 2016 at 11:22 AM, Ufuk Celebi <[hidden email]> wrote:
>> >>
>> >> No, not inside of Flink. That sounds like something the OS or
>> >> resource manager should handle.
>> >>
>> >> On Sat, Jul 2, 2016 at 5:12 PM, Saliya Ekanayake <[hidden email]> wrote:
>> >> > That's great, so is there support to pin task managers to sockets as well?
>> >> >
>> >> > On Sat, Jul 2, 2016 at 11:08 AM, Ufuk Celebi <[hidden email]> wrote:
>> >> >>
>> >> >> Regarding 2) if you don't manually configure something else, that
>> >> >> should happen always.
>> >> >>
>> >> >> Yes, you can run more than one task manager per node depending on the
>> >> >> process isolation you want. Within a task manager, there are multiple
>> >> >> threads for each slot. For example, if you have 2 task managers with 2
>> >> >> slots each and submit a job with parallelism 4, each task manager will
>> >> >> execute 2 sub tasks in separate Threads.
>> >> >>
>> >> >>
>> >> >> On Sat, Jul 2, 2016 at 3:26 AM, Saliya Ekanayake <[hidden email]> wrote:
>> >> >> > Hi Ufuk,
>> >> >> >
>> >> >> > Looking at the document you sent, it seems only 1 task manager per node
>> >> >> > exists and within that you have multiple slots. Is it possible to run
>> >> >> > more than 1 task manager per node? Also, within a task manager is the
>> >> >> > parallelism done through threads or processes?
>> >> >> >
>> >> >> > Thank you,
>> >> >> > Saliya
>> >> >> >
>> >> >> > On Thu, Jun 30, 2016 at 5:27 PM, Saliya Ekanayake <[hidden email]> wrote:
>> >> >> >>
>> >> >> >> Thank you, I'll check these.
>> >> >> >>
>> >> >> >> In 2.) you said they are likely to exchange through memory. Is there a
>> >> >> >> case why they wouldn't?
>> >> >> >>
>> >> >> >> On Thu, Jun 30, 2016 at 5:03 AM, Ufuk Celebi <[hidden email]> wrote:
>> >> >> >>>
>> >> >> >>> On Thu, Jun 30, 2016 at 1:44 AM, Saliya Ekanayake <[hidden email]> wrote:
>> >> >> >>> > 1. What parameters are available to control parallelism within a node?
>> >> >> >>>
>> >> >> >>> Task Manager processing slots:
>> >> >> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-taskmanager-processing-slots
>> >> >> >>>
>> >> >> >>> > 2. Does Flink support shared memory-based messaging within a node
>> >> >> >>> > (without doing TCP calls)?
>> >> >> >>>
>> >> >> >>> Yes, local exchanges happen via memory and not TCP, for example if you
>> >> >> >>> have a map-reduce, map subtask 1 and reduce subtask 1 are likely to
>> >> >> >>> exchange data locally.
>> >> >> >>>
>> >> >> >>> > 3. Is there support for Infiniband interconnect?
>> >> >> >>>
>> >> >> >>> No, not that I'm aware of.
>> >> >> >>>
>> >> >> >>> – Ufuk



--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington




