Hi all,
I have my own operator that extends the AbstractUdfStreamOperator class and I want to issue some messages to it. Sometimes the operator instances are deployed on different TaskManagers and I would like to set some attributes like the master and slave IPs on it. I am trying to use these values but they only return localhost, not the IP configured at flink-conf.yaml file. (jobmanager.rpc.address: 192.168.56.1). ConfigOption<String> restAddressOption = ConfigOptions .key("rest.address") .stringType() .noDefaultValue(); System.out.println("DefaultJobManagerRunnerFactory rest.address: " + jobMasterConfiguration.getConfiguration().getValue(restAddressOption)); System.out.println("rpcService: " + rpcService.getAddress()); Thanks, Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com |
Hi Felippe, could you clarify in some more details what you are trying to achieve? Best regards, -- Alexander Fedulov | Solutions Architect +49 1514 6265796 Follow us @VervericaData -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote: Hi all, |
Hi, Felipe
Do you mean to get the Host and Port of the task executor where your operator is indeed running on? If that is the case, IIUC, two possible components that contain this information are RuntimeContext and the Configuration param of RichFunction#open. After reading the relevant code path, it seems you could not get it at the moment. Best, Yangze Guo Best, Yangze Guo On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov <[hidden email]> wrote: > > Hi Felippe, > > could you clarify in some more details what you are trying to achieve? > > Best regards, > > -- > > Alexander Fedulov | Solutions Architect > > +49 1514 6265796 > > > > Follow us @VervericaData > > -- > > Join Flink Forward - The Apache Flink Conference > > Stream Processing | Event Driven | Real Time > > -- > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > -- > > Ververica GmbH > Registered at Amtsgericht Charlottenburg: HRB 158244 B > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote: >> >> Hi all, >> >> I have my own operator that extends the AbstractUdfStreamOperator >> class and I want to issue some messages to it. Sometimes the operator >> instances are deployed on different TaskManagers and I would like to >> set some attributes like the master and slave IPs on it. >> >> I am trying to use these values but they only return localhost, not >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address: >> 192.168.56.1). >> >> ConfigOption<String> restAddressOption = ConfigOptions >> .key("rest.address") >> .stringType() >> .noDefaultValue(); >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " + >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption)); >> System.out.println("rpcService: " + rpcService.getAddress()); >> >> >> Thanks, >> Felipe >> >> -- >> -- Felipe Gutierrez >> -- skype: felipe.o.gutierrez >> -- https://felipeogutierrez.blogspot.com |
Hi all,
I would like to have the IP of the JobManager, not the Task Executors. I explain why. I have an operator (my own operator that extends AbstractUdfStreamOperator) that sends and receives messages from a global controller. So, regardless of which TaskManager these operator instances are deployed, they need to send and receive messages from my controller. Currently, I am doing this using MQTT broker (this is my first approach and I don't know if there is a better way to do it, maybe there is...) The first thing that I do is to start my controller using the org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe it to the JobManager host. I am getting the IP of the JobManager by adding this method on the org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory class: public String getRpcServiceAddress() { return this.rpcService.getAddress(); } That is working. Although I am not sure if it is the best approach. The second thing that I am doing is to make each operator instance publish and subscribe to this controller. To do this they need the JobManager IP. I could get the TaskManager IPs from the AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing the JobManager IP as a parameter to the operator at the moment. I suppose that it is easy to get the JobManager IP inside the AbstractUdfStreamOperator or simply add some method somewhere to get this value. However, I don't know where. Thanks, Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Thu, May 21, 2020 at 7:13 AM Yangze Guo <[hidden email]> wrote: > > Hi, Felipe > > Do you mean to get the Host and Port of the task executor where your > operator is indeed running on? > > If that is the case, IIUC, two possible components that contain this > information are RuntimeContext and the Configuration param of > RichFunction#open. After reading the relevant code path, it seems you > could not get it at the moment. > > Best, > Yangze Guo > > Best, > Yangze Guo > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov > <[hidden email]> wrote: > > > > Hi Felippe, > > > > could you clarify in some more details what you are trying to achieve? > > > > Best regards, > > > > -- > > > > Alexander Fedulov | Solutions Architect > > > > +49 1514 6265796 > > > > > > > > Follow us @VervericaData > > > > -- > > > > Join Flink Forward - The Apache Flink Conference > > > > Stream Processing | Event Driven | Real Time > > > > -- > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > > > -- > > > > Ververica GmbH > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng > > > > > > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote: > >> > >> Hi all, > >> > >> I have my own operator that extends the AbstractUdfStreamOperator > >> class and I want to issue some messages to it. Sometimes the operator > >> instances are deployed on different TaskManagers and I would like to > >> set some attributes like the master and slave IPs on it. > >> > >> I am trying to use these values but they only return localhost, not > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address: > >> 192.168.56.1). > >> > >> ConfigOption<String> restAddressOption = ConfigOptions > >> .key("rest.address") > >> .stringType() > >> .noDefaultValue(); > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " + > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption)); > >> System.out.println("rpcService: " + rpcService.getAddress()); > >> > >> > >> Thanks, > >> Felipe > >> > >> -- > >> -- Felipe Gutierrez > >> -- skype: felipe.o.gutierrez > >> -- https://felipeogutierrez.blogspot.com |
Hi, Felipe
I see your problem. IIUC, if you use AbstractUdfStreamOperator, you could indeed get all the configurations(including what you defined in flink-conf.yaml) through "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()". However, I guess it is not the right behavior and might be fixed in future versions. Best, Yangze Guo On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez <[hidden email]> wrote: > > Hi all, > > I would like to have the IP of the JobManager, not the Task Executors. > I explain why. > > I have an operator (my own operator that extends > AbstractUdfStreamOperator) that sends and receives messages from a > global controller. So, regardless of which TaskManager these operator > instances are deployed, they need to send and receive messages from my > controller. Currently, I am doing this using MQTT broker (this is my > first approach and I don't know if there is a better way to do it, > maybe there is...) > > The first thing that I do is to start my controller using the > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe > it to the JobManager host. I am getting the IP of the JobManager by > adding this method on the > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory > class: > public String getRpcServiceAddress() { > return this.rpcService.getAddress(); > } > That is working. Although I am not sure if it is the best approach. > > The second thing that I am doing is to make each operator instance > publish and subscribe to this controller. To do this they need the > JobManager IP. I could get the TaskManager IPs from the > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing > the JobManager IP as a parameter to the operator at the moment. I > suppose that it is easy to get the JobManager IP inside the > AbstractUdfStreamOperator or simply add some method somewhere to get > this value. However, I don't know where. > > Thanks, > Felipe > > -- > -- Felipe Gutierrez > -- skype: felipe.o.gutierrez > -- https://felipeogutierrez.blogspot.com > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo <[hidden email]> wrote: > > > > Hi, Felipe > > > > Do you mean to get the Host and Port of the task executor where your > > operator is indeed running on? > > > > If that is the case, IIUC, two possible components that contain this > > information are RuntimeContext and the Configuration param of > > RichFunction#open. After reading the relevant code path, it seems you > > could not get it at the moment. > > > > Best, > > Yangze Guo > > > > Best, > > Yangze Guo > > > > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov > > <[hidden email]> wrote: > > > > > > Hi Felippe, > > > > > > could you clarify in some more details what you are trying to achieve? > > > > > > Best regards, > > > > > > -- > > > > > > Alexander Fedulov | Solutions Architect > > > > > > +49 1514 6265796 > > > > > > > > > > > > Follow us @VervericaData > > > > > > -- > > > > > > Join Flink Forward - The Apache Flink Conference > > > > > > Stream Processing | Event Driven | Real Time > > > > > > -- > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > > > > > -- > > > > > > Ververica GmbH > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng > > > > > > > > > > > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote: > > >> > > >> Hi all, > > >> > > >> I have my own operator that extends the AbstractUdfStreamOperator > > >> class and I want to issue some messages to it. Sometimes the operator > > >> instances are deployed on different TaskManagers and I would like to > > >> set some attributes like the master and slave IPs on it. > > >> > > >> I am trying to use these values but they only return localhost, not > > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address: > > >> 192.168.56.1). > > >> > > >> ConfigOption<String> restAddressOption = ConfigOptions > > >> .key("rest.address") > > >> .stringType() > > >> .noDefaultValue(); > > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " + > > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption)); > > >> System.out.println("rpcService: " + rpcService.getAddress()); > > >> > > >> > > >> Thanks, > > >> Felipe > > >> > > >> -- > > >> -- Felipe Gutierrez > > >> -- skype: felipe.o.gutierrez > > >> -- https://felipeogutierrez.blogspot.com |
thanks. it worked!
I add the following method at the org.apache.flink.streaming.api.operators.StreamingRuntimeContext class: public Environment getTaskEnvironment() { return this.taskEnvironment; } Then I am getting the IP using: ConfigOption<String> restAddressOption = ConfigOptions .key("rest.address") .stringType() .noDefaultValue(); String restAddress = this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption); Thanks! -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Fri, May 22, 2020 at 3:54 AM Yangze Guo <[hidden email]> wrote: > > Hi, Felipe > > I see your problem. IIUC, if you use AbstractUdfStreamOperator, you > could indeed get all the configurations(including what you defined in > flink-conf.yaml) through > "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()". > However, I guess it is not the right behavior and might be fixed in > future versions. > > Best, > Yangze Guo > > > > On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez > <[hidden email]> wrote: > > > > Hi all, > > > > I would like to have the IP of the JobManager, not the Task Executors. > > I explain why. > > > > I have an operator (my own operator that extends > > AbstractUdfStreamOperator) that sends and receives messages from a > > global controller. So, regardless of which TaskManager these operator > > instances are deployed, they need to send and receive messages from my > > controller. Currently, I am doing this using MQTT broker (this is my > > first approach and I don't know if there is a better way to do it, > > maybe there is...) > > > > The first thing that I do is to start my controller using the > > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe > > it to the JobManager host. I am getting the IP of the JobManager by > > adding this method on the > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory > > class: > > public String getRpcServiceAddress() { > > return this.rpcService.getAddress(); > > } > > That is working. Although I am not sure if it is the best approach. > > > > The second thing that I am doing is to make each operator instance > > publish and subscribe to this controller. To do this they need the > > JobManager IP. I could get the TaskManager IPs from the > > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing > > the JobManager IP as a parameter to the operator at the moment. I > > suppose that it is easy to get the JobManager IP inside the > > AbstractUdfStreamOperator or simply add some method somewhere to get > > this value. However, I don't know where. > > > > Thanks, > > Felipe > > > > -- > > -- Felipe Gutierrez > > -- skype: felipe.o.gutierrez > > -- https://felipeogutierrez.blogspot.com > > > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo <[hidden email]> wrote: > > > > > > Hi, Felipe > > > > > > Do you mean to get the Host and Port of the task executor where your > > > operator is indeed running on? > > > > > > If that is the case, IIUC, two possible components that contain this > > > information are RuntimeContext and the Configuration param of > > > RichFunction#open. After reading the relevant code path, it seems you > > > could not get it at the moment. > > > > > > Best, > > > Yangze Guo > > > > > > Best, > > > Yangze Guo > > > > > > > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov > > > <[hidden email]> wrote: > > > > > > > > Hi Felippe, > > > > > > > > could you clarify in some more details what you are trying to achieve? > > > > > > > > Best regards, > > > > > > > > -- > > > > > > > > Alexander Fedulov | Solutions Architect > > > > > > > > +49 1514 6265796 > > > > > > > > > > > > > > > > Follow us @VervericaData > > > > > > > > -- > > > > > > > > Join Flink Forward - The Apache Flink Conference > > > > > > > > Stream Processing | Event Driven | Real Time > > > > > > > > -- > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > > > > > > > -- > > > > > > > > Ververica GmbH > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng > > > > > > > > > > > > > > > > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote: > > > >> > > > >> Hi all, > > > >> > > > >> I have my own operator that extends the AbstractUdfStreamOperator > > > >> class and I want to issue some messages to it. Sometimes the operator > > > >> instances are deployed on different TaskManagers and I would like to > > > >> set some attributes like the master and slave IPs on it. > > > >> > > > >> I am trying to use these values but they only return localhost, not > > > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address: > > > >> 192.168.56.1). > > > >> > > > >> ConfigOption<String> restAddressOption = ConfigOptions > > > >> .key("rest.address") > > > >> .stringType() > > > >> .noDefaultValue(); > > > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " + > > > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption)); > > > >> System.out.println("rpcService: " + rpcService.getAddress()); > > > >> > > > >> > > > >> Thanks, > > > >> Felipe > > > >> > > > >> -- > > > >> -- Felipe Gutierrez > > > >> -- skype: felipe.o.gutierrez > > > >> -- https://felipeogutierrez.blogspot.com |
Glad to see that!
However, I was told that it is not the right approach to directly extend `AbstractUdfStreamOperator` in DataStream API. This would be fixed at some point (maybe Flink 2.0). The JIRA link is [1]. [1] https://issues.apache.org/jira/browse/FLINK-17862 Best, Yangze Guo On Fri, May 22, 2020 at 9:56 PM Felipe Gutierrez <[hidden email]> wrote: > > thanks. it worked! > > I add the following method at the > org.apache.flink.streaming.api.operators.StreamingRuntimeContext > class: > > public Environment getTaskEnvironment() { return this.taskEnvironment; } > > Then I am getting the IP using: > > ConfigOption<String> restAddressOption = ConfigOptions > .key("rest.address") > .stringType() > .noDefaultValue(); > String restAddress = > this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption); > > Thanks! > > -- > -- Felipe Gutierrez > -- skype: felipe.o.gutierrez > -- https://felipeogutierrez.blogspot.com > > On Fri, May 22, 2020 at 3:54 AM Yangze Guo <[hidden email]> wrote: > > > > Hi, Felipe > > > > I see your problem. IIUC, if you use AbstractUdfStreamOperator, you > > could indeed get all the configurations(including what you defined in > > flink-conf.yaml) through > > "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()". > > However, I guess it is not the right behavior and might be fixed in > > future versions. > > > > Best, > > Yangze Guo > > > > > > > > On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez > > <[hidden email]> wrote: > > > > > > Hi all, > > > > > > I would like to have the IP of the JobManager, not the Task Executors. > > > I explain why. > > > > > > I have an operator (my own operator that extends > > > AbstractUdfStreamOperator) that sends and receives messages from a > > > global controller. So, regardless of which TaskManager these operator > > > instances are deployed, they need to send and receive messages from my > > > controller. Currently, I am doing this using MQTT broker (this is my > > > first approach and I don't know if there is a better way to do it, > > > maybe there is...) > > > > > > The first thing that I do is to start my controller using the > > > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe > > > it to the JobManager host. I am getting the IP of the JobManager by > > > adding this method on the > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory > > > class: > > > public String getRpcServiceAddress() { > > > return this.rpcService.getAddress(); > > > } > > > That is working. Although I am not sure if it is the best approach. > > > > > > The second thing that I am doing is to make each operator instance > > > publish and subscribe to this controller. To do this they need the > > > JobManager IP. I could get the TaskManager IPs from the > > > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing > > > the JobManager IP as a parameter to the operator at the moment. I > > > suppose that it is easy to get the JobManager IP inside the > > > AbstractUdfStreamOperator or simply add some method somewhere to get > > > this value. However, I don't know where. > > > > > > Thanks, > > > Felipe > > > > > > -- > > > -- Felipe Gutierrez > > > -- skype: felipe.o.gutierrez > > > -- https://felipeogutierrez.blogspot.com > > > > > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo <[hidden email]> wrote: > > > > > > > > Hi, Felipe > > > > > > > > Do you mean to get the Host and Port of the task executor where your > > > > operator is indeed running on? > > > > > > > > If that is the case, IIUC, two possible components that contain this > > > > information are RuntimeContext and the Configuration param of > > > > RichFunction#open. After reading the relevant code path, it seems you > > > > could not get it at the moment. > > > > > > > > Best, > > > > Yangze Guo > > > > > > > > Best, > > > > Yangze Guo > > > > > > > > > > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov > > > > <[hidden email]> wrote: > > > > > > > > > > Hi Felippe, > > > > > > > > > > could you clarify in some more details what you are trying to achieve? > > > > > > > > > > Best regards, > > > > > > > > > > -- > > > > > > > > > > Alexander Fedulov | Solutions Architect > > > > > > > > > > +49 1514 6265796 > > > > > > > > > > > > > > > > > > > > Follow us @VervericaData > > > > > > > > > > -- > > > > > > > > > > Join Flink Forward - The Apache Flink Conference > > > > > > > > > > Stream Processing | Event Driven | Real Time > > > > > > > > > > -- > > > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > > > > > > > > > -- > > > > > > > > > > Ververica GmbH > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote: > > > > >> > > > > >> Hi all, > > > > >> > > > > >> I have my own operator that extends the AbstractUdfStreamOperator > > > > >> class and I want to issue some messages to it. Sometimes the operator > > > > >> instances are deployed on different TaskManagers and I would like to > > > > >> set some attributes like the master and slave IPs on it. > > > > >> > > > > >> I am trying to use these values but they only return localhost, not > > > > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address: > > > > >> 192.168.56.1). > > > > >> > > > > >> ConfigOption<String> restAddressOption = ConfigOptions > > > > >> .key("rest.address") > > > > >> .stringType() > > > > >> .noDefaultValue(); > > > > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " + > > > > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption)); > > > > >> System.out.println("rpcService: " + rpcService.getAddress()); > > > > >> > > > > >> > > > > >> Thanks, > > > > >> Felipe > > > > >> > > > > >> -- > > > > >> -- Felipe Gutierrez > > > > >> -- skype: felipe.o.gutierrez > > > > >> -- https://felipeogutierrez.blogspot.com |
ok, I see.
Do you suggest a better approach to send messages from the JobManager to the TaskManagers and my specific operator? Thanks, Felipe -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Mon, May 25, 2020 at 4:23 AM Yangze Guo <[hidden email]> wrote: > > Glad to see that! > > However, I was told that it is not the right approach to directly > extend `AbstractUdfStreamOperator` in DataStream API. This would be > fixed at some point (maybe Flink 2.0). The JIRA link is [1]. > > [1] https://issues.apache.org/jira/browse/FLINK-17862 > > Best, > Yangze Guo > > On Fri, May 22, 2020 at 9:56 PM Felipe Gutierrez > <[hidden email]> wrote: > > > > thanks. it worked! > > > > I add the following method at the > > org.apache.flink.streaming.api.operators.StreamingRuntimeContext > > class: > > > > public Environment getTaskEnvironment() { return this.taskEnvironment; } > > > > Then I am getting the IP using: > > > > ConfigOption<String> restAddressOption = ConfigOptions > > .key("rest.address") > > .stringType() > > .noDefaultValue(); > > String restAddress = > > this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption); > > > > Thanks! > > > > -- > > -- Felipe Gutierrez > > -- skype: felipe.o.gutierrez > > -- https://felipeogutierrez.blogspot.com > > > > On Fri, May 22, 2020 at 3:54 AM Yangze Guo <[hidden email]> wrote: > > > > > > Hi, Felipe > > > > > > I see your problem. IIUC, if you use AbstractUdfStreamOperator, you > > > could indeed get all the configurations(including what you defined in > > > flink-conf.yaml) through > > > "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()". > > > However, I guess it is not the right behavior and might be fixed in > > > future versions. > > > > > > Best, > > > Yangze Guo > > > > > > > > > > > > On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez > > > <[hidden email]> wrote: > > > > > > > > Hi all, > > > > > > > > I would like to have the IP of the JobManager, not the Task Executors. > > > > I explain why. > > > > > > > > I have an operator (my own operator that extends > > > > AbstractUdfStreamOperator) that sends and receives messages from a > > > > global controller. So, regardless of which TaskManager these operator > > > > instances are deployed, they need to send and receive messages from my > > > > controller. Currently, I am doing this using MQTT broker (this is my > > > > first approach and I don't know if there is a better way to do it, > > > > maybe there is...) > > > > > > > > The first thing that I do is to start my controller using the > > > > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe > > > > it to the JobManager host. I am getting the IP of the JobManager by > > > > adding this method on the > > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory > > > > class: > > > > public String getRpcServiceAddress() { > > > > return this.rpcService.getAddress(); > > > > } > > > > That is working. Although I am not sure if it is the best approach. > > > > > > > > The second thing that I am doing is to make each operator instance > > > > publish and subscribe to this controller. To do this they need the > > > > JobManager IP. I could get the TaskManager IPs from the > > > > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing > > > > the JobManager IP as a parameter to the operator at the moment. I > > > > suppose that it is easy to get the JobManager IP inside the > > > > AbstractUdfStreamOperator or simply add some method somewhere to get > > > > this value. However, I don't know where. > > > > > > > > Thanks, > > > > Felipe > > > > > > > > -- > > > > -- Felipe Gutierrez > > > > -- skype: felipe.o.gutierrez > > > > -- https://felipeogutierrez.blogspot.com > > > > > > > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo <[hidden email]> wrote: > > > > > > > > > > Hi, Felipe > > > > > > > > > > Do you mean to get the Host and Port of the task executor where your > > > > > operator is indeed running on? > > > > > > > > > > If that is the case, IIUC, two possible components that contain this > > > > > information are RuntimeContext and the Configuration param of > > > > > RichFunction#open. After reading the relevant code path, it seems you > > > > > could not get it at the moment. > > > > > > > > > > Best, > > > > > Yangze Guo > > > > > > > > > > Best, > > > > > Yangze Guo > > > > > > > > > > > > > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov > > > > > <[hidden email]> wrote: > > > > > > > > > > > > Hi Felippe, > > > > > > > > > > > > could you clarify in some more details what you are trying to achieve? > > > > > > > > > > > > Best regards, > > > > > > > > > > > > -- > > > > > > > > > > > > Alexander Fedulov | Solutions Architect > > > > > > > > > > > > +49 1514 6265796 > > > > > > > > > > > > > > > > > > > > > > > > Follow us @VervericaData > > > > > > > > > > > > -- > > > > > > > > > > > > Join Flink Forward - The Apache Flink Conference > > > > > > > > > > > > Stream Processing | Event Driven | Real Time > > > > > > > > > > > > -- > > > > > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > > > > > > > > > > > -- > > > > > > > > > > > > Ververica GmbH > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote: > > > > > >> > > > > > >> Hi all, > > > > > >> > > > > > >> I have my own operator that extends the AbstractUdfStreamOperator > > > > > >> class and I want to issue some messages to it. Sometimes the operator > > > > > >> instances are deployed on different TaskManagers and I would like to > > > > > >> set some attributes like the master and slave IPs on it. > > > > > >> > > > > > >> I am trying to use these values but they only return localhost, not > > > > > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address: > > > > > >> 192.168.56.1). > > > > > >> > > > > > >> ConfigOption<String> restAddressOption = ConfigOptions > > > > > >> .key("rest.address") > > > > > >> .stringType() > > > > > >> .noDefaultValue(); > > > > > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " + > > > > > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption)); > > > > > >> System.out.println("rpcService: " + rpcService.getAddress()); > > > > > >> > > > > > >> > > > > > >> Thanks, > > > > > >> Felipe > > > > > >> > > > > > >> -- > > > > > >> -- Felipe Gutierrez > > > > > >> -- skype: felipe.o.gutierrez > > > > > >> -- https://felipeogutierrez.blogspot.com |
I'm not quite familiar with that. I'd like to cc @Aljoscha Krettek here.
Best, Yangze Guo On Mon, May 25, 2020 at 4:39 PM Felipe Gutierrez <[hidden email]> wrote: > > ok, I see. > > Do you suggest a better approach to send messages from the JobManager > to the TaskManagers and my specific operator? > > Thanks, > Felipe > -- > -- Felipe Gutierrez > -- skype: felipe.o.gutierrez > -- https://felipeogutierrez.blogspot.com > > On Mon, May 25, 2020 at 4:23 AM Yangze Guo <[hidden email]> wrote: > > > > Glad to see that! > > > > However, I was told that it is not the right approach to directly > > extend `AbstractUdfStreamOperator` in DataStream API. This would be > > fixed at some point (maybe Flink 2.0). The JIRA link is [1]. > > > > [1] https://issues.apache.org/jira/browse/FLINK-17862 > > > > Best, > > Yangze Guo > > > > On Fri, May 22, 2020 at 9:56 PM Felipe Gutierrez > > <[hidden email]> wrote: > > > > > > thanks. it worked! > > > > > > I add the following method at the > > > org.apache.flink.streaming.api.operators.StreamingRuntimeContext > > > class: > > > > > > public Environment getTaskEnvironment() { return this.taskEnvironment; } > > > > > > Then I am getting the IP using: > > > > > > ConfigOption<String> restAddressOption = ConfigOptions > > > .key("rest.address") > > > .stringType() > > > .noDefaultValue(); > > > String restAddress = > > > this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption); > > > > > > Thanks! > > > > > > -- > > > -- Felipe Gutierrez > > > -- skype: felipe.o.gutierrez > > > -- https://felipeogutierrez.blogspot.com > > > > > > On Fri, May 22, 2020 at 3:54 AM Yangze Guo <[hidden email]> wrote: > > > > > > > > Hi, Felipe > > > > > > > > I see your problem. IIUC, if you use AbstractUdfStreamOperator, you > > > > could indeed get all the configurations(including what you defined in > > > > flink-conf.yaml) through > > > > "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()". > > > > However, I guess it is not the right behavior and might be fixed in > > > > future versions. > > > > > > > > Best, > > > > Yangze Guo > > > > > > > > > > > > > > > > On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez > > > > <[hidden email]> wrote: > > > > > > > > > > Hi all, > > > > > > > > > > I would like to have the IP of the JobManager, not the Task Executors. > > > > > I explain why. > > > > > > > > > > I have an operator (my own operator that extends > > > > > AbstractUdfStreamOperator) that sends and receives messages from a > > > > > global controller. So, regardless of which TaskManager these operator > > > > > instances are deployed, they need to send and receive messages from my > > > > > controller. Currently, I am doing this using MQTT broker (this is my > > > > > first approach and I don't know if there is a better way to do it, > > > > > maybe there is...) > > > > > > > > > > The first thing that I do is to start my controller using the > > > > > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe > > > > > it to the JobManager host. I am getting the IP of the JobManager by > > > > > adding this method on the > > > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory > > > > > class: > > > > > public String getRpcServiceAddress() { > > > > > return this.rpcService.getAddress(); > > > > > } > > > > > That is working. Although I am not sure if it is the best approach. > > > > > > > > > > The second thing that I am doing is to make each operator instance > > > > > publish and subscribe to this controller. To do this they need the > > > > > JobManager IP. I could get the TaskManager IPs from the > > > > > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing > > > > > the JobManager IP as a parameter to the operator at the moment. I > > > > > suppose that it is easy to get the JobManager IP inside the > > > > > AbstractUdfStreamOperator or simply add some method somewhere to get > > > > > this value. However, I don't know where. > > > > > > > > > > Thanks, > > > > > Felipe > > > > > > > > > > -- > > > > > -- Felipe Gutierrez > > > > > -- skype: felipe.o.gutierrez > > > > > -- https://felipeogutierrez.blogspot.com > > > > > > > > > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo <[hidden email]> wrote: > > > > > > > > > > > > Hi, Felipe > > > > > > > > > > > > Do you mean to get the Host and Port of the task executor where your > > > > > > operator is indeed running on? > > > > > > > > > > > > If that is the case, IIUC, two possible components that contain this > > > > > > information are RuntimeContext and the Configuration param of > > > > > > RichFunction#open. After reading the relevant code path, it seems you > > > > > > could not get it at the moment. > > > > > > > > > > > > Best, > > > > > > Yangze Guo > > > > > > > > > > > > Best, > > > > > > Yangze Guo > > > > > > > > > > > > > > > > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov > > > > > > <[hidden email]> wrote: > > > > > > > > > > > > > > Hi Felippe, > > > > > > > > > > > > > > could you clarify in some more details what you are trying to achieve? > > > > > > > > > > > > > > Best regards, > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > Alexander Fedulov | Solutions Architect > > > > > > > > > > > > > > +49 1514 6265796 > > > > > > > > > > > > > > > > > > > > > > > > > > > > Follow us @VervericaData > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > Join Flink Forward - The Apache Flink Conference > > > > > > > > > > > > > > Stream Processing | Event Driven | Real Time > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > Ververica GmbH > > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B > > > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote: > > > > > > >> > > > > > > >> Hi all, > > > > > > >> > > > > > > >> I have my own operator that extends the AbstractUdfStreamOperator > > > > > > >> class and I want to issue some messages to it. Sometimes the operator > > > > > > >> instances are deployed on different TaskManagers and I would like to > > > > > > >> set some attributes like the master and slave IPs on it. > > > > > > >> > > > > > > >> I am trying to use these values but they only return localhost, not > > > > > > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address: > > > > > > >> 192.168.56.1). > > > > > > >> > > > > > > >> ConfigOption<String> restAddressOption = ConfigOptions > > > > > > >> .key("rest.address") > > > > > > >> .stringType() > > > > > > >> .noDefaultValue(); > > > > > > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " + > > > > > > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption)); > > > > > > >> System.out.println("rpcService: " + rpcService.getAddress()); > > > > > > >> > > > > > > >> > > > > > > >> Thanks, > > > > > > >> Felipe > > > > > > >> > > > > > > >> -- > > > > > > >> -- Felipe Gutierrez > > > > > > >> -- skype: felipe.o.gutierrez > > > > > > >> -- https://felipeogutierrez.blogspot.com |
Free forum by Nabble | Edit this page |