ConcurrentModificationException when using histogram accumulators

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

ConcurrentModificationException when using histogram accumulators

Yukun Guo
This happens when the TaskManager is serializing an org.apache.flink.api.common.accumulators.Histogram by iterating through the underlying TreeMap while a MapFunction for updating the accumulator attempts to modify the TreeMap concurrently. How could I fix it?


The call stack:

WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry     - Failed to serialize accumulators for task.
java.util.ConcurrentModificationException
        at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
        at java.util.TreeMap.writeObject(TreeMap.java:2436)
        at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
        at java.util.HashMap.writeObject(HashMap.java:1362)
        at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
        at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
        at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
        ...

Reply | Threaded
Open this post in threaded view
|

Re: ConcurrentModificationException when using histogram accumulators

Till Rohrmann
Hi Yukun,

I think you've found a bug in the code. The accumulators don't seem to be really thread safe. I've created an issue to fix this issue [1]. Thanks for reporting the problem :-)


Cheers,
Till

On Fri, Oct 14, 2016 at 8:32 AM, Yukun Guo <[hidden email]> wrote:
This happens when the TaskManager is serializing an org.apache.flink.api.common.accumulators.Histogram by iterating through the underlying TreeMap while a MapFunction for updating the accumulator attempts to modify the TreeMap concurrently. How could I fix it?


The call stack:

WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry     - Failed to serialize accumulators for task.
java.util.ConcurrentModificationException
        at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
        at java.util.TreeMap.writeObject(TreeMap.java:2436)
        at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
        at java.util.HashMap.writeObject(HashMap.java:1362)
        at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
        at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
        at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
        ...