Flink logs with extra pipeline property


Sidney Feiner

Hi,

We're using Apache Flink 1.9.2 and we've started logging everything as JSON with log4j (the standard log4j1 that ships with Flink). By JSON logging, I just mean that I've formatted the output with this pattern:

log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": "%d{ISO8601}", "class": "%c", "line": "%L", "message": "%m"}%n


Now I would like to add a field to this JSON that indicates which pipeline generated the log. At first I thought I'd add another field that logs an environment variable, like so:

log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": "%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${PIPELINE}", "message": "%m"}%n

But that doesn't seem to work (is it because the TM is initialized before the pipeline, and that's when the placeholders are resolved?).

Do you know of a way I could add a field for the currently running pipeline? In my "Main" I have access to the pipeline name, and I also have access to this variable in the tasks themselves. I would prefer not to pass this variable explicitly every time I log; ideally it would be added automatically during logging.

If anybody has an idea, I'd love to hear it (we can use logback or anything else if necessary),

Thanks :)


Re: Flink logs with extra pipeline property

Yang Wang
I think you could use the following config options to set environment variables for the JobManager and TaskManager.
Then you could reference those variables in the log4j configuration file. In log4j2 the syntax is "${env:PIPELINE}".

containerized.master.env.PIPELINE: my-flink-pipeline
containerized.taskmanager.env.PIPELINE: my-flink-pipeline
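
As a rough illustration of the log4j2 variant, the environment lookup could be placed in a log4j2 properties file as shown below. This is only a sketch: it assumes log4j2 is actually the logging backend in use (the Flink 1.9.2 setup described in the question uses log4j1), and the appender name ConsoleAppender is made up for the example.

```properties
# Sketch of a log4j2 properties configuration (assumes log4j2 is the backend).
# "ConsoleAppender" is a hypothetical appender name chosen for this example.
appender.console.type = Console
appender.console.name = ConsoleAppender
appender.console.layout.type = PatternLayout
# ${env:PIPELINE} is resolved from the PIPELINE environment variable of the process.
appender.console.layout.pattern = {"level": "%p", "ts": "%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${env:PIPELINE}", "message": "%m"}%n

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
```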


For log4j2, I am afraid you need to set the java dynamic option[1] to get a similar effect.



Best,
Yang

Sidney Feiner <[hidden email]> wrote on Sunday, December 6, 2020 at 10:13 PM:

Re: Flink logs with extra pipeline property

Sidney Feiner
I'm using a dockerized HA cluster that I submit pipelines to through the CLI.
So where exactly can I configure the PIPELINE env variable? Seems like it needs to be set per container. But many different pipelines run on the same TaskManager (so also the same container).

And your example mentions log4j2 twice: once without the Java dynamic options, and a second time saying it requires setting the Java dynamic option, so I'm a bit confused here 🤓

I appreciate the support btw 🙂

From: Yang Wang <[hidden email]>
Sent: Monday, December 7, 2020 4:53 AM
To: Sidney Feiner <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Flink logs with extra pipeline property
 

Re: Flink logs with extra pipeline property

Yang Wang
Sorry for the confusion. I meant log4j1 in that sentence: for log4j1, I am afraid you need to set the Java dynamic option[1] to get a similar effect.
IIRC, log4j1 does not support using system environment variables in the log4j configuration.

However, it seems that you are running a session cluster and submitting multiple Flink jobs to the same
Flink session cluster. In that case I am afraid it is not possible to set different PIPELINE names within the same session,
because the log4j configuration is built per process (i.e. per TaskManager).
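
One possible workaround that is not part of the suggestions in this thread is log4j's mapped diagnostic context (MDC), which is set per thread rather than per process. The line below is a sketch under assumptions: each task would have to store its pipeline name under the hypothetical key pipeline (for example in the open() method of a rich function), and log lines from threads that never set the key would carry an empty value.

```properties
# Assumption: task code has called org.apache.log4j.MDC.put("pipeline", pipelineName)
# (or the SLF4J equivalent, org.slf4j.MDC.put) on the thread that is logging.
# "pipeline" is a hypothetical key name chosen for this example.
# %X{pipeline} prints the MDC value stored under that key, or nothing if it was never set.
log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": "%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "%X{pipeline}", "message": "%m"}%n
```

Because the value lives on the logging thread, this would not label messages emitted by Flink's own framework threads, so it is at best a partial answer for a shared session cluster.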

If you are using a job cluster[1], then the env (log4j2) and Java dynamic options (log4j1) approaches could work:
* Env
docker run ... --env PIPELINE=my-flink-cluster ...
* Java dynamic options
docker run ... --env JVM_ARGS="-DPIPELINE=my-flink-cluster" ...
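
For the log4j1 case, a minimal sketch of the matching configuration might look like the following. It relies on log4j 1.x resolving ${...} placeholders from Java system properties first and then from keys in the same file, so the PIPELINE=unknown line is only a fallback added for illustration. The property name PIPELINE matches the -DPIPELINE option above; the appender name console follows the pattern from the original question.

```properties
# Fallback used only when no -DPIPELINE=... system property is set
# (log4j 1.x checks system properties before keys in this file).
PIPELINE=unknown

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
# ${PIPELINE} is substituted once, when the configuration is parsed at JVM startup,
# so the value is fixed for the lifetime of the process.
log4j.appender.console.layout.ConversionPattern={"level": "%p", "ts": "%d{ISO8601}", "class": "%c", "line": "%L", "pipeline": "${PIPELINE}", "message": "%m"}%n
```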




Best,
Yang

Sidney Feiner <[hidden email]> wrote on Tuesday, December 8, 2020 at 12:29 AM: