Hi,
We are trying to scale some of our scientific applications written in Flink. A few questions on tuning Flink performance.

1. What parameters are available to control parallelism within a node?
2. Does Flink support shared memory-based messaging within a node (without doing TCP calls)?
3. Is there support for Infiniband interconnect?

Thank you,
Saliya

Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
On Thu, Jun 30, 2016 at 1:44 AM, Saliya Ekanayake <[hidden email]> wrote:
> 1. What parameters are available to control parallelism within a node?

Task Manager processing slots: https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-taskmanager-processing-slots

> 2. Does Flink support shared memory-based messaging within a node (without doing TCP calls)?

Yes, local exchanges happen via memory and not TCP. For example, if you have a map-reduce, map subtask 1 and reduce subtask 1 are likely to exchange data locally.

> 3. Is there support for Infiniband interconnect?

No, not that I'm aware of.

– Ufuk
Thank you, I'll check these.

In 2.) you said they are likely to exchange through memory. Is there a case where they wouldn't?

On Thu, Jun 30, 2016 at 5:03 AM, Ufuk Celebi <[hidden email]> wrote:
> [...]
Hi Ufuk,

Looking at the document you sent, it seems only 1 task manager exists per node, and within that you have multiple slots. Is it possible to run more than 1 task manager per node? Also, within a task manager, is the parallelism done through threads or processes?

Thank you,
Saliya

On Thu, Jun 30, 2016 at 5:27 PM, Saliya Ekanayake <[hidden email]> wrote:
> [...]
Regarding 2) if you don't manually configure something else, that should always happen.

Yes, you can run more than one task manager per node, depending on the process isolation you want. Within a task manager, there are multiple threads for each slot. For example, if you have 2 task managers with 2 slots each and submit a job with parallelism 4, each task manager will execute 2 subtasks in separate threads.

On Sat, Jul 2, 2016 at 3:26 AM, Saliya Ekanayake <[hidden email]> wrote:
> [...]
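The slot arithmetic in the example above (2 task managers, 2 slots each, parallelism 4) can be sketched in a few lines of Python. This only illustrates the counting and is not Flink's scheduler: the function name `subtasks_per_task_manager` and the even round-robin spread are assumptions made for the sketch.

```python
def subtasks_per_task_manager(num_task_managers, slots_per_tm, parallelism):
    """Spread `parallelism` subtasks over the available slots.

    Hypothetical helper: assumes one subtask per slot and an even,
    round-robin spread, which simplifies what a real scheduler does.
    """
    total_slots = num_task_managers * slots_per_tm
    if parallelism > total_slots:
        raise ValueError(
            f"parallelism {parallelism} exceeds {total_slots} available slots"
        )
    counts = [0] * num_task_managers
    for subtask in range(parallelism):
        # Hand the next subtask to the next task manager in turn.
        counts[subtask % num_task_managers] += 1
    return counts


if __name__ == "__main__":
    # Numbers from the reply: 2 task managers, 2 slots each, parallelism 4.
    print(subtasks_per_task_manager(2, 2, 4))
```

The number of slots per task manager is what the linked configuration page describes; as far as I know it is set with `taskmanager.numberOfTaskSlots` in `flink-conf.yaml`, so the product of that value and the number of task managers bounds the parallelism a job can request.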
That's great, so is there support to pin task managers to sockets as well?

On Sat, Jul 2, 2016 at 11:08 AM, Ufuk Celebi <[hidden email]> wrote:
> Regarding 2) if you don't manually configure something else, that [...]
No, not inside of Flink. That sounds like something the OS or resource manager should handle.

On Sat, Jul 2, 2016 at 5:12 PM, Saliya Ekanayake <[hidden email]> wrote:
> [...]
Thank you, yes, it can be done externally, if not supported within Flink.

So the way to spawn multiple task managers would be to list the same slave machines N times as necessary in the slaves file?

On Sat, Jul 2, 2016 at 11:22 AM, Ufuk Celebi <[hidden email]> wrote:
> No, not inside of Flink. That sounds like something like the OS or [...]
Yes, exactly.

On Sat, Jul 2, 2016 at 6:28 PM, Saliya Ekanayake <[hidden email]> wrote:
> [...]
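For illustration, the "list the same machine N times" approach confirmed above could be generated with a small script rather than edited by hand. This is a minimal sketch: `slaves_file_lines` is a hypothetical helper, the host names are the ones mentioned in this thread, and writing the result to the slaves file of a standalone setup (commonly `conf/slaves`) is left to the reader.

```python
def slaves_file_lines(hosts, task_managers_per_host):
    """Return slaves-file lines with each host repeated N times.

    Hypothetical helper for illustration: one line per task manager
    that the start-up script should launch on that host.
    """
    if task_managers_per_host < 1:
        raise ValueError("task_managers_per_host must be at least 1")
    lines = []
    for host in hosts:
        lines.extend([host] * task_managers_per_host)
    return lines


if __name__ == "__main__":
    # Host names taken from the thread; the count of 2 is an example value.
    print("\n".join(slaves_file_lines(["j-002", "j-011"], 2)))
```

Keeping the repeated entries for a host adjacent makes it easy to see at a glance how many task managers each machine is expected to run.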
Thank you!

On Sun, Jul 3, 2016 at 11:28 AM, Ufuk Celebi <[hidden email]> wrote:
> Yes, exactly.
In reply to this post by Ufuk Celebi
I tried to run more than one task manager per node by duplicating the slave IPs. At startup it says, for example:

[INFO] 1 instance(s) of taskmanager are already running on j-011.
Starting taskmanager daemon on host j-011.

but I only see 1 task manager process running.

Is there anything else I need to do?

On Sun, Jul 3, 2016 at 11:28 AM, Ufuk Celebi <[hidden email]> wrote:
> Yes, exactly.
No, that should suffice. Can you check whether there are any task manager logs for the second TM on that machine (taskmanager-X-j-011.log, where X is the TM number)? If yes, the task manager process does start up and there is another problem. If not, the task manager does not even seem to start.

– Ufuk

On Thu, Jul 7, 2016 at 7:34 AM, Saliya Ekanayake <[hidden email]> wrote:
> [...]
I see two logs (attached), but there's only 1 TaskManager process. Also, the Web console says it can find only 1 TM. However, I see this part in the JM log, which shows there was a second TM at one point, but it was unregistered. Any thoughts?

--------------------------
- Registered TaskManager at j-002 (akka.tcp://flink@172.16.0.2:42888/user/taskmanager) as 1c65415701f19978c8a8cdc75c993717. Current number of registered hosts is 1. Current number of alive task slots is 12.
2016-07-07 11:32:40,363 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@172.16.0.2:42888] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2016-07-07 11:32:42,722 INFO org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at j-002 (akka.tcp://flink@172.16.0.2:37373/user/taskmanager) as 9c4ec66f5acbc19f7931fcae8345cd4e. Current number of registered hosts is 2. Current number of alive task slots is 24.
2016-07-07 11:33:15,316 WARN Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@172.16.0.2:42888]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /172.16.0.2:42888
2016-07-07 11:33:15,320 INFO org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@172.16.0.2:42888/user/taskmanager terminated.
2016-07-07 11:33:15,320 INFO org.apache.flink.runtime.instance.InstanceManager - Unregistered task manager akka.tcp://flink@172.16.0.2:42888/user/taskmanager. Number of registered task managers 1. Number of available slots 12.

flink-sekanaya-taskmanager-0-j-002.log (11K)
flink-sekanaya-taskmanager-1-j-002.log (11K)

On Thu, Jul 7, 2016 at 4:27 AM, Ufuk Celebi <[hidden email]> wrote:
> No that should suffice. Can you check whether there are any task [...]
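When a JobManager log is long, the register/unregister messages quoted above can be replayed mechanically to see which task managers are still registered at the end. The following is a rough sketch that assumes the message wording shown in this thread ("Registered TaskManager at ..." and "Unregistered task manager ..."); the function name `alive_task_managers` is made up for the example, and other Flink versions may word these messages differently.

```python
import re

# Patterns based on the log lines quoted in this thread; treat them as
# assumptions, since the wording may differ between Flink versions.
REGISTERED = re.compile(
    r"Registered TaskManager at (\S+) \((akka\.tcp://[^)]+)\)"
)
UNREGISTERED = re.compile(
    r"Unregistered task manager (akka\.tcp://\S+?)\.(?:\s|$)"
)


def alive_task_managers(log_text):
    """Replay the log and return {actor_url: host} for TMs still registered."""
    alive = {}
    for line in log_text.splitlines():
        match = REGISTERED.search(line)
        if match:
            alive[match.group(2)] = match.group(1)
            continue
        match = UNREGISTERED.search(line)
        if match:
            alive.pop(match.group(1), None)
    return alive


if __name__ == "__main__":
    # Message parts of the log lines quoted in the post above.
    sample = "\n".join([
        "Registered TaskManager at j-002 "
        "(akka.tcp://flink@172.16.0.2:42888/user/taskmanager) as "
        "1c65415701f19978c8a8cdc75c993717.",
        "Registered TaskManager at j-002 "
        "(akka.tcp://flink@172.16.0.2:37373/user/taskmanager) as "
        "9c4ec66f5acbc19f7931fcae8345cd4e.",
        "Unregistered task manager "
        "akka.tcp://flink@172.16.0.2:42888/user/taskmanager. "
        "Number of registered task managers 1.",
    ])
    print(alive_task_managers(sample))
```

For the excerpt in the post, this leaves only the task manager on port 37373 registered, which matches the single TM that the Web console reports.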
Hi,

from the TaskManager logs, I cannot see anything suspicious. It's a bit weird that the TaskManager logs just end, without any shutdown messages. Usually the TMs log some shutdown messages when they are stopping. Also, if they were still running, I would expect some error messages from akka about the connection status.

So the only thing I can conclude is that one of the TMs was killed by the OS or the JVM crashed. Did you check if that happened? Do you have any service like puppet that is controlling processes?

On Thu, Jul 7, 2016 at 5:46 PM, Saliya Ekanayake <[hidden email]> wrote:
> [...]
I checked, but the JVMs didn't crash. No puppet or other services like that.

One thing I found is that things work OK when I have a smaller number of slaves. For example, here I was trying to run on 16 nodes with 2 TMs each. Then I reduced it to 4 nodes, each with 2 TMs, which worked.

On Fri, Jul 8, 2016 at 12:31 PM, Robert Metzger <[hidden email]> wrote:
> [...]
These symptoms sound similar to what I was experiencing in the following thread. Flink can have some unexpected memory usage, which can result in an OOM kill by the kernel, and this becomes more pronounced as the cluster size grows.

On Fri, Jul 8, 2016 at 12:46 PM, Saliya Ekanayake <[hidden email]> wrote:
> [...]
Greg, where did you see the OOM log as shown in this mail thread? In my case, neither the TaskManagers nor the JobManager reports an error like this.

On Sun, Jul 10, 2016 at 8:45 PM, Greg Hogan <[hidden email]> wrote:
> [...]
The OOM killer doesn't give a warning, so you'll need to call dmesg or look in /var/log/messages or similar. The following reports that Debian flavors may use /var/log/syslog.

http://stackoverflow.com/questions/624857/finding-which-process-was-killed-by-linux-oom-killer

On Sun, Jul 10, 2016 at 11:55 PM, Saliya Ekanayake <[hidden email]> wrote:
> [...]
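As a rough sketch of the check described above, the kernel log text (from `dmesg`, `/var/log/messages`, or `/var/log/syslog`) can be scanned for OOM kills with a short Python script. The "Killed process <pid> (<name>)" pattern is an assumption about how the kernel words these messages, and it can vary between kernel versions; `find_oom_kills` is a hypothetical helper name, and the sample line in the script is made up purely for illustration.

```python
import re

# Assumed wording of the kernel's OOM-kill message; adjust the pattern
# if your kernel logs it differently.
OOM_KILL = re.compile(r"Killed process (\d+) \(([^)]+)\)")


def find_oom_kills(kernel_log_text):
    """Return (pid, process_name) pairs for OOM kills found in the text."""
    kills = []
    for line in kernel_log_text.splitlines():
        match = OOM_KILL.search(line)
        if match:
            kills.append((int(match.group(1)), match.group(2)))
    return kills


if __name__ == "__main__":
    # Made-up sample line, not taken from any real machine in this thread.
    sample = (
        "[12345.678901] Killed process 4242 (java) "
        "total-vm:8388608kB, anon-rss:4194304kB, file-rss:0kB"
    )
    for pid, name in find_oom_kills(sample):
        print(f"OOM killer terminated pid {pid} ({name})")
```

To use it on a real node, pass in the text of the kernel log, for example the contents of `/var/log/messages` or the captured output of `dmesg`, and look for `java` entries around the time a TaskManager disappeared.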
Thank you Greg, I'll check if this was the cause for my TMs to disappear.

On Mon, Jul 11, 2016 at 11:34 AM, Greg Hogan <[hidden email]> wrote:
> [...]
Hi, Best, Ovidiu