CaseClassSerializer and/or TraversableSerializer may still not be threadsafe?


joshlemer
Hello all, I am running Flink 1.4.0 on Amazon EMR, and find that asynchronous
snapshots fail when using the Filesystem back-end. Synchronous snapshots
succeed, and RocksDB snapshots succeed (both async and sync), but async
Filesystem snapshots fail with this error:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.set(ArrayList.java:448)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
        at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
        at net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.enqueueSegmentAssignment(JoinSegmentMappingWithSegmentAssignments.scala:102)
        at net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:218)
        at net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:76)
        at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement2(KeyedCoProcessOperator.java:86)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:270)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)

This stack trace occurs when I try to access the value of a
`ValueState[scala.collection.mutable.PriorityQueue[(AJavaObjectThatUsesTwitterChillProtoSerialization, Long, scala.collection.mutable.BitSet)]]`
while a checkpoint is in progress.

I have found similar errors in tickets that are already fixed, such as this
one:
https://issues.apache.org/jira/browse/FLINK-7484
which is part of this umbrella issue:
https://issues.apache.org/jira/browse/FLINK-7830

These tickets are marked as resolved, though, so maybe the bug has not been
completely fixed? Or maybe I am making a programming mistake? After getting
the value of the state, I mutate it, and I also mutate the mutable.BitSet
before persisting it again. As far as I know, this is perfectly fine in
Flink, right?

Thanks for any help or pointers!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: CaseClassSerializer and/or TraversableSerializer may still not be threadsafe?

Stephan Ewen
Thanks for reporting this, and thanks for checking that it works with RocksDB and with synchronous checkpoints.

I would assume that this issue lies not in the serializer itself, but in accidental sharing in the FsStateBackend async snapshots.
Do you know if the issue still exists in Flink 1.4.2?
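
For context on where the task thread meets the serializer: the trace shows `HeapValueState.value` going through `CopyOnWriteStateTable.get` into the state serializer's `copy`. Roughly, an async heap snapshot captures state objects by reference, and a later read on the processing thread hands out a deep copy of any entry the snapshot may still reference, so user mutations do not leak into the checkpoint. The sketch below is a toy model of that idea under that assumption; `ToyCopyOnWriteTable`, its version fields, and the `UnaryOperator` standing in for `copy` are all hypothetical and are not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model (hypothetical names, NOT Flink's CopyOnWriteStateTable) of copy-on-read
// in a heap state table that supports asynchronous snapshots.
class ToyCopyOnWriteTable<K, V> {

    // One entry: the state object plus the table version it was written or copied in.
    private static final class Entry<S> {
        S state;
        int version;
        Entry(S state, int version) { this.state = state; this.version = version; }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final UnaryOperator<V> deepCopy;  // stands in for the state serializer's copy()
    private int tableVersion = 0;             // bumped whenever a snapshot starts
    private int highestSnapshotVersion = -1;  // version captured by the newest snapshot

    ToyCopyOnWriteTable(UnaryOperator<V> deepCopy) { this.deepCopy = deepCopy; }

    void put(K key, V value) { entries.put(key, new Entry<>(value, tableVersion)); }

    // Starts a snapshot: captures state objects by reference (cheap, no copying),
    // then bumps the version so later reads know those objects are shared.
    Map<K, V> snapshot() {
        Map<K, V> view = new HashMap<>();
        entries.forEach((k, e) -> view.put(k, e.state));
        highestSnapshotVersion = tableVersion;
        tableVersion++;
        return view;
    }

    // Read on the processing thread: if a running snapshot may still reference the
    // entry, swap in a deep copy first, so user mutations never reach the snapshot.
    V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (e.version <= highestSnapshotVersion) {
            e.state = deepCopy.apply(e.state);
            e.version = tableVersion;
        }
        return e.state;
    }
}

class ToyCopyOnWriteDemo {
    public static void main(String[] args) {
        ToyCopyOnWriteTable<String, List<Integer>> table =
                new ToyCopyOnWriteTable<String, List<Integer>>(list -> new ArrayList<>(list));
        table.put("key", new ArrayList<>(Arrays.asList(1, 2)));

        Map<String, List<Integer>> snapshot = table.snapshot();
        List<Integer> live = table.get("key"); // copied here, on the "task thread"
        live.add(3);                           // mutate the state in place

        System.out.println("snapshot sees: " + snapshot.get("key"));
        System.out.println("live state:    " + live);
    }
}
```

In this model the copy runs on the processing thread while the snapshot is serialized elsewhere, which is why any serializer instance used by both sides would have to be safe to share or be duplicated.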

On Tue, Apr 17, 2018 at 6:14 PM, joshlemer <[hidden email]> wrote:

Re: CaseClassSerializer and/or TraversableSerializer may still not be threadsafe?

Stefan Richter
In reply to this post by joshlemer
Hi,

I agree that this looks like a serializer is shared between two threads, one of them being the event processing loop. I doubt that the problem lies in the async FS backend, because there is code in place that duplicates all serializers for the async snapshot thread, and this is also checked by a test. Furthermore, the code of this backend has not changed in ways that could affect this behaviour for many months, and there have been no similar reports. The fact that async snapshots work with RocksDB has no implications for the FS backend, because the RocksDB backend does not use any serializers to write snapshots. Mutating the BitSet objects should also not be a problem; that would not cause such an exception in a serializer.
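
The top frames of the trace belong to Kryo's `MapReferenceResolver`, which keeps per-instance bookkeeping of the objects read so far. The toy class below only loosely models that bookkeeping (`ToyReferenceResolver` and its methods are simplified, hypothetical stand-ins, not Kryo's API) to show how sharing one instance between two callers can produce exactly this kind of failure: caller A reserves slot 0, caller B finishes its own work and resets the list, and A's `set` then hits an empty `ArrayList`. The interleaving is replayed deterministically on one thread instead of with real threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a reference resolver: it remembers the
// objects read so far in a plain, unsynchronized list owned by one instance.
class ToyReferenceResolver {
    private final List<Object> readObjects = new ArrayList<>();

    // Reserves a slot for an object that is about to be read and returns its id.
    int nextReadId() {
        readObjects.add(null);
        return readObjects.size() - 1;
    }

    // Fills in the reserved slot once the object has been constructed.
    void setReadObject(int id, Object value) {
        readObjects.set(id, value);
    }

    // Called when a top-level read/copy finishes.
    void reset() {
        readObjects.clear();
    }
}

class SharedResolverRace {
    // Replays an interleaving that two threads sharing ONE resolver could produce.
    // Returns the exception caller A observes, or null if nothing went wrong.
    static IndexOutOfBoundsException replaySharedInterleaving() {
        ToyReferenceResolver shared = new ToyReferenceResolver();
        int idA = shared.nextReadId();          // caller A reserves slot 0
        shared.reset();                         // caller B finishes and clears the list
        try {
            shared.setReadObject(idA, "value"); // slot 0 no longer exists
            return null;
        } catch (IndexOutOfBoundsException e) {
            return e;
        }
    }

    // Same steps, but each caller owns its own (duplicated) instance.
    static boolean replayWithDuplicates() {
        ToyReferenceResolver forA = new ToyReferenceResolver();
        ToyReferenceResolver forB = new ToyReferenceResolver();
        int idA = forA.nextReadId();
        forB.reset();                           // B's reset cannot touch A's state
        forA.setReadObject(idA, "value");       // succeeds
        return true;
    }

    public static void main(String[] args) {
        System.out.println("shared instance: " + replaySharedInterleaving());
        System.out.println("duplicated instances ok: " + replayWithDuplicates());
    }
}
```

The exception message differs between JDK versions, but the failing call is the same `ArrayList.set` on an empty list that appears at the top of the reported trace.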

I find it more likely that there is still some problem with the duplication of stateful serializers, especially since your serializer is a more complex, nested structure. I quickly checked TraversableSerializer, CaseClassSerializer, and KryoSerializer but could not spot any obvious errors in their deep-copy code. The one exception might be a known, already discussed case in which registering stateful default serializers with Kryo can cause problems. Are you registering default serializers for Kryo?
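
The duplication rule for nested serializers can be sketched as follows. Flink's `TypeSerializer#duplicate()` may return `this` for a stateless serializer but must return a new instance for a stateful one, so a composite serializer has to duplicate each nested serializer and create a new instance of itself whenever any of them came back as a different object. The interface and classes below are hypothetical stand-ins for illustration, not Flink's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a serializer with Flink-style duplicate() semantics:
// stateless implementations may return `this`, stateful ones must return a new instance.
interface ToySerializer {
    ToySerializer duplicate();
}

// Stateless leaf: safe to share between threads.
final class ToyLongSerializer implements ToySerializer {
    @Override public ToySerializer duplicate() { return this; }
}

// Stateful leaf (think: a Kryo-backed serializer with internal resolvers/buffers).
final class ToyStatefulSerializer implements ToySerializer {
    private final List<Object> scratch = new ArrayList<>(); // mutable per-instance state
    @Override public ToySerializer duplicate() { return new ToyStatefulSerializer(); }
}

// Composite (think: a tuple/case-class or collection serializer).
final class ToyCompositeSerializer implements ToySerializer {
    final ToySerializer[] fields;

    ToyCompositeSerializer(ToySerializer... fields) { this.fields = fields; }

    @Override
    public ToySerializer duplicate() {
        ToySerializer[] copies = new ToySerializer[fields.length];
        boolean stateful = false;
        for (int i = 0; i < fields.length; i++) {
            copies[i] = fields[i].duplicate();
            // If any nested serializer had to be copied, this composite is stateful too.
            stateful |= copies[i] != fields[i];
        }
        // Returning `this` when a field is stateful would silently share that field.
        return stateful ? new ToyCompositeSerializer(copies) : this;
    }
}
```

The rule has to hold at every nesting level. In the reported trace a Kryo-backed serializer sits two levels below the collection serializer (TraversableSerializer, then CaseClassSerializer, then KryoSerializer), so a single level that returns `this` would be enough to share the stateful leaf between threads.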

As a further debugging step, could you run this job with debug logging on the Flink 1.5 RC? That would show whether the problem is still present, and Flink 1.5 would also generate better logs for debugging this problem.

Best,
Stefan

> On 17.04.2018 at 18:14, joshlemer <[hidden email]> wrote: