Error while reporting metrics - ConcurrentModificationException


Error while reporting metrics - ConcurrentModificationException

PedroMrChaves
Hello,

I get the following error while trying to report metrics to InfluxDB using the DropwizardReporter.

2018-03-20 13:51:00,288 WARN  org.apache.flink.runtime.metrics.MetricRegistryImpl  - Error while reporting metrics
java.util.ConcurrentModificationException
        at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
        at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
        at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
        at java.util.HashSet.<init>(HashSet.java:120)
        at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
        at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
        at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
        at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
        at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
        at org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper.getValue(FlinkGaugeWrapper.java:36)
        at metrics_influxdb.measurements.MeasurementReporter.fromGauge(MeasurementReporter.java:163)
        at metrics_influxdb.measurements.MeasurementReporter.report(MeasurementReporter.java:55)
        at org.apache.flink.dropwizard.ScheduledDropwizardReporter.report(ScheduledDropwizardReporter.java:231)
        at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Any ideas on what might be the problem?




-----
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Error while reporting metrics - ConcurrentModificationException

Chesnay Schepler
A wrapped Kafka metric was reading the consumer's state while that state was being modified by the consumer thread.

As far as I can tell this is a Kafka issue, and there's nothing we can do about it on the Flink side.

Unless this happens frequently, it should be safe to ignore.
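
For context, the trace shows the reporter thread calling PartitionStates.partitionSet(), which copies the consumer's partition map key set into a new HashSet; if the consumer thread changes its assignment at the same moment, the iterator backing that copy fails. Roughly, the race looks like the following minimal stand-alone sketch (illustrative names, plain Java, not the actual Flink/Kafka code):

import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;

public class CmeSketch {

    public static void main(String[] args) throws Exception {
        // Stand-in for the consumer's internal partition map.
        Map<String, Integer> assignedPartitions = new LinkedHashMap<>();
        for (int i = 0; i < 10_000; i++) {
            assignedPartitions.put("topic-" + i, i);
        }

        // "Consumer" thread: keeps changing the assignment, as a rebalance would.
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) {
                assignedPartitions.put("rebalanced-" + i, i);
                assignedPartitions.remove("rebalanced-" + i);
            }
        });
        consumer.start();

        // "Reporter" thread (here: main): snapshots the keys with new HashSet<>(keySet()),
        // the same pattern as PartitionStates.partitionSet(). With no synchronization
        // between the two threads, the LinkedHashMap iterator used by the copy will
        // eventually throw ConcurrentModificationException.
        try {
            for (int i = 0; i < 1_000_000; i++) {
                new HashSet<>(assignedPartitions.keySet());
            }
        } catch (ConcurrentModificationException e) {
            System.out.println("Reporter snapshot failed: " + e);
        }
        consumer.join();
    }
}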
