Hi, I have the simplest streaming job, and I want to distribute it across every node of my Flink cluster.

The job is simple:

    source (SocketTextStream) -> map -> sink (AsyncHtttpSink)

When I increase the parallelism of my job, either at deployment time or directly in code, it has no effect because the source can't work in parallel. For now I have reduced "Task Slots" to 1 on every node and deploy my job as many times as there are nodes in the cluster. This works as long as I have only one job; if I want to deploy another one in parallel, there are no free slots. I hope a more convenient way to do this exists. Thanks.

BR, Evgeny
Hi Evgeny,
I tried to reproduce your example with the following code, having another console listening with "nc -l 12345":

    env.setParallelism(2);
    env.addSource(new SocketTextStreamFunction("localhost", 12345, " ", 3))
        .map(new MapFunction<String, String>() {
            @Override
            public String map(final String s) throws Exception {
                return s;
            }
        })
        .addSink(new DiscardingSink<String>());

This way, I do get a source with parallelism 1, and map & sink with parallelism 2, and the whole program occupying 2 slots as expected. You can check in the web interface of your cluster how many slots are taken after executing one instance of your program.

How do you set your parallelism?

Nico
Thanks for your answer. The problem is that both slots are seized on one node (provided that node has enough free slots); the other nodes sit idle. I want to utilize the cluster resources a bit more. Maybe one of the other deployment modes allows it.

BR, Evgeny.
What about setting the parallelism[1] to the total number of slots in your cluster? By default, all parts of your program are put into the same slot sharing group[2], and by setting the parallelism you would have this slot sharing group (with your whole program) instantiated in each parallel slot as well (minus/plus operators that have lower/higher parallelism), if I understand it correctly.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/parallel.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#task-chaining-and-resource-groups
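As a plain-Java illustration of the slot-sharing rule above (a hypothetical helper, not a Flink API): with the default slot sharing group, a job needs as many slots as the highest parallelism of any of its operators, since one "slice" of the whole pipeline fits into each slot. Setting the job parallelism to the total slot count therefore fills the whole cluster.

```java
import java.util.Arrays;

public class SlotCount {
    // With default slot sharing, required slots = max parallelism over all
    // operators: each slot holds one parallel slice of the whole pipeline.
    static int requiredSlots(int... operatorParallelism) {
        return Arrays.stream(operatorParallelism).max().orElse(0);
    }

    public static void main(String[] args) {
        // e.g. source p=1, map p=6, sink p=6 on a 3-node cluster
        // with 2 slots per node: the job needs 6 slots, i.e. all of them
        System.out.println(requiredSlots(1, 6, 6)); // prints 6
    }
}
```

So in Evgeny's case, setting the parallelism of map & sink to (nodes x slots per node) would claim every slot, even though the socket source itself stays at parallelism 1.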
this may also be a good read:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#task-slots-and-resources

Nico
Many thanks, Nico. Really interesting materials. I thought about using 1 slot per node, but in that case we don't have the possibility to run other jobs on the nodes where the high-load job is running. Also, Example 3 in the picture at the end of [1] is a little bit incorrect: in the case of parallelism=2, "Task Manager 2" will be empty and both tasks will run on "Task Manager 1". I think the compact packing of tasks into one TaskManager is a good solution for most use cases, but it is a little bit counterintuitive for me and a little inconvenient. I would prefer to have the possibility to change the default behavior. Maybe it would be better to continue this discussion on the dev mailing list? Nico, what do you think?

BR, Evgeny.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#configuring-taskmanager-processing-slots
Hi Evgeny,
regarding slot allocation, at the moment you really cannot influence which slots of which TMs are used by your program, e.g. whether two slots of a single TM are used, or one slot on each of two TMs, when your program occupies two slots in total. Afaik there are no plans to change that, but Stephan (cc'd) may know more.

Flink sees all slots as independent from each other, and by specifying 2 slots at a task manager (TM) you basically say that this machine has the capability to run 2 jobs/parts of a job in parallel (and that it has enough resources to do so). Network-wise it makes sense to use slots at the same TM; CPU-wise it depends on your program and the available resources at the TM.

Nico
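Evgeny's observation about "Task Manager 2" staying empty can be pictured with a toy model (purely illustrative; this is not Flink's actual scheduler code): if all slots are treated as one undifferentiated pool and handed out in TM order, a first-fit allocation packs a parallelism-2 job entirely onto the first TM.

```java
import java.util.ArrayList;
import java.util.List;

public class FirstFit {
    // Toy first-fit allocator: slots are interchangeable, so we simply take
    // them in TM order -- which packs a small job onto the first TM.
    static List<String> allocate(int neededSlots, int tms, int slotsPerTm) {
        List<String> assigned = new ArrayList<>();
        outer:
        for (int tm = 1; tm <= tms; tm++) {
            for (int slot = 1; slot <= slotsPerTm; slot++) {
                if (assigned.size() == neededSlots) break outer;
                assigned.add("TM" + tm + "/slot" + slot);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // parallelism 2 on a 2-TM cluster with 2 slots each:
        // both slots come from TM1, TM2 stays idle
        System.out.println(allocate(2, 2, 2)); // [TM1/slot1, TM1/slot2]
    }
}
```

Under such a scheme the only way to guarantee every node gets work is to make the job need more slots than any single TM offers, which matches Nico's suggestion of setting the parallelism to the cluster's total slot count.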