flink telemetry/metrics


John O

I’m working on getting a Flink job into production. As part of the production requirements, I need telemetry/metrics insight into my Flink job. I have followed instructions in

- Added the Flink Graphite jar to the taskmanager/jobmanager lib folder
- Configured flink-conf.yaml to enable the Graphite reporter
- Added a simple counter in my Flink code

When I submit the job, I can see my counter show up in the Flink web UI’s Task Metrics section, but the counter does not show up in Graphite. Also, the metrics that do make it to Graphite don’t seem to be published properly.


Is anyone actually using Graphite Reporter? What is your experience? What am I missing?




Re: flink telemetry/metrics

Chesnay Schepler
What is wrong with the metrics that are shown in graphite?

Can you provide us with the metrics section of your flink-conf.yaml?

Are there any metric-related warnings in the TaskManager logs?

(In reply to John O's message of 09.08.2018 01:38.)




RE: flink telemetry/metrics

John O

I have tried two reporter types, Graphite and JMX.

Graphite:

metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: xxxxxxxxxxxxxxxx
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
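
To rule out a connectivity problem between the Flink hosts and Carbon, one option is to push a hand-made data point over the same TCP port using Graphite's plaintext protocol, which takes one `<path> <value> <timestamp>` line per data point. The following is a minimal sketch and not part of Flink; the function names, the host name, and the metric path in the usage comment are placeholders chosen for illustration.

```python
import socket
import time


def format_metric(path: str, value: float, timestamp: int) -> bytes:
    """Build one plaintext-protocol line: path, value, timestamp, newline."""
    return f"{path} {value} {timestamp}\n".encode("ascii")


def send_metric(host: str, port: int, path: str, value: float) -> None:
    """Open a TCP connection to Carbon and send a single data point."""
    line = format_metric(path, value, int(time.time()))
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(line)


# Example call (hypothetical host and metric path, not taken from the thread):
# send_metric("graphite.example.com", 2003, "flink.test.probe", 1)
```

If the probe metric appears in Graphite but the Flink counter does not, the network path is probably fine and the problem is more likely on the reporter or Carbon side.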


What I see in Graphite are incomplete metrics. Different taskIdx values (same process) show different sets of metrics. For example:

- Sink->test
  - 0: currentInputWatermark, numRecordsIn
  - 1: numRecordsOutPerSecond
  - 2: numRecordsIn, numRecordsOut
  - 4: currentInputWatermark

I would expect to see:

- Sink->test
  - 0: currentInputWatermark, numRecordsIn, numRecordsOut, numRecordsOutPerSecond, numRecordsInPerSecond…
  - 1: currentInputWatermark, numRecordsIn, numRecordsOut, numRecordsOutPerSecond, numRecordsInPerSecond…
  - 2: currentInputWatermark, numRecordsIn, numRecordsOut, numRecordsOutPerSecond, numRecordsInPerSecond…
  - 4: currentInputWatermark, numRecordsIn, numRecordsOut, numRecordsOutPerSecond, numRecordsInPerSecond…
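
The gap between the two lists can be made explicit with a small script. This is only a sketch: the observed and expected sets are copied from the lists above (the expected list is truncated in the original, so only the five named metrics are checked), and the variable and function names are made up for this example.

```python
# Metrics the post expects to see for every subtask (only the names listed).
EXPECTED = {
    "currentInputWatermark",
    "numRecordsIn",
    "numRecordsOut",
    "numRecordsOutPerSecond",
    "numRecordsInPerSecond",
}

# Metrics actually visible in Graphite for Sink->test, keyed by subtask index.
OBSERVED = {
    0: {"currentInputWatermark", "numRecordsIn"},
    1: {"numRecordsOutPerSecond"},
    2: {"numRecordsIn", "numRecordsOut"},
    4: {"currentInputWatermark"},
}


def missing_metrics(observed, expected):
    """Return, per subtask index, the sorted list of expected metrics not observed."""
    return {idx: sorted(expected - names) for idx, names in sorted(observed.items())}


for idx, missing in missing_metrics(OBSERVED, EXPECTED).items():
    print(f"subtask {idx}: missing {', '.join(missing)}")
```

Running the same comparison a few minutes apart would show whether the missing set shrinks over time, which helps distinguish delayed creation from metrics that never arrive.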




JMX:

metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

I used VisualVM with the MBean explorer to view the exposed MBeans. In this setup, I do see the expected metrics, but the way they are organized makes them difficult to find.



I also get the following warning:

2018-08-09 18:36:55,943 WARN  org.apache.flink.runtime.metrics.MetricRegistryImpl           - Error while reporting metrics
        at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
        at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
        at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
        at java.util.HashSet.<init>(HashSet.java:120)
        at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
        at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
        at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
        at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:37)
        at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:27)
        at org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper.getValue(FlinkGaugeWrapper.java:36)
        at com.codahale.metrics.graphite.GraphiteReporter.reportGauge(GraphiteReporter.java:279)
        at com.codahale.metrics.graphite.GraphiteReporter.report(GraphiteReporter.java:169)
        at org.apache.flink.dropwizard.ScheduledDropwizardReporter.report(ScheduledDropwizardReporter.java:231)
        at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:427)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


I am using Flink 1.5.1.
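
In the trace, the exception thrown while reading a Kafka gauge propagates through GraphiteReporter.report up to the reporter task, which suggests that the rest of that reporting cycle was skipped. The toy model below illustrates that effect and one possible mitigation. It is not Flink's or Dropwizard's code; the function names and the gauge names are invented for the illustration.

```python
def report_all(gauges, send):
    """Report gauges in order; an exception from any gauge aborts the rest of the cycle."""
    for name, read in gauges.items():
        send(name, read())


def report_each(gauges, send):
    """Report gauges in order, skipping any gauge whose read fails."""
    failed = []
    for name, read in gauges.items():
        try:
            send(name, read())
        except Exception:
            failed.append(name)
    return failed


def broken_gauge():
    """Stand-in for a gauge whose read throws, like the Kafka gauge in the trace."""
    raise RuntimeError("gauge read failed")


# Hypothetical gauges; dict order plays the role of the reporter's iteration order.
gauges = {"numRecordsIn": lambda: 10, "kafkaGauge": broken_gauge, "numRecordsOut": lambda: 7}

sent_abort = {}
try:
    report_all(gauges, sent_abort.__setitem__)
except RuntimeError:
    pass  # the cycle ends here; numRecordsOut is never sent

sent_isolated = {}
failed = report_each(gauges, sent_isolated.__setitem__)

print(sent_abort)
print(sent_isolated, failed)
```

In the first variant, every gauge that comes after the failing one is lost for that cycle, which would be consistent with different subtasks showing different subsets of metrics.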




(In reply to Chesnay Schepler's message of Friday, August 10, 2018 2:08 AM.)





Re: flink telemetry/metrics

Chesnay Schepler
How often is the warning logged? The default reporting interval is 10 seconds; if a report is interrupted, it can take a while for metrics to show up.

Could this also be caused by the MAX_CREATES_PER_MINUTE setting in carbon.conf being set too low?
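
To get a feel for how much delay a low MAX_CREATES_PER_MINUTE could introduce, a back-of-the-envelope estimate is enough. The sketch below assumes Carbon creates at most that many new metric files per minute and that every new metric keeps being reported until it is created; the function name and the numbers in the example are illustrative, not measurements from this job.

```python
import math


def minutes_until_all_created(new_metrics: int, max_creates_per_minute: int) -> int:
    """Lower bound on the minutes Carbon needs to create files for all new metrics."""
    if max_creates_per_minute <= 0:
        raise ValueError("max_creates_per_minute must be positive")
    return math.ceil(new_metrics / max_creates_per_minute)


# Illustrative numbers only: 2000 new metric paths, at most 50 creates per minute.
print(minutes_until_all_created(2000, 50))
```

With these example numbers the full set of metrics would take at least 40 minutes to appear, and during that time each subtask would show only whichever of its metrics happened to be created so far.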

(In reply to John O's message of 13.08.2018 21:31.)