Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

bat man
Hello Experts,

I am running Flink 1.9.0 on AWS EMR (emr-5.28.1) and want to push metrics to InfluxDB. Following the documentation [1], I added the configuration to /usr/lib/flink/conf/flink-conf.yaml and copied the jar to the /usr/lib/flink/lib folder on the master node. However, when I run the job after only these steps, I don't see any measurement (table) created in my InfluxDB database, so I suspect the cluster needs a restart. I cannot find any documentation on how to restart the cluster on EMR.
Could anyone who has configured Flink on AWS EMR to push metrics to InfluxDB share the steps, please?


Thanks,
Hemant

Re: Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

bat man
An update: in the YARN logs I can see the following:

Classpath: lib/flink-metrics-influxdb-1.9.0.jar:lib/flink-shaded-hadoop-2-uber-2.8.5-amzn-5-7.0.jar:lib/flink-table-blink_2.11-1.9.0.jar:lib/flink-table_2.11-1.9.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:plugins/influxdb/flink-metrics-influxdb-1.9.0.jar....
..........
......

This means the jar is being loaded. In the logs I can also see:
2020-08-12 15:28:51,505 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - Registered UNIX signal handlers for [TERM, HUP, INT]
2020-08-12 15:28:51,508 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - Current working Directory: /mnt/yarn/usercache/hadoop/appcache/application_1595767096609_0013/container_1595767096609_0013_01_000004
2020-08-12 15:28:51,512 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.interval, 60 SECONDS
2020-08-12 15:28:51,512 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.yarn.conf.dir, /etc/hadoop/conf
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.host, xx.xxx.xxx.xx
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.cluster-id, application_1595767096609_0013
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, ip-xx-x-xx-xxx.ec2.internal
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.password, ******
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: FLINK_PLUGINS_DIR, /usr/lib/flink/plugins
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.db, xxxxxx
2020-08-12 15:28:51,520 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.connectTimeout, 60000
2020-08-12 15:28:51,520 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: web.port, 0
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.username, xxxx
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.size, 264241152b
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: web.tmpdir, /tmp/flink-web-5562f065-6020-4c38-8260-3aea434bf285
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 32777
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.port, 8086
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.retentionPolicy, one_hour
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: internal.cluster.execution-mode, NORMAL
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.writeTimeout, 60000
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.consistency, ONE
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.address, ip-xx-x-xx-xxx.ec2.internal
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.factory.class, org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
.......
but then further down I see:

2020-08-12 15:28:51,523 WARN  org.apache.flink.core.plugin.PluginConfig                     - Environment variable [FLINK_PLUGINS_DIR] is set to [/usr/lib/flink/plugins] but the directory doesn't exist
2020-08-12 15:28:51,561 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - Current working/local Directory: /mnt/yarn/usercache/hadoop/appcache/application_1595767096609_0013,/mnt1/yarn/usercache/hadoop/appcache/application_1595767096609_0013
2020-08-12 15:28:51,564 INFO  org.apache.flink.runtime.clusterframework.BootstrapTools      - Setting directories for temporary files to: /mnt/yarn/usercache/hadoop/appcache/application_1595767096609_0013,/mnt1/yarn/usercache/hadoop/appcache/application_1595767096609_0013
2020-08-12 15:28:51,564 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - TM: remote keytab path obtained null
2020-08-12 15:28:51,564 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - TM: remote keytab principal obtained null
2020-08-12 15:28:51,566 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - YARN daemon is running as: hadoop Yarn client user obtainer: hadoop
2020-08-12 15:28:51,675 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to hadoop (auth:xxxxxx)
2020-08-12 15:28:51,984 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.port'
2020-08-12 15:28:51,987 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Using configured hostname/address for TaskManager: ip-xx-x-xx-xxx.ec2.internal.
2020-08-12 15:28:51,996 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at ip-xx-x-xx-xxx.ec2.internal:0
2020-08-12 15:28:52,823 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2020-08-12 15:28:52,854 INFO  akka.remote.Remoting                                          - Starting remoting
2020-08-12 15:28:53,061 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://[hidden email]:37937]
2020-08-12 15:28:53,563 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://[hidden email]:37937
2020-08-12 15:28:53,593 WARN  org.apache.flink.runtime.metrics.ReporterSetup                - The reporter factory (org.apache.flink.metrics.influxdb.InfluxdbReporterFactory) could not be found for reporter influxdb. Available factories:
2020-08-12 15:28:53,597 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
2020-08-12 15:28:53,599 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at ip-xx-x-xx-xxx.ec2.ec2.internal:0

So in one place org.apache.flink.configuration.GlobalConfiguration loads the reporter properties, but then org.apache.flink.runtime.metrics.ReporterSetup complains that it cannot find the reporter factory.
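
A minimal sketch for checking other TaskManager logs for the same failure, assuming the log text has already been fetched into a string. The helper `reporter_failures` is hypothetical, not part of Flink, and it only matches the two message texts quoted in the log above; other Flink versions may word them differently.

```python
import re

# Hypothetical helper; the pattern mirrors the ReporterSetup warning quoted above.
FACTORY_MISSING = re.compile(
    r"The reporter factory \((?P<factory>[\w.$]+)\) could not be found "
    r"for reporter (?P<reporter>\w+)"
)
# Text of the MetricRegistryImpl line shown when no reporter ends up active.
NO_REPORTER = "No metrics reporter configured"


def reporter_failures(log_text: str) -> dict:
    """Summarise reporter-loading problems found in a Flink TaskManager log."""
    missing = [
        (m.group("reporter"), m.group("factory"))
        for m in FACTORY_MISSING.finditer(log_text)
    ]
    return {
        "missing_factories": missing,
        "no_reporter_active": NO_REPORTER in log_text,
    }


if __name__ == "__main__":
    sample = (
        "WARN  org.apache.flink.runtime.metrics.ReporterSetup - The reporter factory "
        "(org.apache.flink.metrics.influxdb.InfluxdbReporterFactory) could not be found "
        "for reporter influxdb. Available factories:\n"
        "INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics reporter "
        "configured, no metrics will be exposed/reported.\n"
    )
    print(reporter_failures(sample))
```

Run against the two lines above, it reports the `influxdb` reporter with its missing factory class and flags that no reporter is active.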

Can anyone tell me what I am missing here?

Thanks,
Hemant

On Wed, Aug 12, 2020 at 9:15 PM bat man <[hidden email]> wrote:
[...]

Re: Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

bat man
Has anyone integrated Flink metrics with an external system on AWS EMR? Could you tell me whether this is a configuration issue or an EMR-specific one?

Thanks,
Hemant

On Wed, Aug 12, 2020 at 9:55 PM bat man <[hidden email]> wrote:
[...]

Re: Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

Arvid Heise-3
Hi Hemant,

according to the InfluxDB section of the 1.9 metrics documentation [1], you should configure the reporter class directly instead of a factory; the factory was only added in a later release:

metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: flink-metrics
metrics.reporter.influxdb.password: qwerty
metrics.reporter.influxdb.retentionPolicy: one_hour
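
For configs copied from newer Flink documentation, the change suggested here can be applied mechanically. This is a hypothetical sketch, not a Flink tool: it assumes flink-conf.yaml uses simple one-line `key: value` entries, and its mapping contains only the InfluxDB pair named in this thread.

```python
import re

# Only the InfluxDB pair comes from this thread; other reporters would need their own entries.
FACTORY_TO_REPORTER = {
    "org.apache.flink.metrics.influxdb.InfluxdbReporterFactory":
        "org.apache.flink.metrics.influxdb.InfluxdbReporter",
}

# Matches lines such as "metrics.reporter.influxdb.factory.class: <class name>".
FACTORY_LINE = re.compile(
    r"^(?P<prefix>\s*metrics\.reporter\.[^.\s]+)\.factory\.class:\s*(?P<cls>\S+)\s*$"
)


def downgrade_reporter_config(conf_text: str) -> str:
    """Rewrite known `<reporter>.factory.class` entries to `<reporter>.class`."""
    out = []
    for line in conf_text.splitlines():
        m = FACTORY_LINE.match(line)
        if m and m.group("cls") in FACTORY_TO_REPORTER:
            reporter_cls = FACTORY_TO_REPORTER[m.group("cls")]
            out.append(f"{m.group('prefix')}.class: {reporter_cls}")
        else:
            out.append(line)  # every other line is kept unchanged
    return "\n".join(out) + "\n"


if __name__ == "__main__":
    before = (
        "metrics.reporter.influxdb.factory.class: "
        "org.apache.flink.metrics.influxdb.InfluxdbReporterFactory\n"
        "metrics.reporter.influxdb.port: 8086\n"
    )
    print(downgrade_reporter_config(before), end="")
```

Unknown factory classes are left as they are, so any reporter outside the mapping still has to be checked by hand against the 1.9 documentation.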


On Thu, Aug 13, 2020 at 8:10 AM bat man <[hidden email]> wrote:
[...]


--

Arvid Heise | Senior Java Developer

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

Re: Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

bat man
Hello Arvid,

Thanks, I'll check my config, switch to the correct reporter and test it out.

Thanks,
Hemant

On Fri, 14 Aug 2020 at 6:57 PM, Arvid Heise <[hidden email]> wrote:
Hi Hemant,

according to the influx section of the 1.9 metric documentation [1], you should use the reporter without a factory. The factory was added later.

metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: flink-metrics
metrics.reporter.influxdb.password: qwerty
metrics.reporter.influxdb.retentionPolicy: one_hour


On Thu, Aug 13, 2020 at 8:10 AM bat man <[hidden email]> wrote:
Anyone who has made metrics integration to external systems for flink running on AWS EMR, can you share if its a configuration issue or EMR specific issue.

Thanks,
Hemant

On Wed, Aug 12, 2020 at 9:55 PM bat man <[hidden email]> wrote:
An update in the yarn logs I could see the below -

Classpath: lib/flink-metrics-influxdb-1.9.0.jar:lib/flink-shaded-hadoop-2-uber-2.8.5-amzn-5-7.0.jar:lib/flink-table-blink_2.11-1.9.0.jar:lib/flink-table_2.11-1.9.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:plugins/influxdb/flink-metrics-influxdb-1.9.0.jar....
..........
......

This means the jar is getting loaded, in the logs I could also see -
2020-08-12 15:28:51,505 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - Registered UNIX signal handlers for [TERM, HUP, INT]
2020-08-12 15:28:51,508 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - Current working Directory: /mnt/yarn/usercache/hadoop/appcache/application_1595767096609_0013/container_1595767096609_0013_01_000004
2020-08-12 15:28:51,512 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.interval, 60 SECONDS
2020-08-12 15:28:51,512 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.yarn.conf.dir, /etc/hadoop/conf
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.host, xx.xxx.xxx.xx
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: high-availability.cluster-id, application_1595767096609_0013
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, ip-xx-x-xx-xxx.ec2.internal
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.password, ******
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: FLINK_PLUGINS_DIR, /usr/lib/flink/plugins
2020-08-12 15:28:51,513 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.db, xxxxxx
2020-08-12 15:28:51,520 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.connectTimeout, 60000
2020-08-12 15:28:51,520 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: web.port, 0
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.username, xxxx
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.size, 264241152b
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: web.tmpdir, /tmp/flink-web-5562f065-6020-4c38-8260-3aea434bf285
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 32777
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.port, 8086
2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.retentionPolicy, one_hour
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: internal.cluster.execution-mode, NORMAL
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.writeTimeout, 60000
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.consistency, ONE
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.address, ip-xx-x-xx-xxx.ec2.internal
2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: metrics.reporter.influxdb.factory.class, org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
.......
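To collect the reporter settings this TaskManager actually loaded, the GlobalConfiguration lines can be parsed into key/value pairs. This is a minimal Python sketch, assuming each log entry sits on one line (as in the raw log file) and that the key ends at the first comma after "Loading configuration property:"; `loaded_properties` is a hypothetical helper, not a Flink API.

```python
import re
from typing import Dict, Iterable

# Matches GlobalConfiguration entries such as
#   ... - Loading configuration property: metrics.reporter.influxdb.port, 8086
# Group 1 is the key (up to the first comma), group 2 is the rest of the line.
CONFIG_LINE = re.compile(r"Loading configuration property: ([^,]+), (.*)$")


def loaded_properties(lines: Iterable[str], prefix: str = "") -> Dict[str, str]:
    """Collect key/value pairs from 'Loading configuration property' log lines,
    keeping only keys that start with `prefix`."""
    props: Dict[str, str] = {}
    for line in lines:
        match = CONFIG_LINE.search(line.rstrip())
        if match and match.group(1).startswith(prefix):
            props[match.group(1)] = match.group(2)
    return props


if __name__ == "__main__":
    # Three entries copied from the TaskManager log above; a real run would
    # iterate over the lines of the log file instead.
    sample = [
        "2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration"
        "            - Loading configuration property: metrics.reporter.influxdb.port, 8086",
        "2020-08-12 15:28:51,521 INFO  org.apache.flink.configuration.GlobalConfiguration"
        "            - Loading configuration property: web.port, 0",
        "2020-08-12 15:28:51,522 INFO  org.apache.flink.configuration.GlobalConfiguration"
        "            - Loading configuration property: metrics.reporter.influxdb.factory.class,"
        " org.apache.flink.metrics.influxdb.InfluxdbReporterFactory",
    ]
    # Print in flink-conf.yaml style ("key: value") for easy comparison.
    for key, value in loaded_properties(sample, "metrics.reporter.influxdb.").items():
        print(f"{key}: {value}")
```

With these sample lines it prints the two metrics.reporter.influxdb.* entries and skips web.port, which makes it easy to compare what the TaskManager loaded against what was added to /usr/lib/flink/conf/flink-conf.yaml.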
But further down I could see:

2020-08-12 15:28:51,523 WARN  org.apache.flink.core.plugin.PluginConfig                     - Environment variable [FLINK_PLUGINS_DIR] is set to [/usr/lib/flink/plugins] but the directory doesn't exist
2020-08-12 15:28:51,561 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - Current working/local Directory: /mnt/yarn/usercache/hadoop/appcache/application_1595767096609_0013,/mnt1/yarn/usercache/hadoop/appcache/application_1595767096609_0013
2020-08-12 15:28:51,564 INFO  org.apache.flink.runtime.clusterframework.BootstrapTools      - Setting directories for temporary files to: /mnt/yarn/usercache/hadoop/appcache/application_1595767096609_0013,/mnt1/yarn/usercache/hadoop/appcache/application_1595767096609_0013
2020-08-12 15:28:51,564 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - TM: remote keytab path obtained null
2020-08-12 15:28:51,564 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - TM: remote keytab principal obtained null
2020-08-12 15:28:51,566 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - YARN daemon is running as: hadoop Yarn client user obtainer: hadoop
2020-08-12 15:28:51,675 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to hadoop (auth:xxxxxx)
2020-08-12 15:28:51,984 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.port'
2020-08-12 15:28:51,987 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Using configured hostname/address for TaskManager: ip-xx-x-xx-xxx.ec2.internal.
2020-08-12 15:28:51,996 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at ip-xx-x-xx-xxx.ec2.internal:0
2020-08-12 15:28:52,823 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2020-08-12 15:28:52,854 INFO  akka.remote.Remoting                                          - Starting remoting
2020-08-12 15:28:53,061 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://[hidden email]:37937]
2020-08-12 15:28:53,563 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://[hidden email]:37937
2020-08-12 15:28:53,593 WARN  org.apache.flink.runtime.metrics.ReporterSetup                - The reporter factory (org.apache.flink.metrics.influxdb.InfluxdbReporterFactory) could not be found for reporter influxdb. Available factories:
2020-08-12 15:28:53,597 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
2020-08-12 15:28:53,599 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at ip-xx-x-xx-xxx.ec2.ec2.internal:0

So in one place org.apache.flink.configuration.GlobalConfiguration loads the metrics reporter properties, but then org.apache.flink.runtime.metrics.ReporterSetup complains that it cannot find the reporter factory.
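The PluginConfig warning just before the ReporterSetup one says that /usr/lib/flink/plugins does not exist on the host running this TaskManager container. Flink's plugin mechanism expects each plugin in its own subfolder of that directory (compare plugins/influxdb/ in the Classpath line). The following minimal Python sketch, meant to be run on each node, lists what that directory contains; it assumes the default path from the warning unless FLINK_PLUGINS_DIR is set, and `plugin_jars` is a hypothetical helper name.

```python
import os
import sys
from typing import Dict, List

# Path reported in the PluginConfig WARN line; FLINK_PLUGINS_DIR overrides it if set.
DEFAULT_PLUGINS_DIR = "/usr/lib/flink/plugins"


def plugin_jars(plugins_dir: str) -> Dict[str, List[str]]:
    """Map each plugin subdirectory of `plugins_dir` to the .jar files it contains.

    Raises FileNotFoundError if `plugins_dir` is not an existing directory,
    which is the condition the PluginConfig warning reports.
    """
    if not os.path.isdir(plugins_dir):
        raise FileNotFoundError(f"plugins directory does not exist: {plugins_dir}")
    result: Dict[str, List[str]] = {}
    for name in sorted(os.listdir(plugins_dir)):
        subdir = os.path.join(plugins_dir, name)
        if os.path.isdir(subdir):
            result[name] = sorted(f for f in os.listdir(subdir) if f.endswith(".jar"))
    return result


if __name__ == "__main__":
    root = os.environ.get("FLINK_PLUGINS_DIR", DEFAULT_PLUGINS_DIR)
    try:
        for plugin, jars in plugin_jars(root).items():
            print(f"{plugin}: {', '.join(jars) or '(no jars)'}")
    except FileNotFoundError as err:
        print(err, file=sys.stderr)
        sys.exit(1)
```

On a node where the directory is missing, the script exits with status 1 and the same complaint as the warning; on a node where it exists, it lists each plugin folder with its jars.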

Can anyone point out what I am missing here?

Thanks,
Hemant

On Wed, Aug 12, 2020 at 9:15 PM bat man <[hidden email]> wrote:
Hello Experts,

I am running Flink 1.9.0 on AWS EMR (emr-5.28.1). I want to push metrics to InfluxDB. I followed the documentation[1]: I added the configuration to /usr/lib/flink/conf/flink-conf.yaml and copied the jar to the /usr/lib/flink/lib folder on the master node. However, with only these steps I don't see any measurement (table) created in my InfluxDB when I run the job, so I understand the cluster might need a restart. I am not able to find any documentation on how to restart the cluster on EMR.
Could anyone who has configured AWS EMR to push metrics to InfluxDB share the steps, please?


Thanks,
Hemant


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng