How do I make sure to place operator instances in specific Task Managers?

Felipe Gutierrez
For instance, suppose I have the following DAG, with the respective
parallelism in parentheses (I hope the DAG renders correctly after all):

  source01 -> map01(4) -> flatmap01(4) \
                                        |-> keyBy -> reducer(8)
  source02 -> map02(4) -> flatmap02(4) /

And I have 4 TMs on 4 machines with 4 cores each. I would like to
place source01, map01, and flatmap01 in TM-01, and source02, map02,
and flatmap02 in TM-02. I am using "disableChaining()" on the flatMap
operator to measure it. I would also like reducer1-to-4 in TM-03 and
reducer5-to-8 in TM-04.

I am using the methods "setParallelism()" and "slotSharingGroup()" to
define this, but both source01 and source02 end up in TM-01 and map01
is split across 2 TMs. The same happens with map02.

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
Re: How do I make sure to place operator instances in specific Task Managers?

HuWeihua
Hi, Felipe

Flink does not support running tasks on a specified TM.
You can use slotSharingGroup to keep tasks out of the same slot, but you cannot specify which TM they run on.

Can you please explain why you need to specify the TM?


Best
Weihua Hu

Re: How do I make sure to place operator instances in specific Task Managers?

Felipe Gutierrez
Because I am measuring one operator (all of its instances), and I want
to place its downstream operators on another machine so that the data
goes through network channels.

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: How do I make sure to place operator instances in specific Task Managers?

Felipe Gutierrez
Using slotSharingGroup I can do some placement. However, although I am
using two different slotSharingGroups for the two different sources,
they are still placed in the same TM. And this starts splitting the
downstream operators across different TMs as well.

 stream01 = source01.slot1 -> map01(4).slot1 -> flatmap01(4).slot1 \
 stream02 = source02.slot2 -> map02(4).slot2 -> flatmap02(4).slot2 /
  |-> stream01.union(stream02) -> keyBy -> reducer(8).slot3

I am not sure which configuration I can adjust in the
conf/flink-conf.yaml file to make it work. Currently, my
configuration on the four TMs is as shown below.

taskmanager.numberOfTaskSlots: 4
parallelism.default: 4

Would it work if I used a different numberOfTaskSlots on different TMs?
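
As a rough aid for reasoning about the setup above, here is a minimal Java sketch of the slot arithmetic. It assumes the rule from the Flink documentation on slot sharing, as I understand it: a slot sharing group needs as many slots as the highest parallelism among its operators, and a job needs the sum over its groups. The class and method names are hypothetical, and the source parallelism of 1 is an assumption, since the thread does not state it for this DAG.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Back-of-the-envelope slot arithmetic; not part of any Flink API. */
final class SlotDemand {

    /** Sum, over slot sharing groups, of the highest parallelism in each group. */
    static int requiredSlots(Map<String, int[]> parallelismByGroup) {
        int total = 0;
        for (int[] parallelisms : parallelismByGroup.values()) {
            int max = 0;
            for (int p : parallelisms) {
                max = Math.max(max, p);
            }
            total += max;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, int[]> groups = new LinkedHashMap<>();
        // source (assumed parallelism 1), map(4), flatmap(4)
        groups.put("slot1", new int[] {1, 4, 4});
        groups.put("slot2", new int[] {1, 4, 4});
        // reducer(8)
        groups.put("slot3", new int[] {8});

        int needed = requiredSlots(groups);
        int available = 4 * 4; // 4 TMs, taskmanager.numberOfTaskSlots: 4
        System.out.println("slots needed = " + needed + ", slots available = " + available);
    }
}
```

Under these assumptions the job needs 16 slots and the cluster offers 16, so the job fits. The arithmetic only says how many slots are needed; it says nothing about which TM hosts which slot.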

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: How do I make sure to place operator instances in specific Task Managers?

HuWeihua
Hi, Felipe

Sorry for the late reply.
You can try setting taskmanager.numberOfTaskSlots = 1 and using different slotSharingGroups to make sure the tasks are not placed in the same TM.
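
To see what this setting implies, here is a small Java sketch; the class and method names are hypothetical. With one slot per TM, every slot is its own TM, so subtasks from different slot sharing groups, which never share a slot, cannot end up in the same TM. The cost is that the cluster then needs one TM per required slot. The figure of 16 required slots is taken from the DAG earlier in the thread (4 + 4 + 8 across three groups) and is an assumption about that job.

```java
/** Illustrative arithmetic only; not part of any Flink API. */
final class OneSlotPerTaskManager {

    /** TaskManagers needed to offer requiredSlots slots when each TM offers slotsPerTm. */
    static int taskManagersNeeded(int requiredSlots, int slotsPerTm) {
        if (slotsPerTm <= 0) {
            throw new IllegalArgumentException("slotsPerTm must be positive");
        }
        return (requiredSlots + slotsPerTm - 1) / slotsPerTm; // ceiling division
    }

    public static void main(String[] args) {
        int requiredSlots = 4 + 4 + 8; // three slot sharing groups from the earlier DAG
        System.out.println("TMs needed with 1 slot each:  " + taskManagersNeeded(requiredSlots, 1));
        System.out.println("TMs needed with 4 slots each: " + taskManagersNeeded(requiredSlots, 4));
    }
}
```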

Best
Weihua Hu

Re: How do I make sure to place operator instances in specific Task Managers?

Felipe Gutierrez
I am afraid that this does not work in the scenario in which I am
trying to deploy the QEP. Let me explain.

I have 4 machines with 8 cores each. I want to set the parallelism of
all operators to 16. My QEP is:
source(1)->map(16)->flatmap(16)->keyBy->reduce(16)

So I would like to have 8 maps and 8 flatmaps on the first machine, 8
maps and 8 flatmaps on the second machine, 8 reducers on the third
machine, and 8 reducers on the fourth machine.

When I set taskmanager.numberOfTaskSlots = 3, I cannot get as much
parallelism as I want (16), because each slot runs one parallel
pipeline (according to flink-conf.yaml). So I need 8 slots in each TM.
When I use one slotSharingGroup for the source, map, and flatmap,
another slotSharingGroup for the reducer, and a parallelism of 16,
Grafana somehow shows me more than 16 parallel instances of the
operators.
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
