This happens when the TaskManager is serializing an org.apache.flink.api.common.accumulators.Histogram by iterating through the underlying TreeMap while a MapFunction for updating the accumulator attempts to modify the TreeMap concurrently. How could I fix it?
The call stack: WARN org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task. java.util.ConcurrentModificationException at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211) at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247) at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242) at java.util.TreeMap.writeObject(TreeMap.java:2436) at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at java.util.HashMap.internalWriteEntries(HashMap.java:1785) at java.util.HashMap.writeObject(HashMap.java:1362) at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301) at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52) at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58) at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75) at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286) ... |
Hi Yukun, I think you've found a bug in the code. The accumulators don't seem to be really thread safe. I've created an issue to fix this issue [1]. Thanks for reporting the problem :-) Cheers, Till On Fri, Oct 14, 2016 at 8:32 AM, Yukun Guo <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |