kryo exception due to race condition

Stefano Bortoli
Hi guys,

I hit a Kryo exception while running a process that crosses datasets of POJOs. I am using 0.10-milestone-1.
The failing call in the serializer is org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210).

I noticed that the Kryo instance is reused across serialization calls (e.g. line 187). However, Kryo is not thread-safe, so I think a race condition may be causing the problem. We solved this kind of issue before with a KryoFactory implementing a pool. Perhaps it should just be a matter of calling the

What should I do? Open a ticket?
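To make the pooling idea concrete, here is a minimal sketch in plain Java. It does not use Kryo's own pool API: `StatefulCodec` is a hypothetical stand-in for any stateful, non-thread-safe serializer instance, and `InstancePool` and `PoolDemo` are names made up for illustration. The only point is that each caller gets exclusive use of an instance while it works with it.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for a stateful, non-thread-safe serializer (e.g. a Kryo instance). */
final class StatefulCodec {
    private final StringBuilder scratch = new StringBuilder(); // mutable state reused across calls

    String encode(String value) {
        scratch.setLength(0);
        scratch.append('<').append(value).append('>');
        return scratch.toString();
    }
}

/** Minimal pool: a borrowed instance is used by exactly one caller until it is returned. */
final class InstancePool<T> {
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    InstancePool(Supplier<T> factory) {
        this.factory = factory;
    }

    <R> R withInstance(Function<T, R> action) {
        T instance = idle.poll();          // take an idle instance, if any
        if (instance == null) {
            instance = factory.get();      // otherwise create a fresh one
        }
        try {
            return action.apply(instance);
        } finally {
            idle.offer(instance);          // hand it back for later reuse
        }
    }
}

final class PoolDemo {
    public static void main(String[] args) throws InterruptedException {
        InstancePool<StatefulCodec> pool = new InstancePool<>(StatefulCodec::new);
        AtomicInteger mismatches = new AtomicInteger();

        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                String payload = Thread.currentThread().getName() + "-" + i;
                String encoded = pool.withInstance(codec -> codec.encode(payload));
                if (!encoded.equals("<" + payload + ">")) {
                    mismatches.incrementAndGet();
                }
            }
        };

        Thread first = new Thread(work, "t1");
        Thread second = new Thread(work, "t2");
        first.start();
        second.start();
        first.join();
        second.join();

        System.out.println("mismatches: " + mismatches.get());
    }
}
```

Whether the Flink serializer is actually shared between threads is exactly the open question here, so this only shows the pattern, not a confirmed fix.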

Thanks a lot guys for the great job!

saluti,
Stefano

-----------------------------------------
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
    at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
    at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
    at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)

Re: kryo exception due to race condition

Stephan Ewen
This looks to me like a bug where type registrations are not properly forwarded to all Serializers.
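As a rough illustration of how missing registrations could produce an "unregistered class ID" error, here is a toy model in plain Java. It is not Kryo's implementation: `ToyRegistry` and `RegistryMismatchDemo` are hypothetical names, and the model only assumes that IDs are handed out in registration order, so a reader that saw fewer registrations than the writer cannot resolve the higher IDs.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy registry: a class's ID is simply its position in registration order. */
final class ToyRegistry {
    private final List<Class<?>> byId = new ArrayList<>();

    int register(Class<?> type) {
        byId.add(type);
        return byId.size() - 1;
    }

    int idOf(Class<?> type) {
        int id = byId.indexOf(type);
        if (id < 0) {
            throw new IllegalArgumentException("Class not registered: " + type.getName());
        }
        return id;
    }

    Class<?> classOf(int id) {
        if (id < 0 || id >= byId.size()) {
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return byId.get(id);
    }
}

final class RegistryMismatchDemo {
    public static void main(String[] args) {
        // The writing side knows about both types.
        ToyRegistry writer = new ToyRegistry();
        writer.register(String.class);
        writer.register(Integer.class);

        // The reading side never received the second registration.
        ToyRegistry reader = new ToyRegistry();
        reader.register(String.class);

        int writtenId = writer.idOf(Integer.class);
        try {
            reader.classOf(writtenId);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Encountered unregistered class ID: 1"
        }
    }
}
```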

Can you open a JIRA ticket for this?

On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli <[hidden email]> wrote:
[...]


Re: kryo exception due to race condition

Stefano Bortoli

2015-10-01 18:50 GMT+02:00 Stephan Ewen <[hidden email]>:
[...]


Re: kryo exception due to race condition

Stefano Bortoli
I don't know whether it is the same issue, but after switching from my POJOs to BSONObject I got a race condition problem with Kryo serialization.
I could complete the process using byte[], but at this point I actually need the POJO. I truly believe it is related to the reuse of the Kryo instance, which is not thread-safe.

------------------------------------------------------------------------------------------------------
2015-10-02 11:55:26 INFO  JobClient:161 - 10/02/2015 11:55:26    Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:138))(4/4) switched to FAILED
java.lang.IndexOutOfBoundsException: Index: 112, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
    at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
    at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
    at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)

2015-10-02 9:46 GMT+02:00 Stefano Bortoli <[hidden email]>:

[...]


Re: kryo exception due to race condition

Stefano Bortoli-2
Hi guys, I managed to complete the process by crossing byte arrays that I deserialize within the group function. However, I think this workaround is feasible only for relatively simple processes. Is there any idea or plan to fix the serialization problem?
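A minimal sketch of the shape of this workaround, in plain Java: the records travel as byte[] and are turned back into objects only inside the user function. Everything named here is an assumption for illustration. `Poi` is a hypothetical POJO, `link` stands in for the body of the user function, and standard Java serialization is used only so the example is self-contained; it is not what the actual job uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical POJO standing in for the real record type. */
final class Poi implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    Poi(String name) {
        this.name = name;
    }
}

final class ByteArrayWorkaround {
    /** Encode the object up front so that only byte[] passes through the framework. */
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    /** Decode inside the user function, where the object is actually needed. */
    static <T> T fromBytes(byte[] bytes, Class<T> type) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return type.cast(in.readObject());
        }
    }

    /** Stand-in for the body of the user function that receives one crossed pair. */
    static String link(byte[] left, byte[] right) throws IOException, ClassNotFoundException {
        Poi a = fromBytes(left, Poi.class);
        Poi b = fromBytes(right, Poi.class);
        return a.name + " x " + b.name;
    }

    public static void main(String[] args) throws Exception {
        byte[] left = toBytes(new Poi("station"));
        byte[] right = toBytes(new Poi("museum"));
        System.out.println(link(left, right));
    }
}
```

The cost is that every function has to carry its own decoding step, which is why this only scales to relatively simple processes.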

saluti,
Stefano

Stefano Bortoli, PhD
ENS Technical Director
_______________________________________________
OKKAMSrl www.okkam.it

Email: [hidden email]

Phone nr: +39 0461 1823913

Headquarters: Trento (Italy), Via Trener 8
Registered office: Trento (Italy), via Segantini 23

Confidentially notice. This e-mail transmission may contain legally privileged and/or confidential information. Please do not read it if you are not the intended recipient(S). Any use, distribution, reproduction or disclosure by any other person is strictly prohibited. If you have received this e-mail in error, please notify the sender and destroy the original transmission and its attachments without reading or saving it in any manner.


2015-10-02 12:05 GMT+02:00 Stefano Bortoli <[hidden email]>:
[...]


Re: kryo exception due to race condition

Till Rohrmann
Hi Stefano,

We'll definitely look into it once Flink Forward is over and we've finished the current release work. Thanks for reporting the issue.

Cheers,
Till

On Tue, Oct 6, 2015 at 9:21 AM, Stefano Bortoli <[hidden email]> wrote:
[...]


Re: kryo exception due to race condition

Stefano Bortoli
Perhaps we can get our hands on it during Flink Forward. :-D I have updated the ticket description after finding out that the issue arises when performing a join right after the cross. See you in Berlin!

saluti,
Stefano

2015-10-06 9:39 GMT+02:00 Till Rohrmann <[hidden email]>:
[...]