For instance, if I have the following DAG, with the respective parallelism
in parentheses (I hope the DAG displays correctly after all):

source01 -> map01(4) -> flatmap01(4) \
                                      |-> keyBy -> reducer(8)
source02 -> map02(4) -> flatmap02(4) /

And I have 4 TMs on 4 machines with 4 cores each. I would like to
place source01, map01, and flatmap01 in TM-01, and source02, map02,
and flatmap02 in TM-02. I am using "disableChaining()" on the flatMap
operator to measure it. reducer1-to-4 should go in TM-03 and reducer5-to-8
in TM-04.

I am using the methods "setParallelism()" and "slotSharingGroup()" to
define it, but both source01 and source02 are placed in TM-01 and map01
is split across 2 TMs. The same happens with map02.

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
Hi, Felipe

Flink does not support running tasks on a specified TM.
You can use slotSharingGroup to keep tasks out of the same slot, but you cannot specify which TM they run on.

Can you please give the reason for specifying the TM?

Best
Weihua Hu
Because I am measuring one operator (all of its instances) and I want to
place its downstream operators on another machine, so that the data
between them goes through network channels.

--
-- Felipe Gutierrez

On Fri, May 29, 2020 at 4:59 AM Weihua Hu <[hidden email]> wrote:
> Can you please give the reason for specifying TM?
>
> On May 28, 2020, at 21:37, Felipe Gutierrez <[hidden email]> wrote:
> > I would like to place source01 and map01 and flatmap01 in TM-01.
Using slotSharingGroup I can do some placement. However, I am using
two different slotSharingGroups for the two different sources, yet they
are still placed in the same TM. This also starts splitting the
downstream operators across different TMs.

stream01 = source01.slot1 -> map01(4).slot1 -> flatmap01(4).slot1 \
stream02 = source02.slot2 -> map02(4).slot2 -> flatmap02(4).slot2 /
   |-> stream01.union(stream02) -> keyBy -> reducer(8).slot3

I am not sure which configuration I can adjust in the
conf/flink-conf.yaml file to make it work. Currently, my
configuration on the four TMs is as follows:

taskmanager.numberOfTaskSlots: 4
parallelism.default: 4

Would it work if I used a different numberOfTaskSlots on different TMs?

--
-- Felipe Gutierrez

On Fri, May 29, 2020 at 9:00 AM Felipe Gutierrez <[hidden email]> wrote:
> because I am measuring one operator (all instances) and I want to
> place its downstream operators in another machine in order to use
> network channels.
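As a rough illustration of the slot arithmetic behind the setup above, here is a minimal sketch in plain Java. It does not call Flink; it only assumes that a job requests, for each slot sharing group, as many slots as the highest operator parallelism in that group, and that groups never share a slot. The class name `SlotDemand`, the method `requiredSlots`, and the source parallelism of 1 are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Back-of-the-envelope slot arithmetic; this does not call Flink.
class SlotDemand {

    // For each slot sharing group, take the highest operator parallelism in the
    // group; groups do not share slots, so the per-group maxima are summed.
    static int requiredSlots(Map<String, int[]> parallelismByGroup) {
        int total = 0;
        for (int[] parallelisms : parallelismByGroup.values()) {
            int max = 0;
            for (int p : parallelisms) {
                max = Math.max(max, p);
            }
            total += max;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, int[]> groups = new LinkedHashMap<>();
        // Source parallelism of 1 is assumed; it does not change the group maximum.
        groups.put("slot1", new int[] {1, 4, 4}); // source01, map01, flatmap01
        groups.put("slot2", new int[] {1, 4, 4}); // source02, map02, flatmap02
        groups.put("slot3", new int[] {8});       // reducer

        int required = requiredSlots(groups);
        int available = 4 * 4; // 4 TMs x taskmanager.numberOfTaskSlots: 4

        // prints "required=16 available=16"
        System.out.println("required=" + required + " available=" + available);
    }
}
```

Under these assumptions the job fits exactly into the 16 available slots, but the arithmetic says nothing about which TM provides which slot, which is the part that cannot be specified.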
Hi, Felipe

Sorry for the late reply.
You can try setting taskmanager.numberOfTaskSlots = 1 and using different slotSharingGroups to make sure tasks are not placed in the same TM.

Best
Weihua Hu
I am afraid that this does not work in the scenario in which I am trying
to deploy the QEP. Let me explain. I have 4 machines with 8 cores each. I
want to set the parallelism of all operators to 16. My QEP is:

source(1) -> map(16) -> flatmap(16) -> keyBy -> reduce(16)

So I would like to have 8 maps and 8 flatmaps on the first machine, 8
maps and 8 flatmaps on the second machine, 8 reducers on the third
machine, and 8 reducers on the fourth machine.

When I set taskmanager.numberOfTaskSlots = 3 I cannot get as much
parallelism as I want (16), because each slot runs one parallel pipeline
(as stated in flink-conf.yaml). So I need 8 slots in each TM.

When I use one slotSharingGroup for the source, map, and flatmap, another
slotSharingGroup for the reducer, and a parallelism of 16, Grafana
somehow shows me more than 16 parallel instances of the operators.

Felipe
--
-- Felipe Gutierrez

On Wed, Jun 3, 2020 at 2:42 PM Weihua Hu <[hidden email]> wrote:
> You can try to config taskmanager.numberOfTaskSlots = 1 and use different slotSharingGroup to make sure Task do not placed in same TM.
>
> On May 29, 2020, at 17:07, Felipe Gutierrez <[hidden email]> wrote:
> > Maybe if I use different numberOfTaskSlots on different TMs would it work?
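The capacity reasoning in this last message can be checked with a small sketch in plain Java. It is only arithmetic, not Flink code, and it assumes that the two slot sharing groups (source/map/flatmap and reduce), each at parallelism 16, request 16 slots apiece. The class name `SlotsPerTaskManager` and its methods are hypothetical names chosen for the example.

```java
// Capacity check only; plain arithmetic, no Flink API involved.
class SlotsPerTaskManager {

    // Smallest taskmanager.numberOfTaskSlots such that `taskManagers` identical
    // TMs offer at least `requiredSlots` slots in total (ceiling division).
    static int minSlotsPerTaskManager(int requiredSlots, int taskManagers) {
        return (requiredSlots + taskManagers - 1) / taskManagers;
    }

    // True if the cluster offers at least as many slots as the job requests.
    static boolean fits(int requiredSlots, int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager >= requiredSlots;
    }

    public static void main(String[] args) {
        int required = 16 + 16; // assumed: one group for source/map/flatmap, one for reduce
        int taskManagers = 4;   // one TM per machine

        // prints "fits with 3 slots per TM: false"
        System.out.println("fits with 3 slots per TM: " + fits(required, taskManagers, 3));
        // prints "min slots per TM: 8"
        System.out.println("min slots per TM: " + minSlotsPerTaskManager(required, taskManagers));
    }
}
```

With these numbers, 3 slots per TM gives only 12 slots for a request of 32, while 8 slots per TM matches the "8 slots in each TM" figure mentioned above.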