How do I get the IP of the master and slave files programmatically in Flink?

Felipe Gutierrez
Hi all,

I have my own operator that extends the AbstractUdfStreamOperator
class, and I want to send some messages to it. The operator instances
are sometimes deployed on different TaskManagers, so I would like to
set attributes such as the master and slave IPs on them.

I am trying to use the values below, but they only return localhost, not
the IP configured in the flink-conf.yaml file (jobmanager.rpc.address:
192.168.56.1).

ConfigOption<String> restAddressOption = ConfigOptions
   .key("rest.address")
   .stringType()
   .noDefaultValue();
System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
System.out.println("rpcService: " + rpcService.getAddress());


Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
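
For readers of this thread: the snippet above reads `rest.address`, while the value set in flink-conf.yaml is `jobmanager.rpc.address`. A minimal, Flink-free sketch of looking up one key and falling back to another when the first is absent could look like the following. The class `FlatConfLookup`, its methods, and the sample lines (including port 6123) are assumptions made for illustration; this is not Flink's own configuration loader, and it only handles flat `key: value` lines.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Flink.
public class FlatConfLookup {

    /** Parses flat "key: value" lines, skipping blank lines and '#' comments. */
    public static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // not a "key: value" line
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    /** Returns the value of the first key, in the given order, that has a non-empty value. */
    public static Optional<String> firstPresent(Map<String, String> conf, String... keys) {
        for (String key : keys) {
            String value = conf.get(key);
            if (value != null && !value.isEmpty()) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical excerpt of a configuration file.
        Map<String, String> conf = parse(List.of(
                "# cluster settings",
                "jobmanager.rpc.address: 192.168.56.1",
                "jobmanager.rpc.port: 6123"));

        // Prefer rest.address, fall back to jobmanager.rpc.address, then to a literal.
        String address = firstPresent(conf, "rest.address", "jobmanager.rpc.address")
                .orElse("localhost");
        System.out.println(address);
    }
}
```

Run as is, the sketch prints 192.168.56.1; with neither key present it would print the literal fallback localhost.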

Re: How do I get the IP of the master and slave files programmatically in Flink?

Alexander Fedulov
Hi Felipe,

Could you clarify in more detail what you are trying to achieve?

Best regards,

--

Alexander Fedulov | Solutions Architect

+49 1514 6265796



Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng

Re: How do I get the IP of the master and slave files programmatically in Flink?

Yangze Guo
Hi Felipe,

Do you mean the host and port of the task executor on which your
operator is actually running?

If so, IIUC, the two components that could contain this information
are RuntimeContext and the Configuration parameter of
RichFunction#open. After reading the relevant code path, it seems that
you cannot get it from either at the moment.

Best,
Yangze Guo


Re: How do I get the IP of the master and slave files programmatically in Flink?

Felipe Gutierrez
Hi all,

I would like to have the IP of the JobManager, not of the task executors.
Let me explain why.

I have an operator (my own operator that extends
AbstractUdfStreamOperator) that sends messages to and receives messages
from a global controller. So, regardless of which TaskManager these
operator instances are deployed on, they need to exchange messages with
my controller. Currently, I am doing this through an MQTT broker (this
is my first approach; I don't know whether there is a better way to do
it, maybe there is...).

First, I start my controller from
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe
it to the JobManager host. I get the IP of the JobManager by adding
this method to the
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
class:

public String getRpcServiceAddress() {
    return this.rpcService.getAddress();
}

That is working, although I am not sure it is the best approach.

Second, I make each operator instance publish and subscribe to this
controller. To do this, they need the JobManager IP. I could get the
TaskManager IPs from the AbstractUdfStreamOperator, but not the
JobManager IP, so at the moment I am passing the JobManager IP as a
parameter to the operator. I suppose it is easy to get the JobManager
IP inside the AbstractUdfStreamOperator, or to simply add a method
somewhere to get this value. However, I don't know where.

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
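
The workaround described above, passing the JobManager IP to the operator as a parameter, can be sketched without any Flink classes. `ControllerAwareOperator` below is a hypothetical stand-in for the real operator, `roundTrip` only imitates shipping a serialized operator to a worker, and the `tcp://host:port` URI form and port 1883 (the registered MQTT port) are assumptions about how the broker would be addressed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustration only: none of these types are Flink classes.
public class ControllerAddressDemo {

    /** Hypothetical operator stand-in that carries the controller address as plain fields. */
    public static class ControllerAwareOperator implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String controllerHost;
        private final int controllerPort;

        public ControllerAwareOperator(String controllerHost, int controllerPort) {
            this.controllerHost = controllerHost;
            this.controllerPort = controllerPort;
        }

        /** Builds the broker URI this instance would connect to (assumed URI format). */
        public String brokerUri() {
            return "tcp://" + controllerHost + ":" + controllerPort;
        }
    }

    /** Serializes and deserializes the operator, imitating its transfer to a remote worker. */
    public static ControllerAwareOperator roundTrip(ControllerAwareOperator op) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(op);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ControllerAwareOperator) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // The address is fixed on the client side and travels with the serialized instance.
        ControllerAwareOperator shipped =
                roundTrip(new ControllerAwareOperator("192.168.56.1", 1883));
        System.out.println(shipped.brokerUri());
    }
}
```

Because the address is an ordinary serializable field, every deserialized copy sees the same value no matter where it runs, which is what makes the parameter approach work without touching runtime internals.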


Re: How do I get the IP of the master and slave files programmatically in Flink?

Yangze Guo
Hi Felipe,

I see your problem. IIUC, if you use AbstractUdfStreamOperator, you
can indeed get the whole configuration (including what you defined in
flink-conf.yaml) through
"AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()".
However, I guess this is not the intended behavior, and it might be
fixed in future versions.

Best,
Yangze Guo




Re: How do I get the IP of the master and slave files programmatically in Flink?

Felipe Gutierrez
Thanks, it worked!

I added the following method to the
org.apache.flink.streaming.api.operators.StreamingRuntimeContext
class:

public Environment getTaskEnvironment() { return this.taskEnvironment; }

Then I get the IP using:

ConfigOption<String> restAddressOption = ConfigOptions
   .key("rest.address")
   .stringType()
   .noDefaultValue();
String restAddress = this.getRuntimeContext()
   .getTaskEnvironment()
   .getTaskManagerInfo()
   .getConfiguration()
   .getValue(restAddressOption);

Thanks!

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: How do I get the IP of the master and slave files programmatically in Flink?

Yangze Guo
Glad to hear that!

However, I was told that directly extending
`AbstractUdfStreamOperator` in the DataStream API is not the right
approach. This would be fixed at some point (maybe in Flink 2.0). The
JIRA link is [1].

[1] https://issues.apache.org/jira/browse/FLINK-17862

Best,
Yangze Guo


Re: How do I get the IP of the master and slave files programmatically in Flink?

Felipe Gutierrez
OK, I see.

Can you suggest a better approach for sending messages from the JobManager
to the TaskManagers and to my specific operator?

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Mon, May 25, 2020 at 4:23 AM Yangze Guo <[hidden email]> wrote:

>
> Glad to see that!
>
> However, I was told that it is not the right approach to directly
> extend `AbstractUdfStreamOperator` in DataStream API. This would be
> fixed at some point (maybe Flink 2.0). The JIRA link is [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-17862
>
> Best,
> Yangze Guo
>
> On Fri, May 22, 2020 at 9:56 PM Felipe Gutierrez
> <[hidden email]> wrote:
> >
> > thanks. it worked!
> >
> > I add the following method at the
> > org.apache.flink.streaming.api.operators.StreamingRuntimeContext
> > class:
> >
> > public Environment getTaskEnvironment() { return this.taskEnvironment; }
> >
> > Then I am getting the IP using:
> >
> > ConfigOption<String> restAddressOption = ConfigOptions
> >    .key("rest.address")
> >    .stringType()
> >    .noDefaultValue();
> > String restAddress =
> > this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption);
> >
> > Thanks!
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
> >
> > On Fri, May 22, 2020 at 3:54 AM Yangze Guo <[hidden email]> wrote:
> > >
> > > Hi, Felipe
> > >
> > > I see your problem. IIUC, if you use AbstractUdfStreamOperator, you
> > > could indeed get all the configurations(including what you defined in
> > > flink-conf.yaml) through
> > > "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()".
> > > However, I guess it is not the right behavior and might be fixed in
> > > future versions.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > >
> > > On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez
> > > <[hidden email]> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > I would like to have the IP of the JobManager, not the Task Executors.
> > > > I explain why.
> > > >
> > > > I have an operator (my own operator that extends
> > > > AbstractUdfStreamOperator) that sends and receives messages from a
> > > > global controller. So, regardless of which TaskManager these operator
> > > > instances are deployed, they need to send and receive messages from my
> > > > controller. Currently, I am doing this using MQTT broker (this is my
> > > > first approach and I don't know if there is a better way to do it,
> > > > maybe there is...)
> > > >
> > > > The first thing that I do is to start my controller using the
> > > > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe
> > > > it to the JobManager host. I am getting the IP of the JobManager by
> > > > adding this method on the
> > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
> > > > class:
> > > >        public String getRpcServiceAddress() {
> > > > return this.rpcService.getAddress();
> > > > }
> > > > That is working. Although I am not sure if it is the best approach.
> > > >
> > > > The second thing that I am doing is to make each operator instance
> > > > publish and subscribe to this controller. To do this they need the
> > > > JobManager IP. I could get the TaskManager IPs from the
> > > > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing
> > > > the JobManager IP as a parameter to the operator at the moment. I
> > > > suppose that it is easy to get the JobManager IP inside the
> > > > AbstractUdfStreamOperator or simply add some method somewhere to get
> > > > this value. However, I don't know where.
> > > >
> > > > Thanks,
> > > > Felipe
> > > >
> > > > --
> > > > -- Felipe Gutierrez
> > > > -- skype: felipe.o.gutierrez
> > > > -- https://felipeogutierrez.blogspot.com
> > > >
> > > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo <[hidden email]> wrote:
> > > > >
> > > > > Hi, Felipe
> > > > >
> > > > > Do you mean to get the Host and Port of the task executor where your
> > > > > operator is indeed running on?
> > > > >
> > > > > If that is the case, IIUC, two possible components that contain this
> > > > > information are RuntimeContext and the Configuration param of
> > > > > RichFunction#open. After reading the relevant code path, it seems you
> > > > > could not get it at the moment.
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > >
> > > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
> > > > > <[hidden email]> wrote:
> > > > > >
> > > > > > Hi Felippe,
> > > > > >
> > > > > > could you clarify in some more details what you are trying to achieve?
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Alexander Fedulov | Solutions Architect
> > > > > >
> > > > > > +49 1514 6265796
> > > > > >
> > > > > >
> > > > > >
> > > > > > Follow us @VervericaData
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Join Flink Forward - The Apache Flink Conference
> > > > > >
> > > > > > Stream Processing | Event Driven | Real Time
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Ververica GmbH
> > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote:
> > > > > >>
> > > > > >> Hi all,
> > > > > >>
> > > > > >> I have my own operator that extends the AbstractUdfStreamOperator
> > > > > >> class and I want to issue some messages to it. Sometimes the operator
> > > > > >> instances are deployed on different TaskManagers and I would like to
> > > > > >> set some attributes like the master and slave IPs on it.
> > > > > >>
> > > > > >> I am trying to use these values but they only return localhost, not
> > > > > >> the IP configured at flink-conf.yaml file. (jobmanager.rpc.address:
> > > > > >> 192.168.56.1).
> > > > > >>
> > > > > >> ConfigOption<String> restAddressOption = ConfigOptions
> > > > > >>    .key("rest.address")
> > > > > >>    .stringType()
> > > > > >>    .noDefaultValue();
> > > > > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
> > > > > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
> > > > > >> System.out.println("rpcService: " + rpcService.getAddress());
> > > > > >>
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Felipe
> > > > > >>
> > > > > >> --
> > > > > >> -- Felipe Gutierrez
> > > > > >> -- skype: felipe.o.gutierrez
> > > > > >> -- https://felipeogutierrez.blogspot.com

Re: How do I get the IP of the master and slave files programmatically in Flink?

Yangze Guo
I'm not quite familiar with that. I'd like to cc @Aljoscha Krettek here.


Best,
Yangze Guo

On Mon, May 25, 2020 at 4:39 PM Felipe Gutierrez
<[hidden email]> wrote:

>
> ok, I see.
>
> Can you suggest a better approach for sending messages from the
> JobManager to the TaskManagers and to my specific operator?
>
> Thanks,
> Felipe
>
> On Mon, May 25, 2020 at 4:23 AM Yangze Guo <[hidden email]> wrote:
> >
> > Glad to see that!
> >
> > However, I was told that it is not the right approach to directly
> > extend `AbstractUdfStreamOperator` in DataStream API. This would be
> > fixed at some point (maybe Flink 2.0). The JIRA link is [1].
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-17862
> >
> > Best,
> > Yangze Guo
> >
> > On Fri, May 22, 2020 at 9:56 PM Felipe Gutierrez
> > <[hidden email]> wrote:
> > >
> > > thanks. it worked!
> > >
> > > I added the following method to the
> > > org.apache.flink.streaming.api.operators.StreamingRuntimeContext
> > > class:
> > >
> > > public Environment getTaskEnvironment() { return this.taskEnvironment; }
> > >
> > > Then I am getting the IP using:
> > >
> > > ConfigOption<String> restAddressOption = ConfigOptions
> > >    .key("rest.address")
> > >    .stringType()
> > >    .noDefaultValue();
> > > String restAddress =
> > > this.getRuntimeContext().getTaskEnvironment().getTaskManagerInfo().getConfiguration().getValue(restAddressOption);
> > >
> > > Thanks!
> > >
> > > On Fri, May 22, 2020 at 3:54 AM Yangze Guo <[hidden email]> wrote:
> > > >
> > > > Hi, Felipe
> > > >
> > > > I see your problem. IIUC, if you use AbstractUdfStreamOperator, you
> > > > can indeed get the whole configuration (including what you defined in
> > > > flink-conf.yaml) through
> > > > "AbstractUdfStreamOperator#getRuntimeContext().getTaskManagerRuntimeInfo().getConfiguration()".
> > > > However, I guess this is not the intended behavior and it might be
> > > > changed in future versions.
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > >
> > > >
> > > > On Thu, May 21, 2020 at 3:13 PM Felipe Gutierrez
> > > > <[hidden email]> wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to have the IP of the JobManager, not of the Task
> > > > > Executors. Let me explain why.
> > > > >
> > > > > I have an operator (my own operator that extends
> > > > > AbstractUdfStreamOperator) that sends and receives messages from a
> > > > > global controller. So, regardless of which TaskManager these operator
> > > > > instances are deployed on, they need to send messages to and receive
> > > > > messages from my controller. Currently, I am doing this using an MQTT
> > > > > broker (this is my first approach and I don't know if there is a
> > > > > better way to do it, maybe there is...)
> > > > >
> > > > > The first thing that I do is to start my controller using the
> > > > > org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl and subscribe
> > > > > it to the JobManager host. I am getting the IP of the JobManager by
> > > > > adding this method on the
> > > > > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory
> > > > > class:
> > > > >     public String getRpcServiceAddress() {
> > > > >         return this.rpcService.getAddress();
> > > > >     }
> > > > > That works, although I am not sure if it is the best approach.
> > > > >
> > > > > The second thing that I am doing is to make each operator instance
> > > > > publish and subscribe to this controller. To do this they need the
> > > > > JobManager IP. I could get the TaskManager IPs from the
> > > > > AbstractUdfStreamOperator, but not the JobManager IP. So, I am passing
> > > > > the JobManager IP as a parameter to the operator at the moment. I
> > > > > suppose that it is easy to get the JobManager IP inside the
> > > > > AbstractUdfStreamOperator or simply add some method somewhere to get
> > > > > this value. However, I don't know where.
> > > > >
> > > > > Thanks,
> > > > > Felipe
> > > > >
> > > > > --
> > > > > -- Felipe Gutierrez
> > > > > -- skype: felipe.o.gutierrez
> > > > > -- https://felipeogutierrez.blogspot.com
> > > > >
> > > > > On Thu, May 21, 2020 at 7:13 AM Yangze Guo <[hidden email]> wrote:
> > > > > >
> > > > > > Hi, Felipe
> > > > > >
> > > > > > Do you mean to get the host and port of the task executor where your
> > > > > > operator is actually running?
> > > > > >
> > > > > > If that is the case, IIUC, two possible components that contain this
> > > > > > information are RuntimeContext and the Configuration param of
> > > > > > RichFunction#open. After reading the relevant code path, it seems you
> > > > > > could not get it at the moment.
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > >
> > > > > > On Wed, May 20, 2020 at 11:46 PM Alexander Fedulov
> > > > > > <[hidden email]> wrote:
> > > > > > >
> > > > > > > > Hi Felipe,
> > > > > > > >
> > > > > > > > could you clarify in more detail what you are trying to achieve?
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Alexander Fedulov | Solutions Architect
> > > > > > >
> > > > > > > +49 1514 6265796
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Follow us @VervericaData
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Join Flink Forward - The Apache Flink Conference
> > > > > > >
> > > > > > > Stream Processing | Event Driven | Real Time
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Ververica GmbH
> > > > > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > > > > > > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Tony) Cheng
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 20, 2020 at 1:14 PM Felipe Gutierrez <[hidden email]> wrote:
> > > > > > >>
> > > > > > >> Hi all,
> > > > > > >>
> > > > > > >> I have my own operator that extends the AbstractUdfStreamOperator
> > > > > > >> class and I want to issue some messages to it. Sometimes the operator
> > > > > > >> instances are deployed on different TaskManagers and I would like to
> > > > > > >> set some attributes like the master and slave IPs on it.
> > > > > > >>
> > > > > > > >> I am trying to read these values, but they only return localhost, not
> > > > > > > >> the IP configured in the flink-conf.yaml file (jobmanager.rpc.address:
> > > > > > > >> 192.168.56.1).
> > > > > > >>
> > > > > > >> ConfigOption<String> restAddressOption = ConfigOptions
> > > > > > >>    .key("rest.address")
> > > > > > >>    .stringType()
> > > > > > >>    .noDefaultValue();
> > > > > > >> System.out.println("DefaultJobManagerRunnerFactory rest.address: " +
> > > > > > >> jobMasterConfiguration.getConfiguration().getValue(restAddressOption));
> > > > > > >> System.out.println("rpcService: " + rpcService.getAddress());
> > > > > > >>
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Felipe
> > > > > > >>
> > > > > > >> --
> > > > > > >> -- Felipe Gutierrez
> > > > > > >> -- skype: felipe.o.gutierrez
> > > > > > >> -- https://felipeogutierrez.blogspot.com
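
For reference, the lookup discussed in this thread boils down to reading one key from the cluster configuration. Here is a minimal, self-contained Java sketch of that idea, assuming the configuration is available as flat "key: value" lines like those in flink-conf.yaml. The class FlinkConfSketch and its methods are hypothetical names used only for illustration and are not part of Flink; the parser deliberately ignores nested YAML and trailing comments. The address 192.168.56.1 is the value quoted in the original question.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not a Flink class): reads flat "key: value" configuration lines. */
final class FlinkConfSketch {

    /** Parses flat "key: value" entries, skipping blank lines and '#' comment lines. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // not a flat key/value entry, ignore it
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    /** Returns the value of jobmanager.rpc.address, if the key is present. */
    static Optional<String> jobManagerAddress(Map<String, String> conf) {
        return Optional.ofNullable(conf.get("jobmanager.rpc.address"));
    }

    public static void main(String[] args) {
        // Sample lines standing in for the contents of flink-conf.yaml (assumed layout).
        List<String> lines = List.of(
                "# sample entries",
                "jobmanager.rpc.address: 192.168.56.1",
                "rest.port: 8081");
        // Falls back to "localhost" when the key is missing.
        System.out.println(jobManagerAddress(parse(lines)).orElse("localhost"));
    }
}
```

Inside a running job, the approach described in the thread (reading the Configuration exposed through the operator's runtime context) avoids re-parsing the file; the sketch only shows what the lookup amounts to.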