Regarding Concurrent Modification Exception
Posted by
Biplob Biswas on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Regarding-Concurrent-Modification-Exception-tp4898.html
Hi,
We are getting a ConcurrentModificationException, the complete stack trace is as follows:
org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at compute(ArpackSVD.java:367) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:166)
at org.apache.flink.client.program.Client.getJobGraph(Client.java:534)
at org.apache.flink.client.program.Client.runBlocking(Client.java:347)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at flink.pca.impl.svd.ArpackSVD.compute(ArpackSVD.java:379)
at flink.pca.impl.PCA.computeSVD(PCA.java:110)
at flink.pca.impl.PCA.project(PCA.java:35)
at flink.pca.impl.PCASystemTest.runFlinkJob(PCASystemTest.java:90)
at flink.pca.impl.PCASystemTest.main(PCASystemTest.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:193)
at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:259)
at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:890)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:286)
... 30 more
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
at java.util.Vector$Itr.next(Vector.java:1137)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
... 73 more
Can anyone enlighten us as why is it like this or how to fix this issue? We did a bit of google search, but all we get is some problem with serializing broadcast variable. We use flink bulk iterations and this variable is broadcasted to both map and reduce in one dataflow!
Thanks & Regards
Biplob Biswas