Running streaming job on every node of cluster


Running streaming job on every node of cluster

Evgeny Kincharov

Hi,

I have the simplest streaming job, and I want to distribute it across every node of my Flink cluster.

The job is simple:

source (SocketTextStream) -> map -> sink (AsyncHtttpSink).

Increasing the parallelism of the job, either at deployment time or directly in code, has no effect because the source cannot run in parallel.

For now I reduce "Task Slots" to 1 on every node and deploy the job as many times as there are nodes in the cluster.

This works as long as I have only one job; if I want to deploy another one in parallel, there are no free slots.

I hope a more convenient way to do this exists. Thanks.

BR,

Evgeny


Re: Running streaming job on every node of cluster

Nico Kruber
Hi Evgeny,
I tried to reproduce your example with the following code, with another
console listening via "nc -l 12345":

env.setParallelism(2);
env.addSource(new SocketTextStreamFunction("localhost", 12345, " ", 3))
        .map(new MapFunction<String, String>() {
                @Override
                public String map(final String s) throws Exception { return s; }
        })
        .addSink(new DiscardingSink<String>());

This way, I do get a source with parallelism 1, map & sink with parallelism 2,
and the whole program occupying 2 slots, as expected. You can check in the
web interface of your cluster how many slots are taken after executing one
instance of your program.

How do you set your parallelism?


Nico

On Monday, 27 February 2017 14:04:21 CET Evgeny Kincharov wrote:

> Hi,
>
> I have the simplest streaming job, and I want to distribute it across every
> node of my Flink cluster.
>
> Job is simple:
>
> source (SocketTextStream) -> map -> sink (AsyncHtttpSink).
>
> Increasing the parallelism of the job, either at deployment time or directly
> in code, has no effect because the source cannot run in parallel. For now I
> reduce "Task Slots" to 1 on every node and deploy the job as many times as
> there are nodes in the cluster. This works as long as I have only one job;
> if I want to deploy another one in parallel, there are no free slots. I hope
> a more convenient way to do this exists. Thanks.
>
> BR,
> Evgeny



RE: Running streaming job on every node of cluster

Evgeny Kincharov

Thanks for your answer.

The problem is that both slots are seized on a single node (provided that node has enough free slots), while the other nodes sit idle. I want to utilize the cluster resources a bit better. Maybe one of the other deployment modes allows it.

BR, Evgeny.

 

From: [hidden email]
Sent: 27 February 2017, 20:07
To: [hidden email]
Cc: [hidden email]
Subject: Re: Running streaming job on every node of cluster

 



Re: Running streaming job on every node of cluster

Nico Kruber
What about setting the parallelism [1] to the total number of slots in your
cluster?
By default, all parts of your program are put into the same slot sharing
group [2], and by setting the parallelism you would have this slot (with your
whole program) on each parallel slot as well (minus/plus operators that have
lower/higher parallelism), if I understand it correctly.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/parallel.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#task-chaining-and-resource-groups
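
As a minimal sketch of this suggestion, assuming for illustration a cluster with 4 TaskManagers of 2 slots each (the slot counts, the job name, and the reuse of the snippet from earlier in the thread are assumptions, not something confirmed for Evgeny's setup):

```java
// Illustrative sketch: set the job parallelism to the assumed total number
// of slots in the cluster (4 TaskManagers * 2 slots = 8).
// The socket source stays at parallelism 1, but map and sink get 8 parallel
// subtasks, and via the default slot sharing group the job occupies all 8
// slots (which slots end up on which TaskManager is up to Flink's scheduler).
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
env.addSource(new SocketTextStreamFunction("localhost", 12345, " ", 3))
        .map(new MapFunction<String, String>() {
                @Override
                public String map(final String s) throws Exception { return s; }
        })
        .addSink(new DiscardingSink<String>());
env.execute("job spread over all slots");
```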

On Monday, 27 February 2017 18:17:53 CET Evgeny Kincharov wrote:

> Thanks for your answer.
> The problem is that both slots are seized on a single node (provided that
> node has enough free slots), while the other nodes sit idle. I want to
> utilize the cluster resources a bit better. Maybe one of the other
> deployment modes allows it.
>
> BR, Evgeny.
>



Re: Running streaming job on every node of cluster

Nico Kruber
This may also be a good read:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#task-slots-and-resources

On Monday, 27 February 2017 18:40:48 CET Nico Kruber wrote:

> What about setting the parallelism [1] to the total number of slots in your
> cluster?
> By default, all parts of your program are put into the same slot sharing
> group [2], and by setting the parallelism you would have this slot (with
> your whole program) on each parallel slot as well (minus/plus operators
> that have lower/higher parallelism), if I understand it correctly.
>
> Nico
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/parallel.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#task-chaining-and-resource-groups
>



RE: Running streaming job on every node of cluster

Evgeny Kincharov

Many thanks, Nico.

Really interesting material. I had thought about using 1 slot per node, but in that case there is no way to run other jobs on the nodes where the high-load job is running.

Also, Example 3 in the picture at the end of [1] is slightly misleading: with parallelism=2, "Task Manager 2" will be empty and both tasks will run on "Task Manager 1". I think packing tasks compactly into one TaskManager is a good solution for most use cases, but it is a bit counterintuitive for me and slightly inconvenient. I would prefer to have the possibility to change the default behavior. Maybe it would be better to continue this discussion on the dev mailing list? Nico, what do you think?

BR, Evgeny.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#configuring-taskmanager-processing-slots

 

From: [hidden email]
Sent: 27 February 2017, 21:50
To: [hidden email]
Cc: [hidden email]
Subject: Re: Running streaming job on every node of cluster

 



Re: Running streaming job on every node of cluster

Nico Kruber
Hi Evgeny,
Regarding slot allocation: at the moment you really cannot influence which
slots of which TMs are used by your program, e.g. whether two slots of a
single TM are used, or one slot on each of two TMs, when your program
occupies two slots in total.
Afaik there are no plans to change that, but Stephan (cc'd) may know more.

Flink sees all slots as independent of each other; by specifying 2 slots at a
TaskManager (TM), you basically say that this machine is capable of running
2 jobs/parts of a job in parallel (and that it has enough resources to do so).
Network-wise it makes sense to use slots at the same TM; CPU-wise it depends
on your program and the available resources at the TM.
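
For reference, the per-TM slot count discussed here is set in flink-conf.yaml; a sketch with example values (the value 2 is illustrative, not Evgeny's actual configuration):

```yaml
# flink-conf.yaml (example value)
# Each TaskManager offers this many slots; one slot can hold one parallel
# pipeline of a job (e.g. source/map/sink chained via slot sharing).
# Total cluster slots = numberOfTaskSlots * number of TaskManagers.
taskmanager.numberOfTaskSlots: 2
```

With 2 slots per TM, a job occupying 2 slots may land entirely on one TM or spread over two; as noted above, Flink currently gives no control over that placement.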


Nico

On Tuesday, 28 February 2017 08:57:17 CET Evgeny Kincharov wrote:

> Many thanks, Nico.
>
> Really interesting material. I had thought about using 1 slot per node, but
> in that case there is no way to run other jobs on the nodes where the
> high-load job is running. Also, Example 3 in the picture at the end of [1]
> is slightly misleading: with parallelism=2, "Task Manager 2" will be empty
> and both tasks will run on "Task Manager 1". I think packing tasks
> compactly into one TaskManager is a good solution for most use cases, but
> it is a bit counterintuitive for me and slightly inconvenient. I would
> prefer to have the possibility to change the default behavior. Maybe it
> would be better to continue this discussion on the dev mailing list? Nico,
> what do you think?
>
> BR, Evgeny.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#configuring-taskmanager-processing-slots
 


