How to retrieve values from yarn.taskmanager.env in a Job?

Shannon Carey

I have configured a value within yarn.taskmanager.env, and I see it appearing in the Flink web UI in the list underneath Job Manager -> Configuration. However, I can't figure out how to retrieve the value from within a Flink job. It doesn't appear in the environment, the system properties, or my ParameterTool instance, and I can't figure out how I would get to it via the StreamExecutionEnvironment. Can anyone point me in the right direction?

All I want to do is inform my Flink jobs which environment they're running on, so that programmers don't have to specify the environment as a job parameter every time they run it. I also see that there is a "env.java.opts" configuration… does that work in YARN apps (would my jobs be able to see it?)

Thanks!
Shannon

Re: How to retrieve values from yarn.taskmanager.env in a Job?

Till Rohrmann

Hi Shannon,

Have you tried accessing the environment variables via System.getenv()? It returns a map of String key-value pairs, keyed by environment variable name.

If your values are not set in the returned map, then this indicates a bug in Flink and it would be great if you could open a JIRA issue.

Cheers,
Till
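
For readers following along, here is a minimal sketch of the lookup Till describes. The variable name FLINK_ENV and the class EnvLookup are hypothetical, chosen only for illustration; neither is defined by Flink. The sketch only shows how a value would be read if it is present in the process environment.

```java
import java.util.Map;

/** Hypothetical helper: reads a deployment-environment name from environment variables. */
final class EnvLookup {
    // Illustrative variable name, not one that Flink defines.
    static final String VAR_NAME = "FLINK_ENV";

    /** Returns the trimmed value for the given name, or the fallback when it is unset or blank. */
    static String resolve(Map<String, String> env, String name, String fallback) {
        String value = env.get(name);
        return (value == null || value.trim().isEmpty()) ? fallback : value.trim();
    }

    public static void main(String[] args) {
        // System.getenv() returns an unmodifiable Map<String, String> for the current process.
        String environment = resolve(System.getenv(), VAR_NAME, "unknown");
        System.out.println("Running in environment: " + environment);
    }
}
```

Taking the map as a parameter keeps the lookup easy to exercise without changing the real process environment.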



Re: How to retrieve values from yarn.taskmanager.env in a Job?

Chesnay Schepler

Hello,

Can you clarify one small thing for me: do you want to access this parameter when you define the plan (that is, when you call methods on the StreamExecutionEnvironment or DataStream instances), or from within your functions/operators?

Regards,
Chesnay Schepler



Re: How to retrieve values from yarn.taskmanager.env in a Job?

Shannon Carey

Hi Chesnay,

Since that configuration option is supposed to apply the environment variables to the task managers, I figured it would definitely be available within the stream operators. I'm not sure whether the job plan runs within a task manager or not, but hopefully it does?

In my particular code, I want to get the name of the environment in order to read the correct configuration file(s) so that properly populated config objects can be passed to various operators. Therefore, it would be sufficient for the job plan execution to have access to the environment. All the operators are capable of persisting any necessary configuration through serialization.

It really can work either way, but I think it'd be easiest if it was available everywhere. If it's only available during job planning then you have to make sure to serialize it everywhere you need it, and if it's only available during operator execution then it's less straightforward to do central configuration work. Either way it's lying in wait for a programmer to forget where it's accessible vs. not.

-Shannon


Re: How to retrieve values from yarn.taskmanager.env in a Job?

Shannon Carey
In reply to this post by Till Rohrmann

Hi Till,

Yes, System.getenv() was the first thing I tried. It'd be great if someone else can reproduce the issue, but for now I'll submit a JIRA with the assumption that it really is not working right. https://issues.apache.org/jira/browse/FLINK-5322

-Shannon


Re: How to retrieve values from yarn.taskmanager.env in a Job?

Till Rohrmann-2
In reply to this post by Shannon Carey

Hi Shannon,

The job graph generation does not run in the task manager but on the client. The job graph is then submitted to the JobManager, which deploys the individual tasks to the TaskManagers. Thus, the task manager environment variables are not accessible while the job graph is being generated.

You are therefore only able to access these environment variables from within your UDFs.

What you could do is union all configuration objects and then read only those entries relevant for a specific environment on the task manager, e.g. in the open method of a RichFunction.

Cheers,
Till
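
One way to read Till's suggestion, as a rough sketch only: build a single serializable object on the client that holds the settings for every environment, ship it with the function, and pick the matching entry on the task manager. The class PerEnvironmentConfig, the variable name FLINK_ENV, and the sink.url key are all hypothetical. The sketch is plain Java so that it runs without Flink on the classpath; in a real job the select call would sit in the open method of a RichFunction, as Till describes.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical container holding the settings of every environment, so it can be shipped with a function. */
final class PerEnvironmentConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    // Outer key: environment name; inner map: the settings for that environment.
    private final Map<String, Map<String, String>> settingsByEnvironment = new HashMap<>();

    /** Client side: register one setting for one environment. */
    PerEnvironmentConfig put(String environment, String key, String value) {
        settingsByEnvironment.computeIfAbsent(environment, e -> new HashMap<>()).put(key, value);
        return this;
    }

    /** Task manager side: keep only the entries for the environment the task runs in. */
    Map<String, String> select(String environment) {
        Map<String, String> settings = settingsByEnvironment.get(environment);
        if (settings == null) {
            throw new IllegalStateException("No configuration for environment: " + environment);
        }
        return settings;
    }

    public static void main(String[] args) {
        PerEnvironmentConfig config = new PerEnvironmentConfig()
                .put("staging", "sink.url", "http://staging.example.invalid")
                .put("production", "sink.url", "http://production.example.invalid");

        // Assumption: the environment name arrives as an environment variable on the task manager.
        String environment = System.getenv().getOrDefault("FLINK_ENV", "staging");
        System.out.println(environment + " -> " + config.select(environment).get("sink.url"));
    }
}
```

Failing fast on an unknown environment name is a design choice here; a job could equally fall back to a default set of entries.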


Re: How to retrieve values from yarn.taskmanager.env in a Job?

Shannon Carey

Till,

Unfortunately, System.getenv() doesn't contain the expected variable even within the UDFs, but thanks for the info!

In the Yarn setting, "the client" would be either:
  1. the bin/flink executable (with configuration based on where it's run from… which might not be the same as the destination Flink cluster) OR
  2. the web UI… the job planning runs in the existing JVM of the web UI? That runs as part of the Job Manager, right? This is the primary method by which we launch jobs, currently.
Is that right?

I will try out "env.java.opts" to see if that has any effect.

-Shannon
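
As a sketch of what trying "env.java.opts" might look like from the job's side: assuming the option is set to something like -Ddeploy.env=staging, the value would show up as a JVM system property rather than an environment variable. The property name deploy.env, the variable name FLINK_ENV, and the class EnvironmentName are hypothetical, and this thread does not confirm whether the option reaches the task managers on YARN.

```java
/** Hypothetical helper: prefers a JVM system property, then an environment variable. */
final class EnvironmentName {
    static final String PROPERTY = "deploy.env"; // illustrative name, e.g. passed as -Ddeploy.env=staging
    static final String VARIABLE = "FLINK_ENV";  // illustrative name, not defined by Flink

    /** Returns the first candidate that is neither null nor blank, trimmed, or null if there is none. */
    static String firstNonBlank(String... candidates) {
        for (String candidate : candidates) {
            if (candidate != null && !candidate.trim().isEmpty()) {
                return candidate.trim();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String name = firstNonBlank(
                System.getProperty(PROPERTY), // set only if the JVM was started with -Ddeploy.env=...
                System.getenv(VARIABLE),      // set only if the process environment contains it
                "unknown");
        System.out.println("Environment name: " + name);
    }
}
```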


Re: How to retrieve values from yarn.taskmanager.env in a Job?

rmetzger0

It looks like this was resolved in the JIRA issue: https://issues.apache.org/jira/browse/FLINK-5322
