Regarding Concurrent Modification Exception


Biplob Biswas
Hi,
We are getting a ConcurrentModificationException. The complete stack trace is as follows:

org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at compute(ArpackSVD.java:367) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:166)
at org.apache.flink.client.program.Client.getJobGraph(Client.java:534)
at org.apache.flink.client.program.Client.runBlocking(Client.java:347)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at flink.pca.impl.svd.ArpackSVD.compute(ArpackSVD.java:379)
at flink.pca.impl.PCA.computeSVD(PCA.java:110)
at flink.pca.impl.PCA.project(PCA.java:35)
at flink.pca.impl.PCASystemTest.runFlinkJob(PCASystemTest.java:90)
at flink.pca.impl.PCASystemTest.main(PCASystemTest.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:193)
at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:259)
at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:890)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:286)
... 30 more
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
at java.util.Vector$Itr.next(Vector.java:1137)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
... 73 more


Can anyone explain why this happens or how to fix it? A quick Google search only turned up problems with serializing broadcast variables. We use Flink bulk iterations, and this variable is broadcast to both the map and the reduce in a single dataflow.

Thanks & Regards
Biplob Biswas
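
One possible reading of the serialization trace above, offered as a guess since the thread never confirms it: the `this$0` entry is the hidden reference that a non-static inner class (`ArpackSVD$ArpackContext`) keeps to its enclosing instance. If objects of that class are in the collection being serialized, Kryo may follow that reference into `ArpackSVD`, its `matrix` field, and on into the execution environment, client, and class loader. The sketch below only illustrates the hidden reference using plain Java reflection. The class and field names (`InnerClassCapture`, `heavyState`, `Inner`, `Nested`) are hypothetical and are not taken from the poster's code.

```java
import java.lang.reflect.Field;

public class InnerClassCapture {
    // Hypothetical stand-in for a heavyweight member of the outer class.
    private final Object heavyState = new Object();

    // Non-static inner class: it uses outer state, so the compiler gives it
    // a hidden (synthetic) field pointing at the enclosing instance.
    public class Inner {
        Object peek() {
            return heavyState;
        }
    }

    // Static nested class: carries only its own fields.
    public static class Nested {
        double value;
    }

    // Returns true if the class declares a synthetic field whose type is
    // its enclosing class, i.e. a hidden outer reference.
    public static boolean hasOuterReference(Class<?> c) {
        for (Field f : c.getDeclaredFields()) {
            if (f.isSynthetic() && f.getType() == c.getEnclosingClass()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("Inner:  " + hasOuterReference(Inner.class));
        System.out.println("Nested: " + hasOuterReference(Nested.class));
    }
}
```

If this is what is happening here, declaring the context class as a static nested class (or a top-level class) that holds only the data it needs would keep the outer object out of the serialized collection. That is an assumption about the program, which was not posted.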

Re: Regarding Concurrent Modification Exception

Fabian Hueske-2
Hi,

This stacktrace looks really suspicious.
It includes classes from the submission client (CLIClient), optimizer (JobGraphGenerator), and runtime (KryoSerializer).

Is it possible that you are trying to start a new Flink job from inside another job?
This would not work.

Best, Fabian

Re: Regarding Concurrent Modification Exception

Till Rohrmann
But isn't that a normal stack trace which you see when you submit a job to the cluster via the CLI and somewhere in the compilation process something fails?

Anyway, it would be helpful to see the program which causes this problem.

Cheers,
Till


Re: Regarding Concurrent Modification Exception

Biplob Biswas
In reply to this post by Fabian Hueske-2
Hi,

No, we don't start a Flink job inside another job. The jobs are created in a loop, but the next job starts only after the previous one has finished and been cleaned up. Also, we don't get this exception on my local Flink installation; it only appears when I run on the cluster.

Thanks & Regards
Biplob Biswas
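
For context on the innermost frames of the stack trace: the exception is thrown by `java.util.Vector$Itr.checkForComodification` while Kryo iterates a `Vector` (according to the serialization trace, the `classes` field of the application class loader). A `Vector` iterator fails fast when the vector is structurally modified after the iterator was created. A class being loaded during serialization would be one such modification, which might explain why the error depends on the environment, though that is speculation. The sketch below shows only the fail-fast behaviour in a single thread. It is not the Flink code path, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Vector;

public class FailFastVector {
    // Iterates a Vector and adds to it during the iteration.
    // Returns true if the iterator detects the modification and throws.
    public static boolean failsWhenModifiedDuringIteration() {
        Vector<String> classes = new Vector<>(Arrays.asList("A", "B", "C"));
        try {
            for (String name : classes) {
                // Structural modification while an iterator is active.
                classes.add(name + "-loaded-later");
            }
        } catch (ConcurrentModificationException e) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + failsWhenModifiedDuringIteration());
    }
}
```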


Re: Regarding Concurrent Modification Exception

Maximilian Michels
Hi Biplob,

Could you please supply some sample code? Otherwise it is tough to
debug this problem.

Cheers,
Max
